# aio_overpass.query — "Query state and runner."
"""Query state and runner."""

import asyncio
import hashlib
import json
import logging
import math
import re
import tempfile
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Final

from aio_overpass._clock import Instant, sleep
from aio_overpass._env import FORCE_DISABLE_CACHE
from aio_overpass.error import (
    ClientError,
    QueryRejectCause,
    is_exceeding_timeout,
    is_rejection,
    is_server_error,
)


__docformat__ = "google"
__all__ = (
    "Query",
    "QueryRunner",
    "DefaultQueryRunner",
    "RequestTimeout",
    "DEFAULT_MAXSIZE_MIB",
    "DEFAULT_TIMEOUT_SECS",
)


DEFAULT_MAXSIZE_MIB: Final[int] = 512
"""Default ``maxsize`` setting in mebibytes."""

DEFAULT_TIMEOUT_SECS: Final[int] = 180
"""Default ``timeout`` setting in seconds."""

_COPYRIGHT: Final[str] = (
    "The data included in this document is from www.openstreetmap.org."
    " The data is made available under ODbL."
)
"""This is the same copyright notice included in result sets"""

_SETTING_PATTERN: Final[re.Pattern[str]] = re.compile(r"\[(\w+?):(.+?)]\s*;?")
"""A pattern to match setting declarations (not the entire settings statement)."""

# NOTE(review): logging.getLogger() with no name returns the *root* logger; adding a
# NullHandler silences it only while no root handlers are configured. Records logged
# through it will reach any root handlers a user sets up — confirm this is intended.
_NULL_LOGGER: Final[logging.Logger] = logging.getLogger()
_NULL_LOGGER.addHandler(logging.NullHandler())


class Query:
    """
    State of a query that is either pending, running, successful, or failed.

    Args:
        input_code: The input Overpass QL code. Note that some settings might be changed
            by query runners, notably the 'timeout' and 'maxsize' settings.
        logger: The logger to use for all logging output related to this query.
        **kwargs: Additional keyword arguments that can be used to identify queries.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
    """

    __slots__ = (
        "_error",
        "_input_code",
        "_kwargs",
        "_logger",
        "_max_timed_out_after_secs",
        "_nb_tries",
        "_request_timeout",
        "_response",
        "_response_bytes",
        "_run_lock",
        "_run_timeout_secs",
        "_settings",
        "_time_end_try",
        "_time_start",
        "_time_start_req",
        "_time_start_try",
    )

    def __init__(
        self,
        input_code: str,
        logger: logging.Logger = _NULL_LOGGER,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        self._run_lock: Final[threading.Lock] = threading.Lock()
        """a lock used to ensure a query cannot be run more than once at the same time"""

        self._input_code: Final[str] = input_code
        """the original given overpass ql code"""

        self._logger: Final[logging.Logger] = logger
        """logger to use for this query"""

        self._kwargs: Final[dict] = kwargs
        """used to identify this query"""

        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        if "out" in self._settings and self._settings["out"] != "json":
            msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted"
            raise ValueError(msg)

        self._settings["out"] = "json"

        # Validate user-supplied settings (strings from the regex); fill in defaults
        # (ints) when absent.
        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
        elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0:
            msg = "the '[maxsize:*]' setting must be an integer > 0"
            raise ValueError(msg)

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
        elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0:
            msg = "the '[timeout:*]' setting must be an integer > 0"
            raise ValueError(msg)

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: Instant | None = None
        """time the most recent try finished"""

        self._max_timed_out_after_secs: int | None = None
        """maximum of seconds after which the query was cancelled"""

    def reset(self) -> None:
        """Reset the query to its initial state, ignoring previous tries."""
        # Re-running __init__ restores every slot to its initial value.
        Query.__init__(
            self,
            input_code=self._input_code,
            logger=self._logger,
            **self._kwargs,
        )

    @property
    def input_code(self) -> str:
        """The original input Overpass QL source code."""
        return self._input_code

    @property
    def kwargs(self) -> dict:
        """
        Keyword arguments that can be used to identify queries.

        The default query runner will log these values when a query is run.
        """
        return self._kwargs

    @property
    def logger(self) -> logging.Logger:
        """The logger used for logging output related to this query."""
        return self._logger

    @property
    def nb_tries(self) -> int:
        """Current number of tries."""
        return self._nb_tries

    @property
    def error(self) -> ClientError | None:
        """
        Error of the most recent try.

        Returns:
            an error or ``None`` if the query wasn't tried or hasn't failed
        """
        return self._error

    @property
    def response(self) -> dict | None:
        """
        The entire JSON response of the query.

        Returns:
            the response, or ``None`` if the query has not successfully finished (yet)
        """
        return self._response

    @property
    def was_cached(self) -> bool | None:
        """
        Indicates whether the query result was cached.

        Returns:
            ``None`` if the query has not been run yet.
            ``True`` if the query has a result set with zero tries.
            ``False`` otherwise.
        """
        if self._response is None:
            return None
        return self._nb_tries == 0

    @property
    def result_set(self) -> list[dict] | None:
        """
        The result set of the query.

        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
        You are free to copy, distribute, transmit and adapt this data, as long as you credit
        OpenStreetMap and its contributors. If you alter or build upon this data, you may
        distribute the result only under the same licence.

        Returns:
            the elements of the result set, or ``None`` if the query has not successfully
            finished (yet)

        References:
            - https://www.openstreetmap.org/copyright
            - https://opendatacommons.org/licenses/odbl/1-0/
        """
        if not self._response:
            return None
        return self._response["elements"]

    @property
    def response_size_mib(self) -> float | None:
        """
        The size of the response in mebibytes.

        Returns:
            the size, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None
        return self._response_bytes / 1024.0 / 1024.0

    @property
    def maxsize_mib(self) -> float:
        """
        The current value of the [maxsize:*] setting in mebibytes.

        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
        as expected by the user. If the query needs more RAM than this value, the server may abort
        the query with a memory exhaustion. The higher this size, the more probably the server
        rejects the query before executing it.
        """
        # FIX: true division instead of floor division ("//"), which silently floored
        # the value; the setter stores int(value * 1024 * 1024), so fractional MiB
        # values would not round-trip. Mirrors response_size_mib, which uses "/".
        return float(self._settings["maxsize"]) / 1024.0 / 1024.0

    @maxsize_mib.setter
    def maxsize_mib(self, value: float) -> None:
        if not math.isfinite(value) or value <= 0.0:
            msg = "'maxsize_mib' must be finite > 0"
            raise ValueError(msg)
        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)

    @property
    def timeout_secs(self) -> int:
        """
        The current value of the [timeout:*] setting in seconds.

        This duration is the maximum allowed runtime for the query in seconds, as expected by the
        user. If the query runs longer than this time, the server may abort the query. The higher
        this duration, the more probably the server rejects the query before executing it.
        """
        return int(self._settings["timeout"])

    @timeout_secs.setter
    def timeout_secs(self, value: int) -> None:
        if value < 1:
            msg = "timeout_secs must be >= 1"
            raise ValueError(msg)
        self._settings["timeout"] = value

    @property
    def run_timeout_secs(self) -> float | None:
        """
        A limit to ``run_duration_secs``, that cancels running the query when exceeded.

        Defaults to no timeout.

        The client will raise a ``GiveupError`` if the timeout is reached.

        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
        that limits a single query execution time. Instead, this value can be used to limit the
        total client-side time spent on this query (see ``Client.run_query``).
        """
        return self._run_timeout_secs

    @run_timeout_secs.setter
    def run_timeout_secs(self, value: float | None) -> None:
        if value is not None and (not math.isfinite(value) or value <= 0.0):
            msg = "'run_timeout_secs' must be finite > 0"
            raise ValueError(msg)
        self._run_timeout_secs = value

    @property
    def run_timeout_elapsed(self) -> bool:
        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
        return (
            self.run_timeout_secs is not None
            and self.run_duration_secs is not None
            and self.run_timeout_secs < self.run_duration_secs
        )

    @property
    def request_timeout(self) -> "RequestTimeout":
        """Request timeout settings for this query."""
        return self._request_timeout

    @request_timeout.setter
    def request_timeout(self, value: "RequestTimeout") -> None:
        self._request_timeout = value

    def _code(self, next_timeout_secs_used: int) -> str:
        """The query's QL code, substituting the [timeout:*] setting with the given duration."""
        settings_copy = self._settings.copy()
        settings_copy["timeout"] = next_timeout_secs_used

        # remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)

        # put the adjusted settings in front
        settings = "".join(f"[{k}:{v}]" for k, v in settings_copy.items()) + ";"

        return f"{settings}\n{code}"

    @property
    def cache_key(self) -> str:
        """
        Hash QL code, and return its digest as hexadecimal string.

        The default query runner uses this as cache key.
        """
        # Remove the original settings statement, so that changes to settings like
        # timeout/maxsize do not change the cache key.
        code = _SETTING_PATTERN.sub("", self._input_code)
        hasher = hashlib.blake2b(digest_size=8)
        hasher.update(code.encode("utf-8"))
        return hasher.hexdigest()

    @property
    def done(self) -> bool:
        """Returns ``True`` if the result set was received."""
        return self._response is not None

    @property
    def request_duration_secs(self) -> float | None:
        """
        How long it took to fetch the result set in seconds.

        This is the duration starting with the API request, and ending once
        the result is written to this query object. Although it depends on how busy
        the API instance is, this can give some indication of how long a query takes.

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._response is None or self.was_cached:
            return None

        assert self._time_end_try is not None
        assert self._time_start_req is not None

        return self._time_end_try - self._time_start_req

    @property
    def run_duration_secs(self) -> float | None:
        """
        The total required time for this query in seconds (so far).

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._time_start is None:
            return None

        if self._time_end_try:
            return self._time_end_try - self._time_start

        return self._time_start.elapsed_secs_since

    @property
    def _run_duration_left_secs(self) -> float | None:
        """If a limit was set, returns the seconds until the time to run the query has elapsed."""
        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
            return max(0, math.ceil(time_max - time_so_far))
        return None

    @property
    def api_version(self) -> str | None:
        """
        The Overpass API version used by the queried instance.

        Returns:
            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
            has not successfully finished (yet)

        References:
            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
        """
        if self._response is None:
            return None

        return self._response["generator"]

    @property
    def timestamp_osm(self) -> datetime | None:
        """
        All OSM edits that have been uploaded before this date are included.

        It can take a couple of minutes for changes to the database to show up in the
        Overpass API query results.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"]["timestamp_osm_base"]
        # FIX: the trailing "Z" marks this timestamp as UTC, but strptime() returns a
        # naive datetime; astimezone(UTC) would interpret it as *local* time before
        # converting. Attach UTC directly instead.
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)

    @property
    def timestamp_areas(self) -> datetime | None:
        """
        All area data edits that have been uploaded before this date are included.

        If the query involves area data processing, this is the date of the latest edit
        that has been considered in the most recent batch run of the area generation.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet), or
            if it does not involve area data processing.
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"].get("timestamp_areas_base")
        if not date_str:
            return None

        # FIX: same as timestamp_osm — the string is UTC; avoid the naive-to-local
        # misinterpretation of astimezone().
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)

    @property
    def copyright(self) -> str:
        """A copyright notice that comes with the result set."""
        if self._response is None:
            return _COPYRIGHT

        return self._response["osm3s"].get("copyright") or _COPYRIGHT

    def __str__(self) -> str:
        query = f"query{self.kwargs!r}"

        size = self.response_size_mib
        time_request = self.request_duration_secs
        time_total = self.run_duration_secs

        if self.nb_tries == 0:
            details = "pending"
        elif self.done:
            if self.nb_tries == 1:
                details = f"done - {size:.01f}mb in {time_request:.01f}s"
            else:
                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
        else:
            t = "try" if self.nb_tries == 1 else "tries"
            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"

        return f"{query} ({details})"

    def __repr__(self) -> str:
        cls_name = type(self).__name__

        details = {
            "kwargs": self._kwargs,
            "done": self.done,
        }

        if self.nb_tries == 0 or self.error:
            details["tries"] = self.nb_tries

        if self.error:
            details["error"] = type(self.error).__name__

        if self.done:
            details["response_size"] = f"{self.response_size_mib:.02f}mb"

            if not self.was_cached:
                details["request_duration"] = f"{self.request_duration_secs:.02f}s"

        if self.nb_tries > 0:
            details["run_duration"] = f"{self.run_duration_secs:.02f}s"

        details_str = ", ".join(f"{k}={v!r}" for k, v in details.items())

        return f"{cls_name}({details_str})"

    def _begin_try(self) -> None:
        """First thing to call when starting the next try, after invoking the query runner."""
        if self._time_start is None:
            self._time_start = Instant.now()

        self._time_start_try = Instant.now()
        self._time_start_req = None
        self._time_end_try = None

    def _begin_request(self) -> None:
        """Call before making the API call of a try, after waiting for cooldown."""
        self._time_start_req = Instant.now()

    def _succeed_try(self, response: dict, response_bytes: int) -> None:
        """Call when the API call of a try was successful."""
        self._time_end_try = Instant.now()
        self._response = response
        self._response_bytes = response_bytes
        self._error = None

    def _fail_try(self, err: ClientError) -> None:
        """Call when the API call of a try failed."""
        self._error = err

        if is_exceeding_timeout(err):
            self._max_timed_out_after_secs = err.timed_out_after_secs

    def _end_try(self) -> None:
        """Final call in a try."""
        self._nb_tries += 1


@dataclass(kw_only=True, slots=True)
class RequestTimeout:
    """
    Request timeout settings.

    Attributes:
        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
                                  setting is used as timeout duration of the entire request,
                                  including connection establishment, request sending and response
                                  reading (``aiohttp.ClientTimeout.total``).
                                  Defaults to 20 seconds.
        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
    """

    total_without_query_secs: float | None = 20.0
    sock_connect_secs: float | None = None
    each_sock_read_secs: float | None = None

    def __post_init__(self) -> None:
        if self.total_without_query_secs is not None and (
            not math.isfinite(self.total_without_query_secs) or self.total_without_query_secs <= 0.0
        ):
            msg = "'total_without_query_secs' must be finite > 0"
            raise ValueError(msg)

        if self.sock_connect_secs is not None and (
            not math.isfinite(self.sock_connect_secs) or self.sock_connect_secs <= 0.0
        ):
            msg = "'sock_connect_secs' must be finite > 0"
            raise ValueError(msg)

        if self.each_sock_read_secs is not None and (
            not math.isfinite(self.each_sock_read_secs) or self.each_sock_read_secs <= 0.0
        ):
            msg = "'each_sock_read_secs' must be finite > 0"
            raise ValueError(msg)


class QueryRunner(ABC):
    """
    A query runner is an async function that is called before a client makes an API request.

    Query runners can be used to…
    - …retry queries when they fail
    - …modify queries, f.e. to lower/increase their maxsize/timeout
    - …log query results & errors
    - …implement caching

    The absolute minimum a query runner function has to do is to simply return to (re)try
    a query, or to raise ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""


class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
    - …retries with an increasing back-off period in between tries if the server is too busy
    - …retries and doubles timeout and maxsize settings if they were exceeded
    - …limits the number of tries
    - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Amount of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)
    """

    __slots__ = (
        "_cache_ttl_secs",
        "_max_tries",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        """True if caching is enabled and not force-disabled by the environment."""
        if self._cache_ttl_secs and FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    @staticmethod
    def _cache_file_name(query: Query) -> str:
        return f"{query.cache_key}.json"

    @staticmethod
    def _cache_file_path(query: Query) -> Path:
        return Path(tempfile.gettempdir()) / DefaultQueryRunner._cache_file_name(query)

    @staticmethod
    def _cache_read(query: Query) -> None:
        """Set the query's response from a cached file, if it exists and is not expired."""
        logger = query.logger

        now = int(time.time())

        file_path = DefaultQueryRunner._cache_file_path(query)

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            with Path(file_path).open(encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            # best-effort: a broken cache entry simply means a cache miss
            logger.exception(f"failed to read cached {query}")
            return

        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """Write the query's response to a temp file, with an expiration timestamp added."""
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_path = DefaultQueryRunner._cache_file_path(query)

        logger.debug(f"caching at {file_path}…")

        try:
            with Path(file_path).open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            # best-effort: failure to cache must not fail the query
            logger.exception(f"failed to cache {query}")

    @staticmethod
    def cache_delete(query: Query) -> None:
        """Clear a cached response for the given query by removing the file on disk."""
        file_path = DefaultQueryRunner._cache_file_path(query)
        file_path.unlink(missing_ok=True)

    @staticmethod
    def _cache_expire(query: Query) -> None:
        """
        Clear a cached response for the given query by marking it expired in the file on disk.

        This should only be used for testing.
        """
        file_path = DefaultQueryRunner._cache_file_path(query)

        with Path(file_path).open(encoding="utf-8") as file:
            response = json.load(file)

        response[_EXPIRATION_KEY] = 0

        with Path(file_path).open(mode="w", encoding="utf-8") as file:
            json.dump(response, file)

    async def __call__(self, query: Query) -> None:  # noqa: C901
        """Called with the current query state before the client makes an API request."""
        logger = query.logger

        # Check cache ahead of first try
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(DefaultQueryRunner._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

            if is_server_error(err):
                logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, when a retry would not change the result,
        # or when the timeout was reached.
        if err and (query.nb_tries == self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")


def _fibo_backoff_secs(tries: int) -> float:
    """Fibonacci sequence without zero: 1, 1, 2, 3, 5, 8, etc."""
    a, b = 1.0, 1.0

    for _ in range(tries):
        a, b = b, a + b

    return a


_EXPIRATION_KEY: Final[str] = "__expiration__"
60class Query: 61 """ 62 State of a query that is either pending, running, successful, or failed. 63 64 Args: 65 input_code: The input Overpass QL code. Note that some settings might be changed 66 by query runners, notably the 'timeout' and 'maxsize' settings. 67 logger: The logger to use for all logging output related to this query. 68 **kwargs: Additional keyword arguments that can be used to identify queries. 69 70 References: 71 - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL 72 """ 73 74 __slots__ = ( 75 "_error", 76 "_input_code", 77 "_kwargs", 78 "_logger", 79 "_max_timed_out_after_secs", 80 "_nb_tries", 81 "_request_timeout", 82 "_response", 83 "_response_bytes", 84 "_run_lock", 85 "_run_timeout_secs", 86 "_settings", 87 "_time_end_try", 88 "_time_start", 89 "_time_start_req", 90 "_time_start_try", 91 ) 92 93 def __init__( 94 self, 95 input_code: str, 96 logger: logging.Logger = _NULL_LOGGER, 97 **kwargs: Any, # noqa: ANN401 98 ) -> None: 99 self._run_lock: Final[threading.Lock] = threading.Lock() 100 """a lock used to ensure a query cannot be run more than once at the same time""" 101 102 self._input_code: Final[str] = input_code 103 """the original given overpass ql code""" 104 105 self._logger: Final[logging.Logger] = logger 106 """logger to use for this query""" 107 108 self._kwargs: Final[dict] = kwargs 109 """used to identify this query""" 110 111 self._settings = dict(_SETTING_PATTERN.findall(input_code)) 112 """all overpass ql settings [k:v];""" 113 114 if "out" in self._settings and self._settings["out"] != "json": 115 msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted" 116 raise ValueError(msg) 117 118 self._settings["out"] = "json" 119 120 if "maxsize" not in self._settings: 121 self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024 122 elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0: 123 msg = "the '[maxsize:*]' setting must be an integer > 0" 124 raise 
ValueError(msg) 125 126 if "timeout" not in self._settings: 127 self._settings["timeout"] = DEFAULT_TIMEOUT_SECS 128 elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0: 129 msg = "the '[timeout:*]' setting must be an integer > 0" 130 raise ValueError(msg) 131 132 self._run_timeout_secs: float | None = None 133 """total time limit for running this query""" 134 135 self._request_timeout: RequestTimeout = RequestTimeout() 136 """config for request timeouts""" 137 138 self._error: ClientError | None = None 139 """error of the last try, or None""" 140 141 self._response: dict | None = None 142 """response JSON as a dict, or None""" 143 144 self._response_bytes = 0.0 145 """number of bytes in a response, or zero""" 146 147 self._nb_tries = 0 148 """number of tries so far, starting at zero""" 149 150 self._time_start: Instant | None = None 151 """time prior to executing the first try""" 152 153 self._time_start_try: Instant | None = None 154 """time prior to executing the most recent try""" 155 156 self._time_start_req: Instant | None = None 157 """time prior to executing the most recent try's query request""" 158 159 self._time_end_try: Instant | None = None 160 """time the most recent try finished""" 161 162 self._max_timed_out_after_secs: int | None = None 163 """maximum of seconds after which the query was cancelled""" 164 165 def reset(self) -> None: 166 """Reset the query to its initial state, ignoring previous tries.""" 167 Query.__init__( 168 self, 169 input_code=self._input_code, 170 logger=self._logger, 171 **self._kwargs, 172 ) 173 174 @property 175 def input_code(self) -> str: 176 """The original input Overpass QL source code.""" 177 return self._input_code 178 179 @property 180 def kwargs(self) -> dict: 181 """ 182 Keyword arguments that can be used to identify queries. 183 184 The default query runner will log these values when a query is run. 
185 """ 186 return self._kwargs 187 188 @property 189 def logger(self) -> logging.Logger: 190 """The logger used for logging output related to this query.""" 191 return self._logger 192 193 @property 194 def nb_tries(self) -> int: 195 """Current number of tries.""" 196 return self._nb_tries 197 198 @property 199 def error(self) -> ClientError | None: 200 """ 201 Error of the most recent try. 202 203 Returns: 204 an error or ``None`` if the query wasn't tried or hasn't failed 205 """ 206 return self._error 207 208 @property 209 def response(self) -> dict | None: 210 """ 211 The entire JSON response of the query. 212 213 Returns: 214 the response, or ``None`` if the query has not successfully finished (yet) 215 """ 216 return self._response 217 218 @property 219 def was_cached(self) -> bool | None: 220 """ 221 Indicates whether the query result was cached. 222 223 Returns: 224 ``None`` if the query has not been run yet. 225 ``True`` if the query has a result set with zero tries. 226 ``False`` otherwise. 227 """ 228 if self._response is None: 229 return None 230 return self._nb_tries == 0 231 232 @property 233 def result_set(self) -> list[dict] | None: 234 """ 235 The result set of the query. 236 237 This is open data, licensed under the Open Data Commons Open Database License (ODbL). 238 You are free to copy, distribute, transmit and adapt this data, as long as you credit 239 OpenStreetMap and its contributors. If you alter or build upon this data, you may 240 distribute the result only under the same licence. 241 242 Returns: 243 the elements of the result set, or ``None`` if the query has not successfully 244 finished (yet) 245 246 References: 247 - https://www.openstreetmap.org/copyright 248 - https://opendatacommons.org/licenses/odbl/1-0/ 249 """ 250 if not self._response: 251 return None 252 return self._response["elements"] 253 254 @property 255 def response_size_mib(self) -> float | None: 256 """ 257 The size of the response in mebibytes. 
258 259 Returns: 260 the size, or ``None`` if the query has not successfully finished (yet) 261 """ 262 if self._response is None: 263 return None 264 return self._response_bytes / 1024.0 / 1024.0 265 266 @property 267 def maxsize_mib(self) -> float: 268 """ 269 The current value of the [maxsize:*] setting in mebibytes. 270 271 This size indicates the maximum allowed memory for the query in bytes RAM on the server, 272 as expected by the user. If the query needs more RAM than this value, the server may abort 273 the query with a memory exhaustion. The higher this size, the more probably the server 274 rejects the query before executing it. 275 """ 276 return float(self._settings["maxsize"]) // 1024.0 // 1024.0 277 278 @maxsize_mib.setter 279 def maxsize_mib(self, value: float) -> None: 280 if not math.isfinite(value) or value <= 0.0: 281 msg = "'maxsize_mib' must be finite > 0" 282 raise ValueError(msg) 283 self._settings["maxsize"] = int(value * 1024.0 * 1024.0) 284 285 @property 286 def timeout_secs(self) -> int: 287 """ 288 The current value of the [timeout:*] setting in seconds. 289 290 This duration is the maximum allowed runtime for the query in seconds, as expected by the 291 user. If the query runs longer than this time, the server may abort the query. The higher 292 this duration, the more probably the server rejects the query before executing it. 293 """ 294 return int(self._settings["timeout"]) 295 296 @timeout_secs.setter 297 def timeout_secs(self, value: int) -> None: 298 if value < 1: 299 msg = "timeout_secs must be >= 1" 300 raise ValueError(msg) 301 self._settings["timeout"] = value 302 303 @property 304 def run_timeout_secs(self) -> float | None: 305 """ 306 A limit to ``run_duration_secs``, that cancels running the query when exceeded. 307 308 Defaults to no timeout. 309 310 The client will raise a ``GiveupError`` if the timeout is reached. 
311 312 Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance, 313 that limits a single query execution time. Instead, this value can be used to limit the 314 total client-side time spent on this query (see ``Client.run_query``). 315 """ 316 return self._run_timeout_secs 317 318 @run_timeout_secs.setter 319 def run_timeout_secs(self, value: float | None) -> None: 320 if value is not None and (not math.isfinite(value) or value <= 0.0): 321 msg = "'run_timeout_secs' must be finite > 0" 322 raise ValueError(msg) 323 self._run_timeout_secs = value 324 325 @property 326 def run_timeout_elapsed(self) -> bool: 327 """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed.""" 328 return ( 329 self.run_timeout_secs is not None 330 and self.run_duration_secs is not None 331 and self.run_timeout_secs < self.run_duration_secs 332 ) 333 334 @property 335 def request_timeout(self) -> "RequestTimeout": 336 """Request timeout settings for this query.""" 337 return self._request_timeout 338 339 @request_timeout.setter 340 def request_timeout(self, value: "RequestTimeout") -> None: 341 self._request_timeout = value 342 343 def _code(self, next_timeout_secs_used: int) -> str: 344 """The query's QL code, substituting the [timeout:*] setting with the given duration.""" 345 settings_copy = self._settings.copy() 346 settings_copy["timeout"] = next_timeout_secs_used 347 348 # remove the original settings statement 349 code = _SETTING_PATTERN.sub("", self._input_code) 350 351 # put the adjusted settings in front 352 settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";" 353 354 return f"{settings}\n{code}" 355 356 @property 357 def cache_key(self) -> str: 358 """ 359 Hash QL code, and return its digest as hexadecimal string. 360 361 The default query runner uses this as cache key. 
362 """ 363 # Remove the original settings statement 364 code = _SETTING_PATTERN.sub("", self._input_code) 365 hasher = hashlib.blake2b(digest_size=8) 366 hasher.update(code.encode("utf-8")) 367 return hasher.hexdigest() 368 369 @property 370 def done(self) -> bool: 371 """Returns ``True`` if the result set was received.""" 372 return self._response is not None 373 374 @property 375 def request_duration_secs(self) -> float | None: 376 """ 377 How long it took to fetch the result set in seconds. 378 379 This is the duration starting with the API request, and ending once 380 the result is written to this query object. Although it depends on how busy 381 the API instance is, this can give some indication of how long a query takes. 382 383 Returns: 384 the duration or ``None`` if there is no result set yet, or when it was cached. 385 """ 386 if self._response is None or self.was_cached: 387 return None 388 389 assert self._time_end_try is not None 390 assert self._time_start_req is not None 391 392 return self._time_end_try - self._time_start_req 393 394 @property 395 def run_duration_secs(self) -> float | None: 396 """ 397 The total required time for this query in seconds (so far). 398 399 Returns: 400 the duration or ``None`` if there is no result set yet, or when it was cached. 401 """ 402 if self._time_start is None: 403 return None 404 405 if self._time_end_try: 406 return self._time_end_try - self._time_start 407 408 return self._time_start.elapsed_secs_since 409 410 @property 411 def _run_duration_left_secs(self) -> float | None: 412 """If a limit was set, returns the seconds until the time to run the query has elapsed.""" 413 if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs): 414 return max(0, math.ceil(time_max - time_so_far)) 415 return None 416 417 @property 418 def api_version(self) -> str | None: 419 """ 420 The Overpass API version used by the queried instance. 421 422 Returns: 423 f.e. 
``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query 424 has not successfully finished (yet) 425 426 References: 427 - https://wiki.openstreetmap.org/wiki/Overpass_API/versions 428 """ 429 if self._response is None: 430 return None 431 432 return self._response["generator"] 433 434 @property 435 def timestamp_osm(self) -> datetime | None: 436 """ 437 All OSM edits that have been uploaded before this date are included. 438 439 It can take a couple of minutes for changes to the database to show up in the 440 Overpass API query results. 441 442 Returns: 443 the timestamp, or ``None`` if the query has not successfully finished (yet) 444 """ 445 if self._response is None: 446 return None 447 448 date_str = self._response["osm3s"]["timestamp_osm_base"] 449 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC) 450 451 @property 452 def timestamp_areas(self) -> datetime | None: 453 """ 454 All area data edits that have been uploaded before this date are included. 455 456 If the query involves area data processing, this is the date of the latest edit 457 that has been considered in the most recent batch run of the area generation. 458 459 Returns: 460 the timestamp, or ``None`` if the query has not successfully finished (yet), or 461 if it does not involve area data processing. 
462 """ 463 if self._response is None: 464 return None 465 466 date_str = self._response["osm3s"].get("timestamp_areas_base") 467 if not date_str: 468 return None 469 470 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC) 471 472 @property 473 def copyright(self) -> str: 474 """A copyright notice that comes with the result set.""" 475 if self._response is None: 476 return _COPYRIGHT 477 478 return self._response["osm3s"].get("copyright") or _COPYRIGHT 479 480 def __str__(self) -> str: 481 query = f"query{self.kwargs!r}" 482 483 size = self.response_size_mib 484 time_request = self.request_duration_secs 485 time_total = self.run_duration_secs 486 487 if self.nb_tries == 0: 488 details = "pending" 489 elif self.done: 490 if self.nb_tries == 1: 491 details = f"done - {size:.01f}mb in {time_request:.01f}s" 492 else: 493 details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s" 494 else: 495 t = "try" if self.nb_tries == 1 else "tries" 496 details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s" 497 498 return f"{query} ({details})" 499 500 def __repr__(self) -> str: 501 cls_name = type(self).__name__ 502 503 details = { 504 "kwargs": self._kwargs, 505 "done": self.done, 506 } 507 508 if self.nb_tries == 0 or self.error: 509 details["tries"] = self.nb_tries 510 511 if self.error: 512 details["error"] = type(self.error).__name__ 513 514 if self.done: 515 details["response_size"] = f"{self.response_size_mib:.02f}mb" 516 517 if not self.was_cached: 518 details["request_duration"] = f"{self.request_duration_secs:.02f}s" 519 520 if self.nb_tries > 0: 521 details["run_duration"] = f"{self.run_duration_secs:.02f}s" 522 523 details_str = ", ".join((f"{k}={v!r}" for k, v in details.items())) 524 525 return f"{cls_name}({details_str})" 526 527 def _begin_try(self) -> None: 528 """First thing to call when starting the next try, after invoking the query runner.""" 529 if self._time_start is None: 530 self._time_start = 
Instant.now() 531 532 self._time_start_try = Instant.now() 533 self._time_start_req = None 534 self._time_end_try = None 535 536 def _begin_request(self) -> None: 537 """Call before making the API call of a try, after waiting for cooldown.""" 538 self._time_start_req = Instant.now() 539 540 def _succeed_try(self, response: dict, response_bytes: int) -> None: 541 """Call when the API call of a try was successful.""" 542 self._time_end_try = Instant.now() 543 self._response = response 544 self._response_bytes = response_bytes 545 self._error = None 546 547 def _fail_try(self, err: ClientError) -> None: 548 """Call when the API call of a try failed.""" 549 self._error = err 550 551 if is_exceeding_timeout(err): 552 self._max_timed_out_after_secs = err.timed_out_after_secs 553 554 def _end_try(self) -> None: 555 """Final call in a try.""" 556 self._nb_tries += 1
State of a query that is either pending, running, successful, or failed.
Arguments:
- input_code: The input Overpass QL code. Note that some settings might be changed by query runners, notably the 'timeout' and 'maxsize' settings.
- logger: The logger to use for all logging output related to this query.
- **kwargs: Additional keyword arguments that can be used to identify queries.
References:
    def __init__(
        self,
        input_code: str,
        logger: logging.Logger = _NULL_LOGGER,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """
        Initialize the query state from the given Overpass QL code.

        Raises:
            ValueError: if the code declares ``[out:*]`` with a value other than ``json``,
                        or declares a non-positive ``[maxsize:*]`` or ``[timeout:*]`` setting
        """
        self._run_lock: Final[threading.Lock] = threading.Lock()
        """a lock used to ensure a query cannot be run more than once at the same time"""

        self._input_code: Final[str] = input_code
        """the original given overpass ql code"""

        self._logger: Final[logging.Logger] = logger
        """logger to use for this query"""

        self._kwargs: Final[dict] = kwargs
        """used to identify this query"""

        # collect every "[key:value]" declaration found in the input code
        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        # the client always requests JSON output; any other "[out:*]" value would conflict
        if "out" in self._settings and self._settings["out"] != "json":
            msg = "the '[out:*]' setting is implicitly set to 'json' and should be omitted"
            raise ValueError(msg)

        self._settings["out"] = "json"

        # note: defaults are stored as int, while user-supplied values remain str;
        # readers of these settings convert as needed
        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
        elif not self._settings["maxsize"].isdigit() or int(self._settings["maxsize"]) <= 0:
            msg = "the '[maxsize:*]' setting must be an integer > 0"
            raise ValueError(msg)

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
        elif not self._settings["timeout"].isdigit() or int(self._settings["timeout"]) <= 0:
            msg = "the '[timeout:*]' setting must be an integer > 0"
            raise ValueError(msg)

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: Instant | None = None
        """time the most recent try finished"""

        self._max_timed_out_after_secs: int | None = None
        """maximum of seconds after which the query was cancelled"""
165 def reset(self) -> None: 166 """Reset the query to its initial state, ignoring previous tries.""" 167 Query.__init__( 168 self, 169 input_code=self._input_code, 170 logger=self._logger, 171 **self._kwargs, 172 )
Reset the query to its initial state, ignoring previous tries.
174 @property 175 def input_code(self) -> str: 176 """The original input Overpass QL source code.""" 177 return self._input_code
The original input Overpass QL source code.
179 @property 180 def kwargs(self) -> dict: 181 """ 182 Keyword arguments that can be used to identify queries. 183 184 The default query runner will log these values when a query is run. 185 """ 186 return self._kwargs
Keyword arguments that can be used to identify queries.
The default query runner will log these values when a query is run.
188 @property 189 def logger(self) -> logging.Logger: 190 """The logger used for logging output related to this query.""" 191 return self._logger
The logger used for logging output related to this query.
193 @property 194 def nb_tries(self) -> int: 195 """Current number of tries.""" 196 return self._nb_tries
Current number of tries.
198 @property 199 def error(self) -> ClientError | None: 200 """ 201 Error of the most recent try. 202 203 Returns: 204 an error or ``None`` if the query wasn't tried or hasn't failed 205 """ 206 return self._error
Error of the most recent try.
Returns:
an error, or `None` if the query wasn't tried or hasn't failed
208 @property 209 def response(self) -> dict | None: 210 """ 211 The entire JSON response of the query. 212 213 Returns: 214 the response, or ``None`` if the query has not successfully finished (yet) 215 """ 216 return self._response
The entire JSON response of the query.
Returns:
the response, or `None` if the query has not successfully finished (yet)
218 @property 219 def was_cached(self) -> bool | None: 220 """ 221 Indicates whether the query result was cached. 222 223 Returns: 224 ``None`` if the query has not been run yet. 225 ``True`` if the query has a result set with zero tries. 226 ``False`` otherwise. 227 """ 228 if self._response is None: 229 return None 230 return self._nb_tries == 0
Indicates whether the query result was cached.
Returns:
`None` if the query has not been run yet.
`True` if the query has a result set with zero tries.
`False` otherwise.
232 @property 233 def result_set(self) -> list[dict] | None: 234 """ 235 The result set of the query. 236 237 This is open data, licensed under the Open Data Commons Open Database License (ODbL). 238 You are free to copy, distribute, transmit and adapt this data, as long as you credit 239 OpenStreetMap and its contributors. If you alter or build upon this data, you may 240 distribute the result only under the same licence. 241 242 Returns: 243 the elements of the result set, or ``None`` if the query has not successfully 244 finished (yet) 245 246 References: 247 - https://www.openstreetmap.org/copyright 248 - https://opendatacommons.org/licenses/odbl/1-0/ 249 """ 250 if not self._response: 251 return None 252 return self._response["elements"]
The result set of the query.
This is open data, licensed under the Open Data Commons Open Database License (ODbL). You are free to copy, distribute, transmit and adapt this data, as long as you credit OpenStreetMap and its contributors. If you alter or build upon this data, you may distribute the result only under the same licence.
Returns:
the elements of the result set, or `None` if the query has not successfully finished (yet)
References:
254 @property 255 def response_size_mib(self) -> float | None: 256 """ 257 The size of the response in mebibytes. 258 259 Returns: 260 the size, or ``None`` if the query has not successfully finished (yet) 261 """ 262 if self._response is None: 263 return None 264 return self._response_bytes / 1024.0 / 1024.0
The size of the response in mebibytes.
Returns:
the size, or `None` if the query has not successfully finished (yet)
266 @property 267 def maxsize_mib(self) -> float: 268 """ 269 The current value of the [maxsize:*] setting in mebibytes. 270 271 This size indicates the maximum allowed memory for the query in bytes RAM on the server, 272 as expected by the user. If the query needs more RAM than this value, the server may abort 273 the query with a memory exhaustion. The higher this size, the more probably the server 274 rejects the query before executing it. 275 """ 276 return float(self._settings["maxsize"]) // 1024.0 // 1024.0
The current value of the [maxsize:*] setting in mebibytes.
This size indicates the maximum allowed memory for the query in bytes RAM on the server, as expected by the user. If the query needs more RAM than this value, the server may abort the query with a memory exhaustion. The higher this size, the more probably the server rejects the query before executing it.
285 @property 286 def timeout_secs(self) -> int: 287 """ 288 The current value of the [timeout:*] setting in seconds. 289 290 This duration is the maximum allowed runtime for the query in seconds, as expected by the 291 user. If the query runs longer than this time, the server may abort the query. The higher 292 this duration, the more probably the server rejects the query before executing it. 293 """ 294 return int(self._settings["timeout"])
The current value of the [timeout:*] setting in seconds.
This duration is the maximum allowed runtime for the query in seconds, as expected by the user. If the query runs longer than this time, the server may abort the query. The higher this duration, the more probably the server rejects the query before executing it.
303 @property 304 def run_timeout_secs(self) -> float | None: 305 """ 306 A limit to ``run_duration_secs``, that cancels running the query when exceeded. 307 308 Defaults to no timeout. 309 310 The client will raise a ``GiveupError`` if the timeout is reached. 311 312 Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance, 313 that limits a single query execution time. Instead, this value can be used to limit the 314 total client-side time spent on this query (see ``Client.run_query``). 315 """ 316 return self._run_timeout_secs
A limit to `run_duration_secs`, that cancels running the query when exceeded.

Defaults to no timeout.

The client will raise a `GiveupError` if the timeout is reached.

Not to be confused with `timeout_secs`, which is a setting for the Overpass API instance
that limits a single query execution time. Instead, this value can be used to limit the
total client-side time spent on this query (see `Client.run_query`).
325 @property 326 def run_timeout_elapsed(self) -> bool: 327 """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed.""" 328 return ( 329 self.run_timeout_secs is not None 330 and self.run_duration_secs is not None 331 and self.run_timeout_secs < self.run_duration_secs 332 )
Returns `True` if `run_timeout_secs` is set and has elapsed.
334 @property 335 def request_timeout(self) -> "RequestTimeout": 336 """Request timeout settings for this query.""" 337 return self._request_timeout
Request timeout settings for this query.
356 @property 357 def cache_key(self) -> str: 358 """ 359 Hash QL code, and return its digest as hexadecimal string. 360 361 The default query runner uses this as cache key. 362 """ 363 # Remove the original settings statement 364 code = _SETTING_PATTERN.sub("", self._input_code) 365 hasher = hashlib.blake2b(digest_size=8) 366 hasher.update(code.encode("utf-8")) 367 return hasher.hexdigest()
Hash QL code, and return its digest as hexadecimal string.
The default query runner uses this as cache key.
369 @property 370 def done(self) -> bool: 371 """Returns ``True`` if the result set was received.""" 372 return self._response is not None
Returns `True` if the result set was received.
374 @property 375 def request_duration_secs(self) -> float | None: 376 """ 377 How long it took to fetch the result set in seconds. 378 379 This is the duration starting with the API request, and ending once 380 the result is written to this query object. Although it depends on how busy 381 the API instance is, this can give some indication of how long a query takes. 382 383 Returns: 384 the duration or ``None`` if there is no result set yet, or when it was cached. 385 """ 386 if self._response is None or self.was_cached: 387 return None 388 389 assert self._time_end_try is not None 390 assert self._time_start_req is not None 391 392 return self._time_end_try - self._time_start_req
How long it took to fetch the result set in seconds.
This is the duration starting with the API request, and ending once the result is written to this query object. Although it depends on how busy the API instance is, this can give some indication of how long a query takes.
Returns:
the duration, or `None` if there is no result set yet, or when it was cached
394 @property 395 def run_duration_secs(self) -> float | None: 396 """ 397 The total required time for this query in seconds (so far). 398 399 Returns: 400 the duration or ``None`` if there is no result set yet, or when it was cached. 401 """ 402 if self._time_start is None: 403 return None 404 405 if self._time_end_try: 406 return self._time_end_try - self._time_start 407 408 return self._time_start.elapsed_secs_since
The total required time for this query in seconds (so far).
Returns:
the duration, or `None` if there is no result set yet, or when it was cached
417 @property 418 def api_version(self) -> str | None: 419 """ 420 The Overpass API version used by the queried instance. 421 422 Returns: 423 f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query 424 has not successfully finished (yet) 425 426 References: 427 - https://wiki.openstreetmap.org/wiki/Overpass_API/versions 428 """ 429 if self._response is None: 430 return None 431 432 return self._response["generator"]
The Overpass API version used by the queried instance.
Returns:
f.e. `"Overpass API 0.7.56.8 7d656e78"`, or `None` if the query has not successfully finished (yet)
References:
434 @property 435 def timestamp_osm(self) -> datetime | None: 436 """ 437 All OSM edits that have been uploaded before this date are included. 438 439 It can take a couple of minutes for changes to the database to show up in the 440 Overpass API query results. 441 442 Returns: 443 the timestamp, or ``None`` if the query has not successfully finished (yet) 444 """ 445 if self._response is None: 446 return None 447 448 date_str = self._response["osm3s"]["timestamp_osm_base"] 449 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)
All OSM edits that have been uploaded before this date are included.
It can take a couple of minutes for changes to the database to show up in the Overpass API query results.
Returns:
the timestamp, or `None` if the query has not successfully finished (yet)
451 @property 452 def timestamp_areas(self) -> datetime | None: 453 """ 454 All area data edits that have been uploaded before this date are included. 455 456 If the query involves area data processing, this is the date of the latest edit 457 that has been considered in the most recent batch run of the area generation. 458 459 Returns: 460 the timestamp, or ``None`` if the query has not successfully finished (yet), or 461 if it does not involve area data processing. 462 """ 463 if self._response is None: 464 return None 465 466 date_str = self._response["osm3s"].get("timestamp_areas_base") 467 if not date_str: 468 return None 469 470 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(UTC)
All area data edits that have been uploaded before this date are included.
If the query involves area data processing, this is the date of the latest edit that has been considered in the most recent batch run of the area generation.
Returns:
the timestamp, or `None` if the query has not successfully finished (yet), or if it does not involve area data processing
class QueryRunner(ABC):
    """
    An async callable invoked by the client before every API request for a query.

    Use cases for query runners include…
    - …retrying queries that failed
    - …adjusting queries, f.e. raising or lowering their maxsize/timeout settings
    - …logging query results & errors
    - …caching result sets

    At the bare minimum, a runner simply returns to (re)try a query,
    or raises ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""
A query runner is an async function that is called before a client makes an API request.
Query runners can be used to…
- …retry queries when they fail
- …modify queries, f.e. to lower/increase their maxsize/timeout
- …log query results & errors
- …implement caching
The absolute minimum a query runner function has to do is to simply return to (re)try a query, or to raise `query.error` to stop trying.
621class DefaultQueryRunner(QueryRunner): 622 """ 623 The default query runner. 624 625 This runner… 626 - …retries with an increasing back-off period in between tries if the server is too busy 627 - …retries and doubles timeout and maxsize settings if they were exceeded 628 - …limits the number of tries 629 - …optionally caches query results in temp files 630 631 This runner does *not* lower timeout and maxsize settings if the server rejected a query. 632 633 Args: 634 max_tries: The maximum number of times a query is tried. (5 by default) 635 cache_ttl_secs: Amount of seconds a query's result set is cached for. 636 Set to zero to disable caching. (zero by default) 637 """ 638 639 __slots__ = ( 640 "_cache_ttl_secs", 641 "_max_tries", 642 ) 643 644 def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None: 645 if max_tries < 1: 646 msg = "max_tries must be >= 1" 647 raise ValueError(msg) 648 649 if cache_ttl_secs < 0: 650 msg = "cache_ttl_secs must be >= 0" 651 raise ValueError(msg) 652 653 self._max_tries = max_tries 654 self._cache_ttl_secs = cache_ttl_secs 655 656 def _is_caching(self, query: Query) -> bool: 657 if self._cache_ttl_secs and FORCE_DISABLE_CACHE: 658 query.logger.debug("caching is forced disabled") 659 return False 660 return self._cache_ttl_secs > 0 661 662 @staticmethod 663 def _cache_file_name(query: Query) -> str: 664 return f"{query.cache_key}.json" 665 666 @staticmethod 667 def _cache_file_path(query: Query) -> Path: 668 return Path(tempfile.gettempdir()) / DefaultQueryRunner._cache_file_name(query) 669 670 @staticmethod 671 def _cache_read(query: Query) -> None: 672 logger = query.logger 673 674 now = int(time.time()) 675 676 file_path = DefaultQueryRunner._cache_file_path(query) 677 678 if not file_path.exists(): 679 logger.info("result was not cached") 680 logger.debug(f"checked for cache at {file_path}") 681 return 682 683 try: 684 with Path(file_path).open(encoding="utf-8") as file: 685 response = json.load(file) 686 
except (OSError, json.JSONDecodeError): 687 logger.exception(f"failed to read cached {query}") 688 return 689 690 if response.get(_EXPIRATION_KEY, 0) <= now: 691 logger.info(f"{query} cache expired") 692 return 693 694 query._response = response 695 logger.info(f"{query} was cached") 696 697 def _cache_write(self, query: Query) -> None: 698 logger = query.logger 699 700 now = int(time.time()) 701 702 assert query._response is not None 703 query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs 704 705 file_path = DefaultQueryRunner._cache_file_path(query) 706 707 logger.debug(f"caching at {file_path}…") 708 709 try: 710 with Path(file_path).open(mode="w", encoding="utf-8") as file: 711 json.dump(query._response, file) 712 except OSError: 713 logger.exception(f"failed to cache {query}") 714 715 @staticmethod 716 def cache_delete(query: Query) -> None: 717 """Clear a cached response for the given query by removing the file on disk.""" 718 file_path = DefaultQueryRunner._cache_file_path(query) 719 file_path.unlink(missing_ok=True) 720 721 @staticmethod 722 def _cache_expire(query: Query) -> None: 723 """ 724 Clear a cached response for the given query by marking it expired in the file on disk. 725 726 This should only be used for testing. 
727 """ 728 file_path = DefaultQueryRunner._cache_file_path(query) 729 730 with Path(file_path).open(encoding="utf-8") as file: 731 response = json.load(file) 732 733 response[_EXPIRATION_KEY] = 0 734 735 with Path(file_path).open(mode="w", encoding="utf-8") as file: 736 json.dump(response, file) 737 738 async def __call__(self, query: Query) -> None: # noqa: C901 739 """Called with the current query state before the client makes an API request.""" 740 logger = query.logger 741 742 # Check cache ahead of first try 743 if query.nb_tries == 0 and self._is_caching(query): 744 await asyncio.to_thread(DefaultQueryRunner._cache_read, query) 745 746 # Success or cached 747 if query.done: 748 logger.info(f"{query}") 749 if not query.was_cached and self._is_caching(query): 750 await asyncio.to_thread(self._cache_write, query) 751 return 752 753 err = query.error 754 755 if err: 756 logger.info(f"try for query{query.kwargs!r} failed: {err}") 757 758 if is_server_error(err): 759 logger.error(f"unexpected response body:\n{err.body}") 760 761 # Do not retry if we exhausted all tries, when a retry would not change the result, 762 # or when the timeout was reached. 763 if err and (query.nb_tries == self._max_tries or not err.should_retry): 764 logger.error(f"give up on {query}", exc_info=err) 765 raise err 766 767 if is_rejection(err): 768 # Wait if the server is too busy. 769 if err.cause == QueryRejectCause.TOO_BUSY: 770 backoff = _fibo_backoff_secs(query.nb_tries) 771 logger.info(f"retry {query} in {backoff:.1f}s") 772 await sleep(backoff) 773 774 # Wait until a slot opens if the rate limit was exceeded. 775 elif err.cause == QueryRejectCause.TOO_MANY_QUERIES: 776 pass # let client enforce cooldown 777 778 # Double timeout if exceeded. 
779 elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT: 780 old = f"{query.timeout_secs:.1f}s" 781 query.timeout_secs *= 2 782 new = f"{query.timeout_secs:.1f}s" 783 logger.info(f"increased [timeout:*] for {query} from {old} to {new}") 784 785 # Double maxsize if exceeded. 786 elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE: 787 old = f"{query.maxsize_mib:.1f}mib" 788 query.maxsize_mib *= 2 789 new = f"{query.maxsize_mib:.1f}mib" 790 logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")
The default query runner.
This runner…
- …retries with an increasing back-off period in between tries if the server is too busy
- …retries and doubles timeout and maxsize settings if they were exceeded
- …limits the number of tries
- …optionally caches query results in temp files
This runner does not lower timeout and maxsize settings if the server rejected a query.
Arguments:
- max_tries: The maximum number of times a query is tried. (5 by default)
- cache_ttl_secs: Amount of seconds a query's result set is cached for. Set to zero to disable caching. (zero by default)
644 def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None: 645 if max_tries < 1: 646 msg = "max_tries must be >= 1" 647 raise ValueError(msg) 648 649 if cache_ttl_secs < 0: 650 msg = "cache_ttl_secs must be >= 0" 651 raise ValueError(msg) 652 653 self._max_tries = max_tries 654 self._cache_ttl_secs = cache_ttl_secs
715 @staticmethod 716 def cache_delete(query: Query) -> None: 717 """Clear a cached response for the given query by removing the file on disk.""" 718 file_path = DefaultQueryRunner._cache_file_path(query) 719 file_path.unlink(missing_ok=True)
Clear a cached response for the given query by removing the file on disk.
559@dataclass(kw_only=True, slots=True) 560class RequestTimeout: 561 """ 562 Request timeout settings. 563 564 Attributes: 565 total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]`` 566 setting is used as timeout duration of the entire request, 567 including connection establishment, request sending and response 568 reading (``aiohttp.ClientTimeout.total``). 569 Defaults to 20 seconds. 570 sock_connect_secs: The maximum number of seconds allowed for pure socket connection 571 establishment (same as ``aiohttp.ClientTimeout.sock_connect``). 572 each_sock_read_secs: The maximum number of seconds allowed for the period between reading 573 a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``). 574 """ 575 576 total_without_query_secs: float | None = 20.0 577 sock_connect_secs: float | None = None 578 each_sock_read_secs: float | None = None 579 580 def __post_init__(self) -> None: 581 if self.total_without_query_secs is not None and ( 582 not math.isfinite(self.total_without_query_secs) or self.total_without_query_secs <= 0.0 583 ): 584 msg = "'total_without_query_secs' must be finite > 0" 585 raise ValueError(msg) 586 587 if self.sock_connect_secs is not None and ( 588 not math.isfinite(self.sock_connect_secs) or self.sock_connect_secs <= 0.0 589 ): 590 msg = "'sock_connect_secs' must be finite > 0" 591 raise ValueError(msg) 592 593 if self.each_sock_read_secs is not None and ( 594 not math.isfinite(self.each_sock_read_secs) or self.each_sock_read_secs <= 0.0 595 ): 596 msg = "'each_sock_read_secs' must be finite > 0" 597 raise ValueError(msg)
Request timeout settings.
Attributes:
- total_without_query_secs: If set, the sum of this duration and the query's `[timeout:*]` setting is used as timeout duration of the entire request, including connection establishment, request sending and response reading (`aiohttp.ClientTimeout.total`). Defaults to 20 seconds.
- sock_connect_secs: The maximum number of seconds allowed for pure socket connection establishment (same as `aiohttp.ClientTimeout.sock_connect`).
- each_sock_read_secs: The maximum number of seconds allowed for the period between reading a new chunk of data (same as `aiohttp.ClientTimeout.sock_read`).
Default `maxsize` setting in mebibytes.

Default `timeout` setting in seconds.