aio_overpass.query

Query state and runner.

  1"""Query state and runner."""
  2
  3import asyncio
  4import hashlib
  5import json
  6import logging
  7import math
  8import re
  9import tempfile
 10import threading
 11import time
 12from abc import ABC, abstractmethod
 13from dataclasses import dataclass
 14from datetime import UTC, datetime
 15from pathlib import Path
 16from typing import Any, Final
 17
 18from aio_overpass._clock import Instant, sleep
 19from aio_overpass._env import FORCE_DISABLE_CACHE
 20from aio_overpass.error import (
 21    ClientError,
 22    QueryRejectCause,
 23    is_exceeding_timeout,
 24    is_rejection,
 25    is_server_error,
 26)
 27
 28
 29__docformat__ = "google"
 30__all__ = (
 31    "Query",
 32    "QueryRunner",
 33    "DefaultQueryRunner",
 34    "RequestTimeout",
 35    "DEFAULT_MAXSIZE_MIB",
 36    "DEFAULT_TIMEOUT_SECS",
 37)
 38
 39
DEFAULT_MAXSIZE_MIB: Final[int] = 512
"""Default ``maxsize`` setting in mebibytes."""

DEFAULT_TIMEOUT_SECS: Final[int] = 180
"""Default ``timeout`` setting in seconds."""

_COPYRIGHT: Final[str] = (
    "The data included in this document is from www.openstreetmap.org."
    " The data is made available under ODbL."
)
"""This is the same copyright notice included in result sets"""

_SETTING_PATTERN: Final[re.Pattern[str]] = re.compile(r"\[(\w+?):(.+?)]\s*;?")
"""A pattern to match setting declarations (not the entire settings statement)."""

# Fallback logger used when a Query is constructed without an explicit logger.
# NOTE(review): logging.getLogger() returns the *root* logger; attaching a
# NullHandler there means WARNING+ records may still surface via
# logging.lastResort — confirm this is the intended "silent by default" setup.
_NULL_LOGGER: Final[logging.Logger] = logging.getLogger()
_NULL_LOGGER.addHandler(logging.NullHandler())
 57
 58
 59class Query:
 60    """
 61    State of a query that is either pending, running, successful, or failed.
 62
 63    Args:
 64        input_code: The input Overpass QL code. Note that some settings might be changed
 65                    by query runners, notably the 'timeout' and 'maxsize' settings.
 66        logger: The logger to use for all logging output related to this query.
 67        **kwargs: Additional keyword arguments that can be used to identify queries.
 68
 69    References:
 70        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
 71    """
 72
 73    __slots__ = (
 74        "_error",
 75        "_input_code",
 76        "_kwargs",
 77        "_logger",
 78        "_max_timed_out_after_secs",
 79        "_nb_tries",
 80        "_request_timeout",
 81        "_response",
 82        "_response_bytes",
 83        "_run_lock",
 84        "_run_timeout_secs",
 85        "_settings",
 86        "_time_end_try",
 87        "_time_start",
 88        "_time_start_req",
 89        "_time_start_try",
 90    )
 91
 92    def __init__(
 93        self,
 94        input_code: str,
 95        logger: logging.Logger = _NULL_LOGGER,
 96        **kwargs: Any,  # noqa: ANN401
 97    ) -> None:
 98        self._run_lock: Final[threading.Lock] = threading.Lock()
 99        """a lock used to ensure a query cannot be run more than once at the same time"""
100
101        self._input_code: Final[str] = input_code
102        """the original given overpass ql code"""
103
104        self._logger: Final[logging.Logger] = logger
105        """logger to use for this query"""
106
107        self._kwargs: Final[dict] = kwargs
108        """used to identify this query"""
109
110        self._settings = dict(_SETTING_PATTERN.findall(input_code))
111        """all overpass ql settings [k:v];"""
112
113        if "out" in self._settings and self._settings["out"] != "json":
114            msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted"
115            raise ValueError(msg)
116
117        self._settings["out"] = "json"
118
119        if "maxsize" not in self._settings:
120            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
121        elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0:
122            msg = "the '[maxsize:*]' setting must be an integer > 0"
123            raise ValueError(msg)
124
125        if "timeout" not in self._settings:
126            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
127        elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0:
128            msg = "the '[timeout:*]' setting must be an integer > 0"
129            raise ValueError(msg)
130
131        self._run_timeout_secs: float | None = None
132        """total time limit for running this query"""
133
134        self._request_timeout: RequestTimeout = RequestTimeout()
135        """config for request timeouts"""
136
137        self._error: ClientError | None = None
138        """error of the last try, or None"""
139
140        self._response: dict | None = None
141        """response JSON as a dict, or None"""
142
143        self._response_bytes = 0.0
144        """number of bytes in a response, or zero"""
145
146        self._nb_tries = 0
147        """number of tries so far, starting at zero"""
148
149        self._time_start: Instant | None = None
150        """time prior to executing the first try"""
151
152        self._time_start_try: Instant | None = None
153        """time prior to executing the most recent try"""
154
155        self._time_start_req: Instant | None = None
156        """time prior to executing the most recent try's query request"""
157
158        self._time_end_try: Instant | None = None
159        """time the most recent try finished"""
160
161        self._max_timed_out_after_secs: int | None = None
162        """maximum of seconds after which the query was cancelled"""
163
164    def reset(self) -> None:
165        """Reset the query to its initial state, ignoring previous tries."""
166        Query.__init__(
167            self,
168            input_code=self._input_code,
169            logger=self._logger,
170            **self._kwargs,
171        )
172
173    @property
174    def input_code(self) -> str:
175        """The original input Overpass QL source code."""
176        return self._input_code
177
178    @property
179    def kwargs(self) -> dict:
180        """
181        Keyword arguments that can be used to identify queries.
182
183        The default query runner will log these values when a query is run.
184        """
185        return self._kwargs
186
187    @property
188    def logger(self) -> logging.Logger:
189        """The logger used for logging output related to this query."""
190        return self._logger
191
192    @property
193    def nb_tries(self) -> int:
194        """Current number of tries."""
195        return self._nb_tries
196
197    @property
198    def error(self) -> ClientError | None:
199        """
200        Error of the most recent try.
201
202        Returns:
203            an error or ``None`` if the query wasn't tried or hasn't failed
204        """
205        return self._error
206
207    @property
208    def response(self) -> dict | None:
209        """
210        The entire JSON response of the query.
211
212        Returns:
213            the response, or ``None`` if the query has not successfully finished (yet)
214        """
215        return self._response
216
217    @property
218    def was_cached(self) -> bool | None:
219        """
220        Indicates whether the query result was cached.
221
222        Returns:
223            ``None`` if the query has not been run yet.
224            ``True`` if the query has a result set with zero tries.
225            ``False`` otherwise.
226        """
227        if self._response is None:
228            return None
229        return self._nb_tries == 0
230
231    @property
232    def result_set(self) -> list[dict] | None:
233        """
234        The result set of the query.
235
236        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
237        You are free to copy, distribute, transmit and adapt this data, as long as you credit
238        OpenStreetMap and its contributors. If you alter or build upon this data, you may
239        distribute the result only under the same licence.
240
241        Returns:
242            the elements of the result set, or ``None`` if the query has not successfully
243            finished (yet)
244
245        References:
246            - https://www.openstreetmap.org/copyright
247            - https://opendatacommons.org/licenses/odbl/1-0/
248        """
249        if not self._response:
250            return None
251        return self._response["elements"]
252
253    @property
254    def response_size_mib(self) -> float | None:
255        """
256        The size of the response in mebibytes.
257
258        Returns:
259            the size, or ``None`` if the query has not successfully finished (yet)
260        """
261        if self._response is None:
262            return None
263        return self._response_bytes / 1024.0 / 1024.0
264
265    @property
266    def maxsize_mib(self) -> float:
267        """
268        The current value of the [maxsize:*] setting in mebibytes.
269
270        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
271        as expected by the user. If the query needs more RAM than this value, the server may abort
272        the query with a memory exhaustion. The higher this size, the more probably the server
273        rejects the query before executing it.
274        """
275        return float(self._settings["maxsize"]) // 1024.0 // 1024.0
276
277    @maxsize_mib.setter
278    def maxsize_mib(self, value: float) -> None:
279        if not math.isfinite(value) or value <= 0.0:
280            msg = "'maxsize_mib' must be finite > 0"
281            raise ValueError(msg)
282        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)
283
284    @property
285    def timeout_secs(self) -> int:
286        """
287        The current value of the [timeout:*] setting in seconds.
288
289        This duration is the maximum allowed runtime for the query in seconds, as expected by the
290        user. If the query runs longer than this time, the server may abort the query. The higher
291        this duration, the more probably the server rejects the query before executing it.
292        """
293        return int(self._settings["timeout"])
294
295    @timeout_secs.setter
296    def timeout_secs(self, value: int) -> None:
297        if value < 1:
298            msg = "timeout_secs must be >= 1"
299            raise ValueError(msg)
300        self._settings["timeout"] = value
301
302    @property
303    def run_timeout_secs(self) -> float | None:
304        """
305        A limit to ``run_duration_secs``, that cancels running the query when exceeded.
306
307        Defaults to no timeout.
308
309        The client will raise a ``GiveupError`` if the timeout is reached.
310
311        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
312        that limits a single query execution time. Instead, this value can be used to limit the
313        total client-side time spent on this query (see ``Client.run_query``).
314        """
315        return self._run_timeout_secs
316
317    @run_timeout_secs.setter
318    def run_timeout_secs(self, value: float | None) -> None:
319        if value is not None and (not math.isfinite(value) or value <= 0.0):
320            msg = "'run_timeout_secs' must be finite > 0"
321            raise ValueError(msg)
322        self._run_timeout_secs = value
323
324    @property
325    def run_timeout_elapsed(self) -> bool:
326        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
327        return (
328            self.run_timeout_secs is not None
329            and self.run_duration_secs is not None
330            and self.run_timeout_secs < self.run_duration_secs
331        )
332
333    @property
334    def request_timeout(self) -> "RequestTimeout":
335        """Request timeout settings for this query."""
336        return self._request_timeout
337
338    @request_timeout.setter
339    def request_timeout(self, value: "RequestTimeout") -> None:
340        self._request_timeout = value
341
342    def _code(self, next_timeout_secs_used: int) -> str:
343        """The query's QL code, substituting the [timeout:*] setting with the given duration."""
344        settings_copy = self._settings.copy()
345        settings_copy["timeout"] = next_timeout_secs_used
346
347        # remove the original settings statement
348        code = _SETTING_PATTERN.sub("", self._input_code)
349
350        # put the adjusted settings in front
351        settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";"
352
353        return f"{settings}\n{code}"
354
355    @property
356    def cache_key(self) -> str:
357        """
358        Hash QL code, and return its digest as hexadecimal string.
359
360        The default query runner uses this as cache key.
361        """
362        # Remove the original settings statement
363        code = _SETTING_PATTERN.sub("", self._input_code)
364        hasher = hashlib.blake2b(digest_size=8)
365        hasher.update(code.encode("utf-8"))
366        return hasher.hexdigest()
367
368    @property
369    def done(self) -> bool:
370        """Returns ``True`` if the result set was received."""
371        return self._response is not None
372
373    @property
374    def request_duration_secs(self) -> float | None:
375        """
376        How long it took to fetch the result set in seconds.
377
378        This is the duration starting with the API request, and ending once
379        the result is written to this query object. Although it depends on how busy
380        the API instance is, this can give some indication of how long a query takes.
381
382        Returns:
383            the duration or ``None`` if there is no result set yet, or when it was cached.
384        """
385        if self._response is None or self.was_cached:
386            return None
387
388        assert self._time_end_try is not None
389        assert self._time_start_req is not None
390
391        return self._time_end_try - self._time_start_req
392
393    @property
394    def run_duration_secs(self) -> float | None:
395        """
396        The total required time for this query in seconds (so far).
397
398        Returns:
399            the duration or ``None`` if there is no result set yet, or when it was cached.
400        """
401        if self._time_start is None:
402            return None
403
404        if self._time_end_try:
405            return self._time_end_try - self._time_start
406
407        return self._time_start.elapsed_secs_since
408
409    @property
410    def _run_duration_left_secs(self) -> float | None:
411        """If a limit was set, returns the seconds until the time to run the query has elapsed."""
412        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
413            return max(0, math.ceil(time_max - time_so_far))
414        return None
415
416    @property
417    def api_version(self) -> str | None:
418        """
419        The Overpass API version used by the queried instance.
420
421        Returns:
422            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
423            has not successfully finished (yet)
424
425        References:
426            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
427        """
428        if self._response is None:
429            return None
430
431        return self._response["generator"]
432
433    @property
434    def timestamp_osm(self) -> datetime | None:
435        """
436        All OSM edits that have been uploaded before this date are included.
437
438        It can take a couple of minutes for changes to the database to show up in the
439        Overpass API query results.
440
441        Returns:
442            the timestamp, or ``None`` if the query has not successfully finished (yet)
443        """
444        if self._response is None:
445            return None
446
447        date_str = self._response["osm3s"]["timestamp_osm_base"]
448        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)
449
450    @property
451    def timestamp_areas(self) -> datetime | None:
452        """
453        All area data edits that have been uploaded before this date are included.
454
455        If the query involves area data processing, this is the date of the latest edit
456        that has been considered in the most recent batch run of the area generation.
457
458        Returns:
459            the timestamp, or ``None`` if the query has not successfully finished (yet), or
460            if it does not involve area data processing.
461        """
462        if self._response is None:
463            return None
464
465        date_str = self._response["osm3s"].get("timestamp_areas_base")
466        if not date_str:
467            return None
468
469        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)
470
471    @property
472    def copyright(self) -> str:
473        """A copyright notice that comes with the result set."""
474        if self._response is None:
475            return _COPYRIGHT
476
477        return self._response["osm3s"].get("copyright") or _COPYRIGHT
478
479    def __str__(self) -> str:
480        query = f"query{self.kwargs!r}"
481
482        size = self.response_size_mib
483        time_request = self.request_duration_secs
484        time_total = self.run_duration_secs
485
486        if self.nb_tries == 0:
487            details = "pending"
488        elif self.done:
489            if self.nb_tries == 1:
490                details = f"done - {size:.01f}mb in {time_request:.01f}s"
491            else:
492                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
493        else:
494            t = "try" if self.nb_tries == 1 else "tries"
495            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"
496
497        return f"{query} ({details})"
498
499    def __repr__(self) -> str:
500        cls_name = type(self).__name__
501
502        details = {
503            "kwargs": self._kwargs,
504            "done": self.done,
505        }
506
507        if self.nb_tries == 0 or self.error:
508            details["tries"] = self.nb_tries
509
510        if self.error:
511            details["error"] = type(self.error).__name__
512
513        if self.done:
514            details["response_size"] = f"{self.response_size_mib:.02f}mb"
515
516            if not self.was_cached:
517                details["request_duration"] = f"{self.request_duration_secs:.02f}s"
518
519        if self.nb_tries > 0:
520            details["run_duration"] = f"{self.run_duration_secs:.02f}s"
521
522        details_str = ", ".join((f"{k}={v!r}" for k, v in details.items()))
523
524        return f"{cls_name}({details_str})"
525
526    def _begin_try(self) -> None:
527        """First thing to call when starting the next try, after invoking the query runner."""
528        if self._time_start is None:
529            self._time_start = Instant.now()
530
531        self._time_start_try = Instant.now()
532        self._time_start_req = None
533        self._time_end_try = None
534
535    def _begin_request(self) -> None:
536        """Call before making the API call of a try, after waiting for cooldown."""
537        self._time_start_req = Instant.now()
538
539    def _succeed_try(self, response: dict, response_bytes: int) -> None:
540        """Call when the API call of a try was successful."""
541        self._time_end_try = Instant.now()
542        self._response = response
543        self._response_bytes = response_bytes
544        self._error = None
545
546    def _fail_try(self, err: ClientError) -> None:
547        """Call when the API call of a try failed."""
548        self._error = err
549
550        if is_exceeding_timeout(err):
551            self._max_timed_out_after_secs = err.timed_out_after_secs
552
553    def _end_try(self) -> None:
554        """Final call in a try."""
555        self._nb_tries += 1
556
557
558@dataclass(kw_only=True, slots=True)
559class RequestTimeout:
560    """
561    Request timeout settings.
562
563    Attributes:
564        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
565                                  setting is used as timeout duration of the entire request,
566                                  including connection establishment, request sending and response
567                                  reading (``aiohttp.ClientTimeout.total``).
568                                  Defaults to 20 seconds.
569        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
570                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
571        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
572                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
573    """
574
575    total_without_query_secs: float | None = 20.0
576    sock_connect_secs: float | None = None
577    each_sock_read_secs: float | None = None
578
579    def __post_init__(self) -> None:
580        if self.total_without_query_secs is not None and (
581            not math.isfinite(self.total_without_query_secs) or self.total_without_query_secs <= 0.0
582        ):
583            msg = "'total_without_query_secs' must be finite > 0"
584            raise ValueError(msg)
585
586        if self.sock_connect_secs is not None and (
587            not math.isfinite(self.sock_connect_secs) or self.sock_connect_secs <= 0.0
588        ):
589            msg = "'sock_connect_secs' must be finite > 0"
590            raise ValueError(msg)
591
592        if self.each_sock_read_secs is not None and (
593            not math.isfinite(self.each_sock_read_secs) or self.each_sock_read_secs <= 0.0
594        ):
595            msg = "'each_sock_read_secs' must be finite > 0"
596            raise ValueError(msg)
597
598
class QueryRunner(ABC):
    """
    A query runner is an async function that is called before a client makes an API request.

    Query runners can be used to…
     - …retry queries when they fail
     - …modify queries, f.e. to lower/increase their maxsize/timeout
     - …log query results & errors
     - …implement caching

    The absolute minimum a query runner function has to do is to simply return to (re)try
    a query, or to raise ``query.error`` to stop trying.
    """

    # No instance state of its own; subclasses declare their own slots.
    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""
618
619
class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
     - …retries with an increasing back-off period in between tries if the server is too busy
     - …retries and doubles timeout and maxsize settings if they were exceeded
     - …limits the number of tries
     - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Amount of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)
    """

    __slots__ = (
        "_cache_ttl_secs",
        "_max_tries",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        # Caching is enabled by a positive TTL, but the FORCE_DISABLE_CACHE
        # environment switch overrides it (logged at debug level only).
        if self._cache_ttl_secs and FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    @staticmethod
    def _cache_file_name(query: Query) -> str:
        # One file per unique query code; settings are excluded from the key
        # (see Query.cache_key), so timeout/maxsize changes reuse the same file.
        return f"{query.cache_key}.json"

    @staticmethod
    def _cache_file_path(query: Query) -> Path:
        return Path(tempfile.gettempdir()) / DefaultQueryRunner._cache_file_name(query)

    @staticmethod
    def _cache_read(query: Query) -> None:
        """Restore a cached response into ``query`` if a fresh cache file exists."""
        logger = query.logger

        now = int(time.time())

        file_path = DefaultQueryRunner._cache_file_path(query)

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            with Path(file_path).open(encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            # Best-effort cache: an unreadable or corrupt file is just a miss.
            logger.exception(f"failed to read cached {query}")
            return

        # A missing expiration stamp counts as expired (treated as a miss).
        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """Write the query's response to a temp file, stamped with an expiration time."""
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        # NOTE(review): the expiration stamp is stored inside the response dict
        # itself, so the extra key also becomes visible via Query.response —
        # confirm callers tolerate this.
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_path = DefaultQueryRunner._cache_file_path(query)

        logger.debug(f"caching at {file_path}…")

        try:
            with Path(file_path).open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            # Best-effort cache: failure to persist is logged, never raised.
            logger.exception(f"failed to cache {query}")

    @staticmethod
    def cache_delete(query: Query) -> None:
        """Clear a cached response for the given query by removing the file on disk."""
        file_path = DefaultQueryRunner._cache_file_path(query)
        file_path.unlink(missing_ok=True)

    @staticmethod
    def _cache_expire(query: Query) -> None:
        """
        Clear a cached response for the given query by marking it expired in the file on disk.

        This should only be used for testing.
        """
        file_path = DefaultQueryRunner._cache_file_path(query)

        with Path(file_path).open(encoding="utf-8") as file:
            response = json.load(file)

        response[_EXPIRATION_KEY] = 0

        with Path(file_path).open(mode="w", encoding="utf-8") as file:
            json.dump(response, file)

    async def __call__(self, query: Query) -> None:  # noqa: C901
        """Called with the current query state before the client makes an API request."""
        logger = query.logger

        # Check cache ahead of first try (file I/O is offloaded to a thread
        # so the event loop is not blocked).
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(DefaultQueryRunner._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

        if is_server_error(err):
            logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, when a retry would not change the result,
        # or when the timeout was reached.
        if err and (query.nb_tries == self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")
790
791
792def _fibo_backoff_secs(tries: int) -> float:
793    """Fibonacci sequence without zero: 1, 1, 2, 3, 5, 8, etc."""
794    a, b = 1.0, 1.0
795
796    for _ in range(tries):
797        a, b = b, a + b
798
799    return a
800
801
# Key injected into cached response dicts to record the Unix timestamp (in
# seconds) after which the cached entry is considered stale.
_EXPIRATION_KEY: Final[str] = "__expiration__"
class Query:
    """
    State of a query that is either pending, running, successful, or failed.

    Args:
        input_code: The input Overpass QL code. Note that some settings might be changed
                    by query runners, notably the 'timeout' and 'maxsize' settings.
        logger: The logger to use for all logging output related to this query.
        **kwargs: Additional keyword arguments that can be used to identify queries.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
    """

    __slots__ = (
        "_error",
        "_input_code",
        "_kwargs",
        "_logger",
        "_max_timed_out_after_secs",
        "_nb_tries",
        "_request_timeout",
        "_response",
        "_response_bytes",
        "_run_lock",
        "_run_timeout_secs",
        "_settings",
        "_time_end_try",
        "_time_start",
        "_time_start_req",
        "_time_start_try",
    )

    def __init__(
        self,
        input_code: str,
        logger: logging.Logger = _NULL_LOGGER,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        self._run_lock: Final[threading.Lock] = threading.Lock()
        """a lock used to ensure a query cannot be run more than once at the same time"""

        self._input_code: Final[str] = input_code
        """the original given overpass ql code"""

        self._logger: Final[logging.Logger] = logger
        """logger to use for this query"""

        self._kwargs: Final[dict] = kwargs
        """used to identify this query"""

        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        if "out" in self._settings and self._settings["out"] != "json":
            msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted"
            raise ValueError(msg)

        self._settings["out"] = "json"

        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
        elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0:
            msg = "the '[maxsize:*]' setting must be an integer > 0"
            raise ValueError(msg)

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
        elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0:
            msg = "the '[timeout:*]' setting must be an integer > 0"
            raise ValueError(msg)

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: Instant | None = None
        """time the most recent try finished"""

        self._max_timed_out_after_secs: int | None = None
        """maximum of seconds after which the query was cancelled"""

    def reset(self) -> None:
        """Reset the query to its initial state, ignoring previous tries."""
        # Re-run __init__ on this same instance; with __slots__ this simply
        # re-assigns every attribute back to its initial value.
        Query.__init__(
            self,
            input_code=self._input_code,
            logger=self._logger,
            **self._kwargs,
        )

    @property
    def input_code(self) -> str:
        """The original input Overpass QL source code."""
        return self._input_code

    @property
    def kwargs(self) -> dict:
        """
        Keyword arguments that can be used to identify queries.

        The default query runner will log these values when a query is run.
        """
        return self._kwargs

    @property
    def logger(self) -> logging.Logger:
        """The logger used for logging output related to this query."""
        return self._logger

    @property
    def nb_tries(self) -> int:
        """Current number of tries."""
        return self._nb_tries

    @property
    def error(self) -> ClientError | None:
        """
        Error of the most recent try.

        Returns:
            an error or ``None`` if the query wasn't tried or hasn't failed
        """
        return self._error

    @property
    def response(self) -> dict | None:
        """
        The entire JSON response of the query.

        Returns:
            the response, or ``None`` if the query has not successfully finished (yet)
        """
        return self._response

    @property
    def was_cached(self) -> bool | None:
        """
        Indicates whether the query result was cached.

        Returns:
            ``None`` if the query has not been run yet.
            ``True`` if the query has a result set with zero tries.
            ``False`` otherwise.
        """
        if self._response is None:
            return None
        # A result that arrived without a single try can only have come from a cache.
        return self._nb_tries == 0

    @property
    def result_set(self) -> list[dict] | None:
        """
        The result set of the query.

        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
        You are free to copy, distribute, transmit and adapt this data, as long as you credit
        OpenStreetMap and its contributors. If you alter or build upon this data, you may
        distribute the result only under the same licence.

        Returns:
            the elements of the result set, or ``None`` if the query has not successfully
            finished (yet)

        References:
            - https://www.openstreetmap.org/copyright
            - https://opendatacommons.org/licenses/odbl/1-0/
        """
        if not self._response:
            return None
        return self._response["elements"]

    @property
    def response_size_mib(self) -> float | None:
        """
        The size of the response in mebibytes.

        Returns:
            the size, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None
        return self._response_bytes / 1024.0 / 1024.0

    @property
    def maxsize_mib(self) -> float:
        """
        The current value of the [maxsize:*] setting in mebibytes.

        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
        as expected by the user. If the query needs more RAM than this value, the server may abort
        the query with a memory exhaustion. The higher this size, the more probably the server
        rejects the query before executing it.
        """
        # True division: floor division ("//") would silently truncate fractional
        # mebibytes (e.g. a value set via "maxsize_mib = 1.5" would read back as 1.0),
        # which breaks round-tripping with the setter below.
        return float(self._settings["maxsize"]) / 1024.0 / 1024.0

    @maxsize_mib.setter
    def maxsize_mib(self, value: float) -> None:
        if not math.isfinite(value) or value <= 0.0:
            msg = "'maxsize_mib' must be finite > 0"
            raise ValueError(msg)
        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)

    @property
    def timeout_secs(self) -> int:
        """
        The current value of the [timeout:*] setting in seconds.

        This duration is the maximum allowed runtime for the query in seconds, as expected by the
        user. If the query runs longer than this time, the server may abort the query. The higher
        this duration, the more probably the server rejects the query before executing it.
        """
        return int(self._settings["timeout"])

    @timeout_secs.setter
    def timeout_secs(self, value: int) -> None:
        if value < 1:
            msg = "timeout_secs must be >= 1"
            raise ValueError(msg)
        self._settings["timeout"] = value

    @property
    def run_timeout_secs(self) -> float | None:
        """
        A limit to ``run_duration_secs``, that cancels running the query when exceeded.

        Defaults to no timeout.

        The client will raise a ``GiveupError`` if the timeout is reached.

        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
        that limits a single query execution time. Instead, this value can be used to limit the
        total client-side time spent on this query (see ``Client.run_query``).
        """
        return self._run_timeout_secs

    @run_timeout_secs.setter
    def run_timeout_secs(self, value: float | None) -> None:
        if value is not None and (not math.isfinite(value) or value <= 0.0):
            msg = "'run_timeout_secs' must be finite > 0"
            raise ValueError(msg)
        self._run_timeout_secs = value

    @property
    def run_timeout_elapsed(self) -> bool:
        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
        return (
            self.run_timeout_secs is not None
            and self.run_duration_secs is not None
            and self.run_timeout_secs < self.run_duration_secs
        )

    @property
    def request_timeout(self) -> "RequestTimeout":
        """Request timeout settings for this query."""
        return self._request_timeout

    @request_timeout.setter
    def request_timeout(self, value: "RequestTimeout") -> None:
        self._request_timeout = value

    def _code(self, next_timeout_secs_used: int) -> str:
        """The query's QL code, substituting the [timeout:*] setting with the given duration."""
        settings_copy = self._settings.copy()
        settings_copy["timeout"] = next_timeout_secs_used

        # remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)

        # put the adjusted settings in front
        settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";"

        return f"{settings}\n{code}"

    @property
    def cache_key(self) -> str:
        """
        Hash QL code, and return its digest as hexadecimal string.

        The default query runner uses this as cache key.
        """
        # Remove the original settings statement, so that changing only the
        # settings (e.g. a different timeout) still hits the same cache entry.
        code = _SETTING_PATTERN.sub("", self._input_code)
        hasher = hashlib.blake2b(digest_size=8)
        hasher.update(code.encode("utf-8"))
        return hasher.hexdigest()

    @property
    def done(self) -> bool:
        """Returns ``True`` if the result set was received."""
        return self._response is not None

    @property
    def request_duration_secs(self) -> float | None:
        """
        How long it took to fetch the result set in seconds.

        This is the duration starting with the API request, and ending once
        the result is written to this query object. Although it depends on how busy
        the API instance is, this can give some indication of how long a query takes.

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._response is None or self.was_cached:
            return None

        # Both timestamps are set on the successful (non-cached) code path.
        assert self._time_end_try is not None
        assert self._time_start_req is not None

        return self._time_end_try - self._time_start_req

    @property
    def run_duration_secs(self) -> float | None:
        """
        The total required time for this query in seconds (so far).

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._time_start is None:
            return None

        if self._time_end_try:
            return self._time_end_try - self._time_start

        # Still running: report the elapsed time so far.
        return self._time_start.elapsed_secs_since

    @property
    def _run_duration_left_secs(self) -> float | None:
        """If a limit was set, returns the seconds until the time to run the query has elapsed."""
        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
            return max(0, math.ceil(time_max - time_so_far))
        return None

    @property
    def api_version(self) -> str | None:
        """
        The Overpass API version used by the queried instance.

        Returns:
            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
            has not successfully finished (yet)

        References:
            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
        """
        if self._response is None:
            return None

        return self._response["generator"]

    @property
    def timestamp_osm(self) -> datetime | None:
        """
        All OSM edits that have been uploaded before this date are included.

        It can take a couple of minutes for changes to the database to show up in the
        Overpass API query results.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"]["timestamp_osm_base"]
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)

    @property
    def timestamp_areas(self) -> datetime | None:
        """
        All area data edits that have been uploaded before this date are included.

        If the query involves area data processing, this is the date of the latest edit
        that has been considered in the most recent batch run of the area generation.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet), or
            if it does not involve area data processing.
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"].get("timestamp_areas_base")
        if not date_str:
            return None

        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)

    @property
    def copyright(self) -> str:
        """A copyright notice that comes with the result set."""
        if self._response is None:
            return _COPYRIGHT

        return self._response["osm3s"].get("copyright") or _COPYRIGHT

    def __str__(self) -> str:
        query = f"query{self.kwargs!r}"

        size = self.response_size_mib
        time_request = self.request_duration_secs
        time_total = self.run_duration_secs

        if self.nb_tries == 0:
            details = "pending"
        elif self.done:
            if self.nb_tries == 1:
                details = f"done - {size:.01f}mb in {time_request:.01f}s"
            else:
                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
        else:
            t = "try" if self.nb_tries == 1 else "tries"
            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"

        return f"{query} ({details})"

    def __repr__(self) -> str:
        cls_name = type(self).__name__

        details = {
            "kwargs": self._kwargs,
            "done": self.done,
        }

        if self.nb_tries == 0 or self.error:
            details["tries"] = self.nb_tries

        if self.error:
            details["error"] = type(self.error).__name__

        if self.done:
            details["response_size"] = f"{self.response_size_mib:.02f}mb"

            if not self.was_cached:
                details["request_duration"] = f"{self.request_duration_secs:.02f}s"

        if self.nb_tries > 0:
            details["run_duration"] = f"{self.run_duration_secs:.02f}s"

        details_str = ", ".join((f"{k}={v!r}" for k, v in details.items()))

        return f"{cls_name}({details_str})"

    def _begin_try(self) -> None:
        """First thing to call when starting the next try, after invoking the query runner."""
        if self._time_start is None:
            self._time_start = Instant.now()

        self._time_start_try = Instant.now()
        self._time_start_req = None
        self._time_end_try = None

    def _begin_request(self) -> None:
        """Call before making the API call of a try, after waiting for cooldown."""
        self._time_start_req = Instant.now()

    def _succeed_try(self, response: dict, response_bytes: int) -> None:
        """Call when the API call of a try was successful."""
        self._time_end_try = Instant.now()
        self._response = response
        self._response_bytes = response_bytes
        self._error = None

    def _fail_try(self, err: ClientError) -> None:
        """Call when the API call of a try failed."""
        self._error = err

        # Remember the largest server-side timeout we ran into, so runners can react.
        if is_exceeding_timeout(err):
            self._max_timed_out_after_secs = err.timed_out_after_secs

    def _end_try(self) -> None:
        """Final call in a try."""
        self._nb_tries += 1

State of a query that is either pending, running, successful, or failed.

Arguments:
  • input_code: The input Overpass QL code. Note that some settings might be changed by query runners, notably the 'timeout' and 'maxsize' settings.
  • logger: The logger to use for all logging output related to this query.
  • **kwargs: Additional keyword arguments that can be used to identify queries.
References:
  • https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
Query( input_code: str, logger: logging.Logger = <RootLogger root (WARNING)>, **kwargs: Any)
 93    def __init__(
 94        self,
 95        input_code: str,
 96        logger: logging.Logger = _NULL_LOGGER,
 97        **kwargs: Any,  # noqa: ANN401
 98    ) -> None:
 99        self._run_lock: Final[threading.Lock] = threading.Lock()
100        """a lock used to ensure a query cannot be run more than once at the same time"""
101
102        self._input_code: Final[str] = input_code
103        """the original given overpass ql code"""
104
105        self._logger: Final[logging.Logger] = logger
106        """logger to use for this query"""
107
108        self._kwargs: Final[dict] = kwargs
109        """used to identify this query"""
110
111        self._settings = dict(_SETTING_PATTERN.findall(input_code))
112        """all overpass ql settings [k:v];"""
113
114        if "out" in self._settings and self._settings["out"] != "json":
115            msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted"
116            raise ValueError(msg)
117
118        self._settings["out"] = "json"
119
120        if "maxsize" not in self._settings:
121            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
122        elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0:
123            msg = "the '[maxsize:*]' setting must be an integer > 0"
124            raise ValueError(msg)
125
126        if "timeout" not in self._settings:
127            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
128        elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0:
129            msg = "the '[timeout:*]' setting must be an integer > 0"
130            raise ValueError(msg)
131
132        self._run_timeout_secs: float | None = None
133        """total time limit for running this query"""
134
135        self._request_timeout: RequestTimeout = RequestTimeout()
136        """config for request timeouts"""
137
138        self._error: ClientError | None = None
139        """error of the last try, or None"""
140
141        self._response: dict | None = None
142        """response JSON as a dict, or None"""
143
144        self._response_bytes = 0.0
145        """number of bytes in a response, or zero"""
146
147        self._nb_tries = 0
148        """number of tries so far, starting at zero"""
149
150        self._time_start: Instant | None = None
151        """time prior to executing the first try"""
152
153        self._time_start_try: Instant | None = None
154        """time prior to executing the most recent try"""
155
156        self._time_start_req: Instant | None = None
157        """time prior to executing the most recent try's query request"""
158
159        self._time_end_try: Instant | None = None
160        """time the most recent try finished"""
161
162        self._max_timed_out_after_secs: int | None = None
163        """maximum of seconds after which the query was cancelled"""
def reset(self) -> None:
165    def reset(self) -> None:
166        """Reset the query to its initial state, ignoring previous tries."""
167        Query.__init__(
168            self,
169            input_code=self._input_code,
170            logger=self._logger,
171            **self._kwargs,
172        )

Reset the query to its initial state, ignoring previous tries.

input_code: str
174    @property
175    def input_code(self) -> str:
176        """The original input Overpass QL source code."""
177        return self._input_code

The original input Overpass QL source code.

kwargs: dict
179    @property
180    def kwargs(self) -> dict:
181        """
182        Keyword arguments that can be used to identify queries.
183
184        The default query runner will log these values when a query is run.
185        """
186        return self._kwargs

Keyword arguments that can be used to identify queries.

The default query runner will log these values when a query is run.

logger: logging.Logger
188    @property
189    def logger(self) -> logging.Logger:
190        """The logger used for logging output related to this query."""
191        return self._logger

The logger used for logging output related to this query.

nb_tries: int
193    @property
194    def nb_tries(self) -> int:
195        """Current number of tries."""
196        return self._nb_tries

Current number of tries.

error: aio_overpass.ClientError | None
198    @property
199    def error(self) -> ClientError | None:
200        """
201        Error of the most recent try.
202
203        Returns:
204            an error or ``None`` if the query wasn't tried or hasn't failed
205        """
206        return self._error

Error of the most recent try.

Returns:

an error or None if the query wasn't tried or hasn't failed

response: dict | None
208    @property
209    def response(self) -> dict | None:
210        """
211        The entire JSON response of the query.
212
213        Returns:
214            the response, or ``None`` if the query has not successfully finished (yet)
215        """
216        return self._response

The entire JSON response of the query.

Returns:

the response, or None if the query has not successfully finished (yet)

was_cached: bool | None
218    @property
219    def was_cached(self) -> bool | None:
220        """
221        Indicates whether the query result was cached.
222
223        Returns:
224            ``None`` if the query has not been run yet.
225            ``True`` if the query has a result set with zero tries.
226            ``False`` otherwise.
227        """
228        if self._response is None:
229            return None
230        return self._nb_tries == 0

Indicates whether the query result was cached.

Returns:

None if the query has not been run yet. True if the query has a result set with zero tries. False otherwise.

result_set: list[dict] | None
232    @property
233    def result_set(self) -> list[dict] | None:
234        """
235        The result set of the query.
236
237        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
238        You are free to copy, distribute, transmit and adapt this data, as long as you credit
239        OpenStreetMap and its contributors. If you alter or build upon this data, you may
240        distribute the result only under the same licence.
241
242        Returns:
243            the elements of the result set, or ``None`` if the query has not successfully
244            finished (yet)
245
246        References:
247            - https://www.openstreetmap.org/copyright
248            - https://opendatacommons.org/licenses/odbl/1-0/
249        """
250        if not self._response:
251            return None
252        return self._response["elements"]

The result set of the query.

This is open data, licensed under the Open Data Commons Open Database License (ODbL). You are free to copy, distribute, transmit and adapt this data, as long as you credit OpenStreetMap and its contributors. If you alter or build upon this data, you may distribute the result only under the same licence.

Returns:

the elements of the result set, or None if the query has not successfully finished (yet)

References:
  • https://www.openstreetmap.org/copyright
  • https://opendatacommons.org/licenses/odbl/1-0/
response_size_mib: float | None
254    @property
255    def response_size_mib(self) -> float | None:
256        """
257        The size of the response in mebibytes.
258
259        Returns:
260            the size, or ``None`` if the query has not successfully finished (yet)
261        """
262        if self._response is None:
263            return None
264        return self._response_bytes / 1024.0 / 1024.0

The size of the response in mebibytes.

Returns:

the size, or None if the query has not successfully finished (yet)

maxsize_mib: float
266    @property
267    def maxsize_mib(self) -> float:
268        """
269        The current value of the [maxsize:*] setting in mebibytes.
270
271        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
272        as expected by the user. If the query needs more RAM than this value, the server may abort
273        the query with a memory exhaustion. The higher this size, the more probably the server
274        rejects the query before executing it.
275        """
276        return float(self._settings["maxsize"]) // 1024.0 // 1024.0

The current value of the [maxsize:*] setting in mebibytes.

This size indicates the maximum allowed memory for the query in bytes RAM on the server, as expected by the user. If the query needs more RAM than this value, the server may abort the query with a memory exhaustion. The higher this size, the more probably the server rejects the query before executing it.

timeout_secs: int
285    @property
286    def timeout_secs(self) -> int:
287        """
288        The current value of the [timeout:*] setting in seconds.
289
290        This duration is the maximum allowed runtime for the query in seconds, as expected by the
291        user. If the query runs longer than this time, the server may abort the query. The higher
292        this duration, the more probably the server rejects the query before executing it.
293        """
294        return int(self._settings["timeout"])

The current value of the [timeout:*] setting in seconds.

This duration is the maximum allowed runtime for the query in seconds, as expected by the user. If the query runs longer than this time, the server may abort the query. The higher this duration, the more probably the server rejects the query before executing it.

run_timeout_secs: float | None
303    @property
304    def run_timeout_secs(self) -> float | None:
305        """
306        A limit to ``run_duration_secs``, that cancels running the query when exceeded.
307
308        Defaults to no timeout.
309
310        The client will raise a ``GiveupError`` if the timeout is reached.
311
312        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
313        that limits a single query execution time. Instead, this value can be used to limit the
314        total client-side time spent on this query (see ``Client.run_query``).
315        """
316        return self._run_timeout_secs

A limit to run_duration_secs, that cancels running the query when exceeded.

Defaults to no timeout.

The client will raise a GiveupError if the timeout is reached.

Not to be confused with timeout_secs, which is a setting for the Overpass API instance, that limits a single query execution time. Instead, this value can be used to limit the total client-side time spent on this query (see Client.run_query).

run_timeout_elapsed: bool
325    @property
326    def run_timeout_elapsed(self) -> bool:
327        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
328        return (
329            self.run_timeout_secs is not None
330            and self.run_duration_secs is not None
331            and self.run_timeout_secs < self.run_duration_secs
332        )

Returns True if run_timeout_secs is set and has elapsed.

request_timeout: RequestTimeout
334    @property
335    def request_timeout(self) -> "RequestTimeout":
336        """Request timeout settings for this query."""
337        return self._request_timeout

Request timeout settings for this query.

cache_key: str
356    @property
357    def cache_key(self) -> str:
358        """
359        Hash QL code, and return its digest as hexadecimal string.
360
361        The default query runner uses this as cache key.
362        """
363        # Remove the original settings statement
364        code = _SETTING_PATTERN.sub("", self._input_code)
365        hasher = hashlib.blake2b(digest_size=8)
366        hasher.update(code.encode("utf-8"))
367        return hasher.hexdigest()

Hash QL code, and return its digest as hexadecimal string.

The default query runner uses this as cache key.

done: bool
369    @property
370    def done(self) -> bool:
371        """Returns ``True`` if the result set was received."""
372        return self._response is not None

Returns True if the result set was received.

request_duration_secs: float | None
374    @property
375    def request_duration_secs(self) -> float | None:
376        """
377        How long it took to fetch the result set in seconds.
378
379        This is the duration starting with the API request, and ending once
380        the result is written to this query object. Although it depends on how busy
381        the API instance is, this can give some indication of how long a query takes.
382
383        Returns:
384            the duration or ``None`` if there is no result set yet, or when it was cached.
385        """
386        if self._response is None or self.was_cached:
387            return None
388
389        assert self._time_end_try is not None
390        assert self._time_start_req is not None
391
392        return self._time_end_try - self._time_start_req

How long it took to fetch the result set in seconds.

This is the duration starting with the API request, and ending once the result is written to this query object. Although it depends on how busy the API instance is, this can give some indication of how long a query takes.

Returns:

the duration or None if there is no result set yet, or when it was cached.

run_duration_secs: float | None
394    @property
395    def run_duration_secs(self) -> float | None:
396        """
397        The total required time for this query in seconds (so far).
398
399        Returns:
400            the duration or ``None`` if there is no result set yet, or when it was cached.
401        """
402        if self._time_start is None:
403            return None
404
405        if self._time_end_try:
406            return self._time_end_try - self._time_start
407
408        return self._time_start.elapsed_secs_since

The total required time for this query in seconds (so far).

Returns:

the duration or None if there is no result set yet, or when it was cached.

api_version: str | None
417    @property
418    def api_version(self) -> str | None:
419        """
420        The Overpass API version used by the queried instance.
421
422        Returns:
423            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
424            has not successfully finished (yet)
425
426        References:
427            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
428        """
429        if self._response is None:
430            return None
431
432        return self._response["generator"]

The Overpass API version used by the queried instance.

Returns:

f.e. "Overpass API 0.7.56.8 7d656e78", or None if the query has not successfully finished (yet)

References:
  • https://wiki.openstreetmap.org/wiki/Overpass_API/versions
timestamp_osm: datetime.datetime | None
434    @property
435    def timestamp_osm(self) -> datetime | None:
436        """
437        All OSM edits that have been uploaded before this date are included.
438
439        It can take a couple of minutes for changes to the database to show up in the
440        Overpass API query results.
441
442        Returns:
443            the timestamp, or ``None`` if the query has not successfully finished (yet)
444        """
445        if self._response is None:
446            return None
447
448        date_str = self._response["osm3s"]["timestamp_osm_base"]
449        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)

All OSM edits that have been uploaded before this date are included.

It can take a couple of minutes for changes to the database to show up in the Overpass API query results.

Returns:

the timestamp, or None if the query has not successfully finished (yet)

timestamp_areas: datetime.datetime | None
451    @property
452    def timestamp_areas(self) -> datetime | None:
453        """
454        All area data edits that have been uploaded before this date are included.
455
456        If the query involves area data processing, this is the date of the latest edit
457        that has been considered in the most recent batch run of the area generation.
458
459        Returns:
460            the timestamp, or ``None`` if the query has not successfully finished (yet), or
461            if it does not involve area data processing.
462        """
463        if self._response is None:
464            return None
465
466        date_str = self._response["osm3s"].get("timestamp_areas_base")
467        if not date_str:
468            return None
469
470        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)

All area data edits that have been uploaded before this date are included.

If the query involves area data processing, this is the date of the latest edit that has been considered in the most recent batch run of the area generation.

Returns:

the timestamp, or None if the query has not successfully finished (yet), or if it does not involve area data processing.

copyright: str
@property
def copyright(self) -> str:
    """A copyright notice that comes with the result set."""
    if self._response is not None:
        notice = self._response["osm3s"].get("copyright")
        if notice:
            return notice
    # Fall back to the standard ODbL notice.
    return _COPYRIGHT

A copyright notice that comes with the result set.

class QueryRunner(abc.ABC):
class QueryRunner(ABC):
    """
    An async callable that is invoked before a client makes each API request.

    Use cases for query runners include…
     - …retrying queries when they fail
     - …modifying queries, f.e. to lower/increase their maxsize/timeout
     - …logging query results & errors
     - …implementing caching

    At the bare minimum, a query runner function simply returns to (re)try
    a query, or raises ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""

A query runner is an async function that is called before a client makes an API request.

Query runners can be used to…

  • …retry queries when they fail
  • …modify queries, f.e. to lower/increase their maxsize/timeout
  • …log query results & errors
  • …implement caching

The absolute minimum a query runner function has to do is to simply return to (re)try a query, or to raise query.error to stop trying.

class DefaultQueryRunner(QueryRunner):
class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
     - …retries with an increasing back-off period in between tries if the server is too busy
     - …retries and doubles timeout and maxsize settings if they were exceeded
     - …limits the number of tries
     - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Amount of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)
    """

    __slots__ = (
        "_cache_ttl_secs",
        "_max_tries",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        """``True`` if caching is enabled and not force-disabled via the environment."""
        if self._cache_ttl_secs and FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    @staticmethod
    def _cache_file_name(query: Query) -> str:
        """Name of the query's cache file, derived from its cache key."""
        return f"{query.cache_key}.json"

    @staticmethod
    def _cache_file_path(query: Query) -> Path:
        """Full path of the query's cache file inside the system temp directory."""
        return Path(tempfile.gettempdir()) / DefaultQueryRunner._cache_file_name(query)

    @staticmethod
    def _cache_read(query: Query) -> None:
        """
        Restore a cached result set into ``query`` if a fresh one exists on disk.

        Best-effort: missing, unreadable, or expired cache files are logged and ignored.
        """
        logger = query.logger

        now = int(time.time())

        file_path = DefaultQueryRunner._cache_file_path(query)

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            # ``file_path`` already is a ``Path``; no need to re-wrap it.
            with file_path.open(encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            logger.exception(f"failed to read cached {query}")
            return

        # The expiration timestamp was stamped into the response by ``_cache_write()``.
        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """
        Write the query's result set to its cache file, stamped with an expiration time.

        Best-effort: write errors are logged and ignored.
        """
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_path = DefaultQueryRunner._cache_file_path(query)

        logger.debug(f"caching at {file_path}…")

        try:
            # ``file_path`` already is a ``Path``; no need to re-wrap it.
            with file_path.open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            logger.exception(f"failed to cache {query}")

    @staticmethod
    def cache_delete(query: Query) -> None:
        """Clear a cached response for the given query by removing the file on disk."""
        file_path = DefaultQueryRunner._cache_file_path(query)
        file_path.unlink(missing_ok=True)

    @staticmethod
    def _cache_expire(query: Query) -> None:
        """
        Clear a cached response for the given query by marking it expired in the file on disk.

        This should only be used for testing.
        """
        file_path = DefaultQueryRunner._cache_file_path(query)

        with file_path.open(encoding="utf-8") as file:
            response = json.load(file)

        response[_EXPIRATION_KEY] = 0

        with file_path.open(mode="w", encoding="utf-8") as file:
            json.dump(response, file)

    async def __call__(self, query: Query) -> None:  # noqa: C901
        """Called with the current query state before the client makes an API request."""
        logger = query.logger

        # Check cache ahead of first try
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(DefaultQueryRunner._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

        if is_server_error(err):
            logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, when a retry would not change the result,
        # or when the timeout was reached.
        if err and (query.nb_tries == self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")

The default query runner.

This runner…

  • …retries with an increasing back-off period in between tries if the server is too busy
  • …retries and doubles timeout and maxsize settings if they were exceeded
  • …limits the number of tries
  • …optionally caches query results in temp files

This runner does not lower timeout and maxsize settings if the server rejected a query.

Arguments:
  • max_tries: The maximum number of times a query is tried. (5 by default)
  • cache_ttl_secs: Amount of seconds a query's result set is cached for. Set to zero to disable caching. (zero by default)
DefaultQueryRunner(max_tries: int = 5, cache_ttl_secs: int = 0)
644    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
645        if max_tries < 1:
646            msg = "max_tries must be >= 1"
647            raise ValueError(msg)
648
649        if cache_ttl_secs < 0:
650            msg = "cache_ttl_secs must be >= 0"
651            raise ValueError(msg)
652
653        self._max_tries = max_tries
654        self._cache_ttl_secs = cache_ttl_secs
@staticmethod
def cache_delete(query: Query) -> None:
715    @staticmethod
716    def cache_delete(query: Query) -> None:
717        """Clear a cached response for the given query by removing the file on disk."""
718        file_path = DefaultQueryRunner._cache_file_path(query)
719        file_path.unlink(missing_ok=True)

Clear a cached response for the given query by removing the file on disk.

@dataclass(kw_only=True, slots=True)
class RequestTimeout:
559@dataclass(kw_only=True, slots=True)
560class RequestTimeout:
561    """
562    Request timeout settings.
563
564    Attributes:
565        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
566                                  setting is used as timeout duration of the entire request,
567                                  including connection establishment, request sending and response
568                                  reading (``aiohttp.ClientTimeout.total``).
569                                  Defaults to 20 seconds.
570        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
571                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
572        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
573                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
574    """
575
576    total_without_query_secs: float | None = 20.0
577    sock_connect_secs: float | None = None
578    each_sock_read_secs: float | None = None
579
580    def __post_init__(self) -> None:
581        if self.total_without_query_secs is not None and (
582            not math.isfinite(self.total_without_query_secs) or self.total_without_query_secs <= 0.0
583        ):
584            msg = "'total_without_query_secs' must be finite > 0"
585            raise ValueError(msg)
586
587        if self.sock_connect_secs is not None and (
588            not math.isfinite(self.sock_connect_secs) or self.sock_connect_secs <= 0.0
589        ):
590            msg = "'sock_connect_secs' must be finite > 0"
591            raise ValueError(msg)
592
593        if self.each_sock_read_secs is not None and (
594            not math.isfinite(self.each_sock_read_secs) or self.each_sock_read_secs <= 0.0
595        ):
596            msg = "'each_sock_read_secs' must be finite > 0"
597            raise ValueError(msg)

Request timeout settings.

Attributes:
  • total_without_query_secs: If set, the sum of this duration and the query's [timeout:*] setting is used as timeout duration of the entire request, including connection establishment, request sending and response reading (aiohttp.ClientTimeout.total). Defaults to 20 seconds.
  • sock_connect_secs: The maximum number of seconds allowed for pure socket connection establishment (same as aiohttp.ClientTimeout.sock_connect).
  • each_sock_read_secs: The maximum number of seconds allowed for the period between reading a new chunk of data (same as aiohttp.ClientTimeout.sock_read).
RequestTimeout(*, total_without_query_secs: float | None = 20.0, sock_connect_secs: float | None = None, each_sock_read_secs: float | None = None)
total_without_query_secs: float | None
sock_connect_secs: float | None
each_sock_read_secs: float | None
DEFAULT_MAXSIZE_MIB: Final[int] = 512

Default maxsize setting in mebibytes.

DEFAULT_TIMEOUT_SECS: Final[int] = 180

Default timeout setting in seconds.