# aio_overpass.client
#
# Interface for making API calls.
  1"""Interface for making API calls."""
  2
  3import math
  4import re
  5from collections.abc import AsyncIterator
  6from contextlib import asynccontextmanager, suppress
  7from dataclasses import dataclass
  8from typing import Final
  9from urllib.parse import urljoin
 10
 11from aio_overpass import __version__
 12from aio_overpass._clock import sleep
 13from aio_overpass.error import (
 14    AlreadyRunningError,
 15    CallError,
 16    CallTimeoutError,
 17    ClientError,
 18    GiveupCause,
 19    GiveupError,
 20    RunnerError,
 21    _raise_for_request_error,
 22    _raise_for_response,
 23    _result_or_raise,
 24    is_too_many_queries,
 25)
 26from aio_overpass.query import DefaultQueryRunner, Query, QueryRunner
 27
 28import aiohttp
 29from aiohttp import ClientTimeout
 30
 31
# Docstrings in this module are written in the Google style (consumed via __docformat__).
__docformat__ = "google"
# Explicit public API of this module.
__all__ = (
    "Client",
    "Status",
    "DEFAULT_INSTANCE",
    "DEFAULT_USER_AGENT",
)


DEFAULT_INSTANCE: Final[str] = "https://overpass-api.de/api/"
"""Main Overpass API instance."""

DEFAULT_USER_AGENT: Final[str] = (
    f"aio-overpass/{__version__} (https://github.com/timwie/aio-overpass)"
)
"""User agent that points to the ``aio-overpass`` repo."""
 48
 49
 50@dataclass(kw_only=True, slots=True)
 51class Status:
 52    """
 53    Information about the API server's rate limit.
 54
 55    Attributes:
 56        slots: The maximum number of concurrent queries per IP
 57               (or ``None`` if there is no rate limit).
 58        free_slots: The number of slots open for this IP
 59                    (or ``None`` if there is no rate limit).
 60        cooldown_secs: The number of seconds until a slot opens for this IP
 61                       (or 0 if there is a free slot).
 62        endpoint: Announced endpoint. For example, there are two distinct servers
 63                  that both can be reached by the main Overpass API instance.
 64                  Depending on server load, a query may be sent to either of them.
 65                  This value is the server name, f.e. ``"gall.openstreetmap.de/"``.
 66        nb_running_queries: Number of currently running queries for this IP.
 67    """
 68
 69    slots: int | None
 70    free_slots: int | None
 71    cooldown_secs: int
 72    endpoint: str | None
 73    nb_running_queries: int
 74
 75    def __repr__(self) -> str:
 76        rep = f"{type(self).__name__}(free_slots="
 77        rep += f"{self.free_slots}/{self.slots}" if self.slots else "∞"
 78        rep += f", cooldown={self.cooldown_secs}s"
 79        rep += f", running={self.nb_running_queries}"
 80        if self.endpoint:
 81            rep += f", endpoint={self.endpoint!r}"
 82        rep += ")"
 83        return rep
 84
 85
class Client:
    """
    A client for the Overpass API.

    Requests are rate-limited according to the configured number of slots per IP for the specified
    API server. By default, queries are retried whenever the server is too busy, or the rate limit
    was exceeded. Custom query runners can be used to implement your own retry strategy.

    Args:
        url: The url of an Overpass API instance. Defaults to the main Overpass API instance.
        user_agent: A string used for the User-Agent header. It is good practice to provide a string
                    that identifies your application, and includes a way to contact you (f.e. an
                    e-mail, or a link to a repository). This is important if you make too many
                    requests, or queries that require a lot of resources.
        concurrency: The maximum number of simultaneous connections. In practice the amount
                     of concurrent queries may be limited by the number of slots it provides for
                     each IP.
        status_timeout_secs: If set, status requests to the Overpass API will time out after
                             this duration in seconds. Defaults to no timeout.
        runner: You can provide another query runner if you want to implement your own retry
                strategy.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances
    """

    __slots__ = (
        "_concurrency",
        "_maybe_session",
        "_runner",
        "_status_timeout_secs",
        "_url",
        "_user_agent",
    )

    def __init__(
        self,
        url: str = DEFAULT_INSTANCE,
        user_agent: str = DEFAULT_USER_AGENT,
        concurrency: int = 32,
        status_timeout_secs: float | None = None,
        runner: QueryRunner | None = None,
    ) -> None:
        # Validate configuration eagerly so misuse fails at construction time,
        # not on the first request.
        if concurrency <= 0:
            msg = "'concurrency' must be > 0"
            raise ValueError(msg)
        if status_timeout_secs is not None and (
            not math.isfinite(status_timeout_secs) or status_timeout_secs <= 0.0
        ):
            msg = "'status_timeout_secs' must be finite > 0"
            raise ValueError(msg)

        self._url: Final[str] = url
        self._user_agent: Final[str] = user_agent
        self._concurrency: Final[int] = concurrency
        self._status_timeout_secs: Final[float | None] = status_timeout_secs
        self._runner: Final[QueryRunner] = runner or DefaultQueryRunner()

        # Created lazily by _session() on first use.
        self._maybe_session: aiohttp.ClientSession | None = None

    def _session(self) -> aiohttp.ClientSession:
        """The session used for all requests of this client (created lazily; recreated if closed)."""
        if not self._maybe_session or self._maybe_session.closed:
            headers = {"User-Agent": self._user_agent}
            # the connector limit is what enforces the 'concurrency' setting
            connector = aiohttp.TCPConnector(limit=self._concurrency)
            self._maybe_session = aiohttp.ClientSession(headers=headers, connector=connector)

        return self._maybe_session

    async def close(self) -> None:
        """Cancel all running queries and close the underlying session (best-effort)."""
        if self._maybe_session and not self._maybe_session.closed:
            # do not care if this fails
            with suppress(CallError):
                _ = await self.cancel_queries()

            # is raised when there are still active queries. that's ok
            with suppress(aiohttp.ServerDisconnectedError):
                await self._maybe_session.close()

    async def _status(self, timeout: ClientTimeout | None = None) -> "Status":
        """Request the ``status`` endpoint and parse its response.

        Uses the configured ``status_timeout_secs`` unless an explicit timeout is given.
        """
        endpoint = urljoin(self._url, "status")
        timeout = timeout or aiohttp.ClientTimeout(total=self._status_timeout_secs)
        async with (
            _map_request_error(timeout),
            self._session().get(url=endpoint, timeout=timeout) as response,
        ):
            return await _parse_status(response)

    async def status(self) -> Status:
        """
        Check the current API status.

        The timeout of this request is configured with the ``status_timeout_secs`` argument.

        Raises:
            ClientError: if the status could not be looked up
        """
        return await self._status()

    async def cancel_queries(self, timeout_secs: float | None = None) -> int:
        """
        Cancel all running queries.

        This can be used to terminate runaway queries that prevent you from sending new ones.

        Returns:
            the number of terminated queries

        Raises:
            ClientError: if the request to cancel queries failed
        """
        if timeout_secs is not None and (not math.isfinite(timeout_secs) or timeout_secs <= 0.0):
            msg = "'timeout_secs' must be finite > 0"
            raise ValueError(msg)

        timeout = aiohttp.ClientTimeout(total=timeout_secs) if timeout_secs else None
        headers = {"User-Agent": self._user_agent}
        endpoint = urljoin(self._url, "kill_my_queries")

        # use a new session here to get around our concurrency limit
        async with (
            aiohttp.ClientSession(headers=headers) as session,
            _map_request_error(timeout),
            session.get(endpoint, timeout=timeout) as response,
        ):
            body = await response.text()
            # count distinct "(pid N)" markers in the response body
            killed_pids = re.findall("\\(pid (\\d+)\\)", body)
            return len(set(killed_pids))

    async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None:
        """
        Send a query to the API, and await its completion.

        "Running" the query entails acquiring a connection from the pool, the query requests
        themselves (which may be retried), status requests when the server is busy,
        and cooldown periods.

        The query runner is invoked before every try, and once after the last try.

        To run multiple queries concurrently, wrap the returned coroutines in an ``asyncio`` task,
        f.e. with ``asyncio.create_task()`` and subsequent ``asyncio.gather()``.

        Args:
            query: the query to run on this API instance
            raise_on_failure: if ``True``, raises ``query.error`` if the query failed

        Raises:
            ClientError: when query or status requests fail. If the query was retried, the error
                         of the last try will be raised. The same exception is also captured in
                         ``query.error``. Raising can be prevented by setting ``raise_on_failure``
                         to ``False``.
            RunnerError: when a call to the query runner raises. This exception is raised
                         even if ``raise_on_failure`` is ``False``, since it is likely an error
                         that is not just specific to this query.
            AlreadyRunningError: when another ``run_query()`` call on this query has not finished
                                 yet. This is not affected by ``raise_on_failure``.
        """
        if query.done:
            return  # nothing to do

        # non-blocking acquire: a second concurrent run_query() on the same
        # query object fails immediately instead of waiting
        if not query._run_lock.acquire(blocking=False):
            raise AlreadyRunningError(kwargs=query.kwargs)

        try:
            if query.nb_tries > 0:
                query.reset()  # reset failed queries

            # query runner is invoked before every try, and once after the last try
            while True:
                await self._invoke_runner(query, raise_on_failure=raise_on_failure)
                if query.done:
                    return
                await self._try_query_once(query)
        finally:
            query._run_lock.release()

    async def _invoke_runner(self, query: Query, *, raise_on_failure: bool) -> None:
        """
        Invoke the query runner.

        Raises:
            ClientError: if the runner raises ``query.error``
            ValueError: if the runner raises a different ``ClientError`` than ``query.error``
            RunnerError: if the runner raises any other exception (which it shouldn't)
        """
        try:
            await self._runner(query)
        except ClientError as err:
            if err is not query.error:
                msg = "query runner raised a ClientError other than 'query.error'"
                raise ValueError(msg) from err
            if raise_on_failure:
                raise
        except AssertionError:
            # assertion failures indicate bugs; let them propagate unchanged
            raise
        except BaseException as err:
            raise RunnerError(cause=err) from err

    async def _try_query_once(self, query: Query) -> None:
        """A single iteration of running a query: cooldown, request, and result bookkeeping."""
        query._begin_try()

        try:
            await self._cooldown(query)

            req_timeout = _next_query_req_timeout(query)

            # pick the timeout we will use for the next try
            # TODO: not sure if this should also update the timeout setting in the Query state;
            #   for now, pass it as parameter to the _code() function
            next_timeout_secs = _next_timeout_secs(query)

            data = query._code(next_timeout_secs)

            query._begin_request()

            query.logger.info(f"call api for {query}")

            async with (
                _map_request_error(req_timeout),
                self._session().post(
                    url=urljoin(self._url, "interpreter"),
                    data=data,
                    timeout=req_timeout,
                ) as response,
            ):
                query._succeed_try(
                    response=await _result_or_raise(response, query.kwargs, query.logger),
                    response_bytes=response.content.total_bytes,
                )

        except CallTimeoutError as err:
            # distinguish "the overall run timeout elapsed" from an ordinary
            # request timeout, so callers see a GiveupError in the former case
            fail_with: ClientError = err
            if query.run_timeout_elapsed:
                assert query.run_duration_secs is not None
                fail_with = GiveupError(
                    cause=GiveupCause.RUN_TIMEOUT_DURING_QUERY_CALL,
                    kwargs=query.kwargs,
                    after_secs=query.run_duration_secs,
                )
            query._fail_try(fail_with)

        except ClientError as err:
            query._fail_try(err)

        finally:
            query._end_try()

    async def _cooldown(self, query: Query) -> None:
        """
        If the given query failed with ``TOO_MANY_QUERIES``, check for a cooldown period.

        Raises:
            ClientError: if the status request to find out the cooldown period fails
            GiveupError: if the cooldown is longer than the remaining run duration
        """
        logger = query.logger

        if not is_too_many_queries(query.error):
            return

        # If this client is running too many queries, we can check the status for a
        # cooldown period. This request failing is a bit of an edge case.
        # 'query.error' will be overwritten, which means we will not check for a
        # cooldown in the next iteration.
        status = await self._status(timeout=self._next_status_req_timeout(query))

        if not status.cooldown_secs:
            return

        run_duration = query.run_duration_secs
        assert run_duration is not None

        if run_timeout := query.run_timeout_secs:
            remaining = run_timeout - run_duration

            # give up early if waiting out the cooldown would exceed the run timeout
            if status.cooldown_secs > remaining:
                logger.error(f"give up on {query} due to {status.cooldown_secs:.1f}s cooldown")
                raise GiveupError(
                    cause=GiveupCause.RUN_TIMEOUT_BY_COOLDOWN,
                    kwargs=query.kwargs,
                    after_secs=run_duration,
                )

        logger.info(f"{query} has cooldown for {status.cooldown_secs:.1f}s")
        await sleep(status.cooldown_secs)

    def _next_status_req_timeout(self, query: Query) -> aiohttp.ClientTimeout:
        """Status request timeout; possibly limited by either the run or status timeout settings."""
        remaining = None

        run_duration = query.run_duration_secs
        assert run_duration is not None

        if run_timeout := query.run_timeout_secs:
            remaining = run_timeout - run_duration

            if remaining <= 0.0:
                raise GiveupError(
                    cause=GiveupCause.RUN_TIMEOUT_BEFORE_STATUS_CALL,
                    kwargs=query.kwargs,
                    after_secs=run_duration,
                )

            # NOTE(review): this cap only applies when a run timeout is set; with no
            #   run timeout, 'status_timeout_secs' is not applied here — confirm intended
            if self._status_timeout_secs:
                remaining = min(remaining, self._status_timeout_secs)  # cap timeout if configured

        return aiohttp.ClientTimeout(total=remaining)
395
396
def _next_query_req_timeout(query: Query) -> aiohttp.ClientTimeout:
    """Query request timeout; possibly limited by either the run or request timeout settings.

    Raises:
        GiveupError: when the run timeout has already elapsed before this call
    """
    elapsed = query.run_duration_secs
    assert elapsed is not None

    # seconds left until "run_timeout_secs" is exceeded (None if unlimited)
    until_run_timeout = None
    run_timeout = query.run_timeout_secs
    if run_timeout:
        until_run_timeout = run_timeout - elapsed
        if until_run_timeout <= 0.0:
            raise GiveupError(
                cause=GiveupCause.RUN_TIMEOUT_BEFORE_QUERY_CALL,
                kwargs=query.kwargs,
                after_secs=elapsed,
            )

    # "[timeout:*]" setting plus "total_without_query_secs" (None if unset)
    expected_query_total = None
    padding = query.request_timeout.total_without_query_secs
    if padding:
        expected_query_total = float(query.timeout_secs) + padding
        assert expected_query_total > 0.0

    # the stricter of the two limits wins; either may be absent
    if until_run_timeout and expected_query_total:
        total = min(until_run_timeout, expected_query_total)
    else:
        total = until_run_timeout or expected_query_total

    return aiohttp.ClientTimeout(
        total=total,
        connect=None,
        sock_connect=query.request_timeout.sock_connect_secs,
        sock_read=query.request_timeout.each_sock_read_secs,
    )
426
427
async def _parse_status(response: aiohttp.ClientResponse) -> Status:
    """Parse an ``/api/status`` response body into a ``Status``.

    Raises:
        ClientError: via ``_raise_for_response()`` if the body is malformed
    """
    text = await response.text()

    # defaults before parsing; 'slots' starts at 0 and becomes None when the
    # server reports "Rate limit: 0" (i.e. no rate limit at all)
    slots: int | None = 0
    free_slots = None
    cooldown_secs = 0
    endpoint = None
    nb_running_queries = 0

    match_slots_overall = re.findall(r"Rate limit: (\d+)", text)
    match_slots_available = re.findall(r"(\d+) slots available now", text)
    match_cooldowns = re.findall(r"Slot available after: .+, in (\d+) seconds", text)
    match_endpoint = re.findall(r"Announced endpoint: (.+)", text)
    # one line per running query: "<pid>\t<space>\t<time>\t<start timestamp>"
    match_running_queries = re.findall(r"\d+\t\d+\t\d+\t\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", text)

    try:
        # unpacking raises ValueError unless "Rate limit" occurs exactly once
        (slots_str,) = match_slots_overall
        slots = int(slots_str) or None  # "Rate limit: 0" → no rate limit

        endpoint = match_endpoint[0] if match_endpoint else None
        endpoint = None if endpoint == "none" else endpoint

        nb_running_queries = len(match_running_queries)

        if slots:
            cooldowns = [int(secs) for secs in match_cooldowns]

            if match_slots_available:
                free_slots_str = match_slots_available[0]
                free_slots = int(free_slots_str)
            else:
                # no "slots available" line: derive free slots from the cooldown count
                free_slots = slots - len(cooldowns)

            # with a free slot there is no cooldown; otherwise wait for the shortest one
            cooldown_secs = 0 if free_slots > 0 else min(cooldowns)
    except ValueError as err:
        # malformed status page: surface it as a response error
        await _raise_for_response(response, cause=err)

    return Status(
        slots=slots,
        free_slots=free_slots,
        cooldown_secs=cooldown_secs,
        endpoint=endpoint,
        nb_running_queries=nb_running_queries,
    )
473
474
@asynccontextmanager
async def _map_request_error(
    timeout: ClientTimeout | None = None,
) -> AsyncIterator[None]:
    """Context to make requests in; translates raised errors into our exception types.

    Note: the ``aiohttp.ClientError`` clause must come first, since some aiohttp
    exceptions derive from both it and ``TimeoutError``.
    """
    try:
        yield
    except aiohttp.ClientError as exc:
        await _raise_for_request_error(exc)
    except TimeoutError as exc:
        # only reachable when a total timeout was configured for the request
        assert timeout is not None
        assert timeout.total
        raise CallTimeoutError(cause=exc, after_secs=timeout.total) from exc
488
489
490def _next_timeout_secs(query: Query) -> int:
491    """
492    Decide the ``[timeout:*]`` setting for the given query's next try.
493
494     - pick ``Query.timeout_secs``
495     - cap it by ``Query.run_timeout_secs``, and the time that has elapsed so far
496     - if a previous try was cancelled with less or equal than the chosen time,
497       raise without trying again
498
499    Raises:
500        GiveupError: when the run timeout elapsed, or when a previous try timed out with
501                     a lower timeout setting
502    """
503    timeout_secs = query.timeout_secs
504
505    if (at_most := query._run_duration_left_secs) and at_most < timeout_secs:
506        query._logger.info(f"{query} has {at_most}s left to run")
507        at_most = math.floor(at_most)
508        query._logger.info(f"adjust {query} [timeout:{timeout_secs}] to [timeout:{at_most}]")
509        timeout_secs = at_most
510    else:
511        query._logger.info(f"{query} will use [timeout:{timeout_secs}]")
512
513    not_enough = query._max_timed_out_after_secs or 0
514
515    if timeout_secs <= not_enough:
516        query._logger.error(f"{query} previously timed out with [timeout:{not_enough}] - give up")
517        raise GiveupError(
518            cause=GiveupCause.EXPECTING_QUERY_TIMEOUT,
519            kwargs=query.kwargs,
520            after_secs=query.run_duration_secs or 0.0,
521        )
522
523    return timeout_secs
class Client:
 87class Client:
 88    """
 89    A client for the Overpass API.
 90
 91    Requests are rate-limited according to the configured number of slots per IP for the specified
 92    API server. By default, queries are retried whenever the server is too busy, or the rate limit
 93    was exceeded. Custom query runners can be used to implement your own retry strategy.
 94
 95    Args:
 96        url: The url of an Overpass API instance. Defaults to the main Overpass API instance.
 97        user_agent: A string used for the User-Agent header. It is good practice to provide a string
 98                    that identifies your application, and includes a way to contact you (f.e. an
 99                    e-mail, or a link to a repository). This is important if you make too many
100                    requests, or queries that require a lot of resources.
101        concurrency: The maximum number of simultaneous connections. In practice the amount
102                     of concurrent queries may be limited by the number of slots it provides for
103                     each IP.
104        status_timeout_secs: If set, status requests to the Overpass API will time out after
105                             this duration in seconds. Defaults to no timeout.
106        runner: You can provide another query runner if you want to implement your own retry
107                strategy.
108
109    References:
110        - https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances
111    """
112
113    __slots__ = (
114        "_concurrency",
115        "_maybe_session",
116        "_runner",
117        "_status_timeout_secs",
118        "_url",
119        "_user_agent",
120    )
121
122    def __init__(
123        self,
124        url: str = DEFAULT_INSTANCE,
125        user_agent: str = DEFAULT_USER_AGENT,
126        concurrency: int = 32,
127        status_timeout_secs: float | None = None,
128        runner: QueryRunner | None = None,
129    ) -> None:
130        if concurrency <= 0:
131            msg = "'concurrency' must be > 0"
132            raise ValueError(msg)
133        if status_timeout_secs is not None and (
134            not math.isfinite(status_timeout_secs) or status_timeout_secs <= 0.0
135        ):
136            msg = "'status_timeout_secs' must be finite > 0"
137            raise ValueError(msg)
138
139        self._url: Final[str] = url
140        self._user_agent: Final[str] = user_agent
141        self._concurrency: Final[int] = concurrency
142        self._status_timeout_secs: Final[float | None] = status_timeout_secs
143        self._runner: Final[QueryRunner] = runner or DefaultQueryRunner()
144
145        self._maybe_session: aiohttp.ClientSession | None = None
146
147    def _session(self) -> aiohttp.ClientSession:
148        """The session used for all requests of this client."""
149        if not self._maybe_session or self._maybe_session.closed:
150            headers = {"User-Agent": self._user_agent}
151            connector = aiohttp.TCPConnector(limit=self._concurrency)
152            self._maybe_session = aiohttp.ClientSession(headers=headers, connector=connector)
153
154        return self._maybe_session
155
156    async def close(self) -> None:
157        """Cancel all running queries and close the underlying session."""
158        if self._maybe_session and not self._maybe_session.closed:
159            # do not care if this fails
160            with suppress(CallError):
161                _ = await self.cancel_queries()
162
163            # is raised when there are still active queries. that's ok
164            with suppress(aiohttp.ServerDisconnectedError):
165                await self._maybe_session.close()
166
167    async def _status(self, timeout: ClientTimeout | None = None) -> "Status":
168        endpoint = urljoin(self._url, "status")
169        timeout = timeout or aiohttp.ClientTimeout(total=self._status_timeout_secs)
170        async with (
171            _map_request_error(timeout),
172            self._session().get(url=endpoint, timeout=timeout) as response,
173        ):
174            return await _parse_status(response)
175
176    async def status(self) -> Status:
177        """
178        Check the current API status.
179
180        The timeout of this request is configured with the ``status_timeout_secs`` argument.
181
182        Raises:
183            ClientError: if the status could not be looked up
184        """
185        return await self._status()
186
187    async def cancel_queries(self, timeout_secs: float | None = None) -> int:
188        """
189        Cancel all running queries.
190
191        This can be used to terminate runaway queries that prevent you from sending new ones.
192
193        Returns:
194            the number of terminated queries
195
196        Raises:
197            ClientError: if the request to cancel queries failed
198        """
199        if timeout_secs is not None and (not math.isfinite(timeout_secs) or timeout_secs <= 0.0):
200            msg = "'timeout_secs' must be finite > 0"
201            raise ValueError(msg)
202
203        timeout = aiohttp.ClientTimeout(total=timeout_secs) if timeout_secs else None
204        headers = {"User-Agent": self._user_agent}
205        endpoint = urljoin(self._url, "kill_my_queries")
206
207        # use a new session here to get around our concurrency limit
208        async with (
209            aiohttp.ClientSession(headers=headers) as session,
210            _map_request_error(timeout),
211            session.get(endpoint, timeout=timeout) as response,
212        ):
213            body = await response.text()
214            killed_pids = re.findall("\\(pid (\\d+)\\)", body)
215            return len(set(killed_pids))
216
217    async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None:
218        """
219        Send a query to the API, and await its completion.
220
221        "Running" the query entails acquiring a connection from the pool, the query requests
222        themselves (which may be retried), status requests when the server is busy,
223        and cooldown periods.
224
225        The query runner is invoked before every try, and once after the last try.
226
227        To run multiple queries concurrently, wrap the returned coroutines in an ``asyncio`` task,
228        f.e. with ``asyncio.create_task()`` and subsequent ``asyncio.gather()``.
229
230        Args:
231            query: the query to run on this API instance
232            raise_on_failure: if ``True``, raises ``query.error`` if the query failed
233
234        Raises:
235            ClientError: when query or status requests fail. If the query was retried, the error
236                         of the last try will be raised. The same exception is also captured in
237                         ``query.error``. Raising can be prevented by setting ``raise_on_failure``
238                         to ``False``.
239            RunnerError: when a call to the query runner raises. This exception is raised
240                         even if ``raise_on_failure` is ``False``, since it is likely an error
241                         that is not just specific to this query.
242            AlreadyRunningError: when another ``run_query()`` call on this query has not finished
243                                 yet. This is not affected by ``raise_on_failure``.
244        """
245        if query.done:
246            return  # nothing to do
247
248        if not query._run_lock.acquire(blocking=False):
249            raise AlreadyRunningError(kwargs=query.kwargs)
250
251        try:
252            if query.nb_tries > 0:
253                query.reset()  # reset failed queries
254
255            # query runner is invoked before every try, and once after the last try
256            while True:
257                await self._invoke_runner(query, raise_on_failure=raise_on_failure)
258                if query.done:
259                    return
260                await self._try_query_once(query)
261        finally:
262            query._run_lock.release()
263
264    async def _invoke_runner(self, query: Query, *, raise_on_failure: bool) -> None:
265        """
266        Invoke the query runner.
267
268        Raises:
269            ClientError: if the runner raises ``query.error``
270            ValueError: if the runner raises a different ``ClientError`` than ``query.error``
271            RunnerError: if the runner raises any other exception (which it shouldn't)
272        """
273        try:
274            await self._runner(query)
275        except ClientError as err:
276            if err is not query.error:
277                msg = "query runner raised a ClientError other than 'query.error'"
278                raise ValueError(msg) from err
279            if raise_on_failure:
280                raise
281        except AssertionError:
282            raise
283        except BaseException as err:
284            raise RunnerError(cause=err) from err
285
286    async def _try_query_once(self, query: Query) -> None:
287        """A single iteration of running a query."""
288        query._begin_try()
289
290        try:
291            await self._cooldown(query)
292
293            req_timeout = _next_query_req_timeout(query)
294
295            # pick the timeout we will use for the next try
296            # TODO: not sure if this should also update the timeout setting in the Query state;
297            #   for now, pass it as parameter to the _code() function
298            next_timeout_secs = _next_timeout_secs(query)
299
300            data = query._code(next_timeout_secs)
301
302            query._begin_request()
303
304            query.logger.info(f"call api for {query}")
305
306            async with (
307                _map_request_error(req_timeout),
308                self._session().post(
309                    url=urljoin(self._url, "interpreter"),
310                    data=data,
311                    timeout=req_timeout,
312                ) as response,
313            ):
314                query._succeed_try(
315                    response=await _result_or_raise(response, query.kwargs, query.logger),
316                    response_bytes=response.content.total_bytes,
317                )
318
319        except CallTimeoutError as err:
320            fail_with: ClientError = err
321            if query.run_timeout_elapsed:
322                assert query.run_duration_secs is not None
323                fail_with = GiveupError(
324                    cause=GiveupCause.RUN_TIMEOUT_DURING_QUERY_CALL,
325                    kwargs=query.kwargs,
326                    after_secs=query.run_duration_secs,
327                )
328            query._fail_try(fail_with)
329
330        except ClientError as err:
331            query._fail_try(err)
332
333        finally:
334            query._end_try()
335
336    async def _cooldown(self, query: Query) -> None:
337        """
338        If the given query failed with ``TOO_MANY_QUERIES``, check for a cooldown period.
339
340        Raises:
341            ClientError: if the status request to find out the cooldown period fails
342            GiveupError: if the cooldown is longer than the remaining run duration
343        """
344        logger = query.logger
345
346        if not is_too_many_queries(query.error):
347            return
348
349        # If this client is running too many queries, we can check the status for a
350        # cooldown period. This request failing is a bit of an edge case.
351        # 'query.error' will be overwritten, which means we will not check for a
352        # cooldown in the next iteration.
353        status = await self._status(timeout=self._next_status_req_timeout(query))
354
355        if not status.cooldown_secs:
356            return
357
358        run_duration = query.run_duration_secs
359        assert run_duration is not None
360
361        if run_timeout := query.run_timeout_secs:
362            remaining = run_timeout - run_duration
363
364            if status.cooldown_secs > remaining:
365                logger.error(f"give up on {query} due to {status.cooldown_secs:.1f}s cooldown")
366                raise GiveupError(
367                    cause=GiveupCause.RUN_TIMEOUT_BY_COOLDOWN,
368                    kwargs=query.kwargs,
369                    after_secs=run_duration,
370                )
371
372        logger.info(f"{query} has cooldown for {status.cooldown_secs:.1f}s")
373        await sleep(status.cooldown_secs)
374
375    def _next_status_req_timeout(self, query: Query) -> aiohttp.ClientTimeout:
376        """Status request timeout; possibly limited by either the run or status timeout settings."""
377        remaining = None
378
379        run_duration = query.run_duration_secs
380        assert run_duration is not None
381
382        if run_timeout := query.run_timeout_secs:
383            remaining = run_timeout - run_duration
384
385            if remaining <= 0.0:
386                raise GiveupError(
387                    cause=GiveupCause.RUN_TIMEOUT_BEFORE_STATUS_CALL,
388                    kwargs=query.kwargs,
389                    after_secs=run_duration,
390                )
391
392            if self._status_timeout_secs:
393                remaining = min(remaining, self._status_timeout_secs)  # cap timeout if configured
394
395        return aiohttp.ClientTimeout(total=remaining)

A client for the Overpass API.

Requests are rate-limited according to the configured number of slots per IP for the specified API server. By default, queries are retried whenever the server is too busy, or the rate limit was exceeded. Custom query runners can be used to implement your own retry strategy.

Arguments:
  • url: The url of an Overpass API instance. Defaults to the main Overpass API instance.
  • user_agent: A string used for the User-Agent header. It is good practice to provide a string that identifies your application, and includes a way to contact you (f.e. an e-mail, or a link to a repository). This is important if you make too many requests, or queries that require a lot of resources.
  • concurrency: The maximum number of simultaneous connections. In practice, the number of concurrent queries may also be limited by the number of slots the server provides for each IP.
  • status_timeout_secs: If set, status requests to the Overpass API will time out after this duration in seconds. Defaults to no timeout.
  • runner: You can provide another query runner if you want to implement your own retry strategy.
References:
Client( url: str = 'https://overpass-api.de/api/', user_agent: str = 'aio-overpass/0.15.0 (https://github.com/timwie/aio-overpass)', concurrency: int = 32, status_timeout_secs: float | None = None, runner: aio_overpass.query.QueryRunner | None = None)
122    def __init__(
123        self,
124        url: str = DEFAULT_INSTANCE,
125        user_agent: str = DEFAULT_USER_AGENT,
126        concurrency: int = 32,
127        status_timeout_secs: float | None = None,
128        runner: QueryRunner | None = None,
129    ) -> None:
130        if concurrency <= 0:
131            msg = "'concurrency' must be > 0"
132            raise ValueError(msg)
133        if status_timeout_secs is not None and (
134            not math.isfinite(status_timeout_secs) or status_timeout_secs <= 0.0
135        ):
136            msg = "'status_timeout_secs' must be finite > 0"
137            raise ValueError(msg)
138
139        self._url: Final[str] = url
140        self._user_agent: Final[str] = user_agent
141        self._concurrency: Final[int] = concurrency
142        self._status_timeout_secs: Final[float | None] = status_timeout_secs
143        self._runner: Final[QueryRunner] = runner or DefaultQueryRunner()
144
145        self._maybe_session: aiohttp.ClientSession | None = None
async def close(self) -> None:
156    async def close(self) -> None:
157        """Cancel all running queries and close the underlying session."""
158        if self._maybe_session and not self._maybe_session.closed:
159            # do not care if this fails
160            with suppress(CallError):
161                _ = await self.cancel_queries()
162
163            # is raised when there are still active queries. that's ok
164            with suppress(aiohttp.ServerDisconnectedError):
165                await self._maybe_session.close()

Cancel all running queries and close the underlying session.

async def status(self) -> Status:
176    async def status(self) -> Status:
177        """
178        Check the current API status.
179
180        The timeout of this request is configured with the ``status_timeout_secs`` argument.
181
182        Raises:
183            ClientError: if the status could not be looked up
184        """
185        return await self._status()

Check the current API status.

The timeout of this request is configured with the status_timeout_secs argument.

Raises:
  • ClientError: if the status could not be looked up
async def cancel_queries(self, timeout_secs: float | None = None) -> int:
187    async def cancel_queries(self, timeout_secs: float | None = None) -> int:
188        """
189        Cancel all running queries.
190
191        This can be used to terminate runaway queries that prevent you from sending new ones.
192
193        Returns:
194            the number of terminated queries
195
196        Raises:
197            ClientError: if the request to cancel queries failed
198        """
199        if timeout_secs is not None and (not math.isfinite(timeout_secs) or timeout_secs <= 0.0):
200            msg = "'timeout_secs' must be finite > 0"
201            raise ValueError(msg)
202
203        timeout = aiohttp.ClientTimeout(total=timeout_secs) if timeout_secs else None
204        headers = {"User-Agent": self._user_agent}
205        endpoint = urljoin(self._url, "kill_my_queries")
206
207        # use a new session here to get around our concurrency limit
208        async with (
209            aiohttp.ClientSession(headers=headers) as session,
210            _map_request_error(timeout),
211            session.get(endpoint, timeout=timeout) as response,
212        ):
213            body = await response.text()
214            killed_pids = re.findall("\\(pid (\\d+)\\)", body)
215            return len(set(killed_pids))

Cancel all running queries.

This can be used to terminate runaway queries that prevent you from sending new ones.

Returns:

the number of terminated queries

Raises:
  • ClientError: if the request to cancel queries failed
async def run_query( self, query: aio_overpass.Query, *, raise_on_failure: bool = True) -> None:
217    async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None:
218        """
219        Send a query to the API, and await its completion.
220
221        "Running" the query entails acquiring a connection from the pool, the query requests
222        themselves (which may be retried), status requests when the server is busy,
223        and cooldown periods.
224
225        The query runner is invoked before every try, and once after the last try.
226
227        To run multiple queries concurrently, wrap the returned coroutines in an ``asyncio`` task,
228        f.e. with ``asyncio.create_task()`` and subsequent ``asyncio.gather()``.
229
230        Args:
231            query: the query to run on this API instance
232            raise_on_failure: if ``True``, raises ``query.error`` if the query failed
233
234        Raises:
235            ClientError: when query or status requests fail. If the query was retried, the error
236                         of the last try will be raised. The same exception is also captured in
237                         ``query.error``. Raising can be prevented by setting ``raise_on_failure``
238                         to ``False``.
239            RunnerError: when a call to the query runner raises. This exception is raised
240                         even if ``raise_on_failure` is ``False``, since it is likely an error
241                         that is not just specific to this query.
242            AlreadyRunningError: when another ``run_query()`` call on this query has not finished
243                                 yet. This is not affected by ``raise_on_failure``.
244        """
245        if query.done:
246            return  # nothing to do
247
248        if not query._run_lock.acquire(blocking=False):
249            raise AlreadyRunningError(kwargs=query.kwargs)
250
251        try:
252            if query.nb_tries > 0:
253                query.reset()  # reset failed queries
254
255            # query runner is invoked before every try, and once after the last try
256            while True:
257                await self._invoke_runner(query, raise_on_failure=raise_on_failure)
258                if query.done:
259                    return
260                await self._try_query_once(query)
261        finally:
262            query._run_lock.release()

Send a query to the API, and await its completion.

"Running" the query entails acquiring a connection from the pool, the query requests themselves (which may be retried), status requests when the server is busy, and cooldown periods.

The query runner is invoked before every try, and once after the last try.

To run multiple queries concurrently, wrap the returned coroutines in an asyncio task, f.e. with asyncio.create_task() and subsequent asyncio.gather().

Arguments:
  • query: the query to run on this API instance
  • raise_on_failure: if True, raises query.error if the query failed
Raises:
  • ClientError: when query or status requests fail. If the query was retried, the error of the last try will be raised. The same exception is also captured in query.error. Raising can be prevented by setting raise_on_failure to False.
  • RunnerError: when a call to the query runner raises. This exception is raised even if raise_on_failure is False, since it is likely an error that is not just specific to this query.
  • AlreadyRunningError: when another run_query() call on this query has not finished yet. This is not affected by raise_on_failure.
@dataclass(kw_only=True, slots=True)
class Status:
51@dataclass(kw_only=True, slots=True)
52class Status:
53    """
54    Information about the API server's rate limit.
55
56    Attributes:
57        slots: The maximum number of concurrent queries per IP
58               (or ``None`` if there is no rate limit).
59        free_slots: The number of slots open for this IP
60                    (or ``None`` if there is no rate limit).
61        cooldown_secs: The number of seconds until a slot opens for this IP
62                       (or 0 if there is a free slot).
63        endpoint: Announced endpoint. For example, there are two distinct servers
64                  that both can be reached by the main Overpass API instance.
65                  Depending on server load, a query may be sent to either of them.
66                  This value is the server name, f.e. ``"gall.openstreetmap.de/"``.
67        nb_running_queries: Number of currently running queries for this IP.
68    """
69
70    slots: int | None
71    free_slots: int | None
72    cooldown_secs: int
73    endpoint: str | None
74    nb_running_queries: int
75
76    def __repr__(self) -> str:
77        rep = f"{type(self).__name__}(free_slots="
78        rep += f"{self.free_slots}/{self.slots}" if self.slots else "∞"
79        rep += f", cooldown={self.cooldown_secs}s"
80        rep += f", running={self.nb_running_queries}"
81        if self.endpoint:
82            rep += f", endpoint={self.endpoint!r}"
83        rep += ")"
84        return rep

Information about the API server's rate limit.

Attributes:
  • slots: The maximum number of concurrent queries per IP (or None if there is no rate limit).
  • free_slots: The number of slots open for this IP (or None if there is no rate limit).
  • cooldown_secs: The number of seconds until a slot opens for this IP (or 0 if there is a free slot).
  • endpoint: Announced endpoint. For example, there are two distinct servers that both can be reached by the main Overpass API instance. Depending on server load, a query may be sent to either of them. This value is the server name, f.e. "gall.openstreetmap.de/".
  • nb_running_queries: Number of currently running queries for this IP.
Status( *, slots: int | None, free_slots: int | None, cooldown_secs: int, endpoint: str | None, nb_running_queries: int)
slots: int | None
free_slots: int | None
cooldown_secs: int
endpoint: str | None
nb_running_queries: int
DEFAULT_INSTANCE: Final[str] = 'https://overpass-api.de/api/'

Main Overpass API instance.

DEFAULT_USER_AGENT: Final[str] = 'aio-overpass/0.15.0 (https://github.com/timwie/aio-overpass)'

User agent that points to the aio-overpass repo.