aio_overpass.client
Interface for making API calls.
1"""Interface for making API calls.""" 2 3import math 4import re 5from collections.abc import AsyncIterator 6from contextlib import asynccontextmanager, suppress 7from dataclasses import dataclass 8from typing import Final 9from urllib.parse import urljoin 10 11from aio_overpass import __version__ 12from aio_overpass._clock import sleep 13from aio_overpass.error import ( 14 AlreadyRunningError, 15 CallError, 16 CallTimeoutError, 17 ClientError, 18 GiveupCause, 19 GiveupError, 20 RunnerError, 21 _raise_for_request_error, 22 _raise_for_response, 23 _result_or_raise, 24 is_too_many_queries, 25) 26from aio_overpass.query import DefaultQueryRunner, Query, QueryRunner 27 28import aiohttp 29from aiohttp import ClientTimeout 30 31 32__docformat__ = "google" 33__all__ = ( 34 "Client", 35 "Status", 36 "DEFAULT_INSTANCE", 37 "DEFAULT_USER_AGENT", 38) 39 40 41DEFAULT_INSTANCE: Final[str] = "https://overpass-api.de/api/" 42"""Main Overpass API instance.""" 43 44DEFAULT_USER_AGENT: Final[str] = ( 45 f"aio-overpass/{__version__} (https://github.com/timwie/aio-overpass)" 46) 47"""User agent that points to the ``aio-overpass`` repo.""" 48 49 50@dataclass(kw_only=True, slots=True) 51class Status: 52 """ 53 Information about the API server's rate limit. 54 55 Attributes: 56 slots: The maximum number of concurrent queries per IP 57 (or ``None`` if there is no rate limit). 58 free_slots: The number of slots open for this IP 59 (or ``None`` if there is no rate limit). 60 cooldown_secs: The number of seconds until a slot opens for this IP 61 (or 0 if there is a free slot). 62 endpoint: Announced endpoint. For example, there are two distinct servers 63 that both can be reached by the main Overpass API instance. 64 Depending on server load, a query may be sent to either of them. 65 This value is the server name, f.e. ``"gall.openstreetmap.de/"``. 66 nb_running_queries: Number of currently running queries for this IP. 67 """ 68 69 slots: int | None 70 free_slots: int | None 71 cooldown_secs: int 72 endpoint: str | None 73 nb_running_queries: int 74 75 def __repr__(self) -> str: 76 rep = f"{type(self).__name__}(free_slots=" 77 rep += f"{self.free_slots}/{self.slots}" if self.slots else "∞" 78 rep += f", cooldown={self.cooldown_secs}s" 79 rep += f", running={self.nb_running_queries}" 80 if self.endpoint: 81 rep += f", endpoint={self.endpoint!r}" 82 rep += ")" 83 return rep 84 85 86class Client: 87 """ 88 A client for the Overpass API. 89 90 Requests are rate-limited according to the configured number of slots per IP for the specified 91 API server. By default, queries are retried whenever the server is too busy, or the rate limit 92 was exceeded. Custom query runners can be used to implement your own retry strategy. 93 94 Args: 95 url: The url of an Overpass API instance. Defaults to the main Overpass API instance. 96 user_agent: A string used for the User-Agent header. It is good practice to provide a string 97 that identifies your application, and includes a way to contact you (f.e. an 98 e-mail, or a link to a repository). This is important if you make too many 99 requests, or queries that require a lot of resources. 100 concurrency: The maximum number of simultaneous connections. In practice the amount 101 of concurrent queries may be limited by the number of slots it provides for 102 each IP. 103 status_timeout_secs: If set, status requests to the Overpass API will time out after 104 this duration in seconds. Defaults to no timeout. 
105 runner: You can provide another query runner if you want to implement your own retry 106 strategy. 107 108 References: 109 - https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances 110 """ 111 112 __slots__ = ( 113 "_concurrency", 114 "_maybe_session", 115 "_runner", 116 "_status_timeout_secs", 117 "_url", 118 "_user_agent", 119 ) 120 121 def __init__( 122 self, 123 url: str = DEFAULT_INSTANCE, 124 user_agent: str = DEFAULT_USER_AGENT, 125 concurrency: int = 32, 126 status_timeout_secs: float | None = None, 127 runner: QueryRunner | None = None, 128 ) -> None: 129 if concurrency <= 0: 130 msg = "'concurrency' must be > 0" 131 raise ValueError(msg) 132 if status_timeout_secs is not None and ( 133 not math.isfinite(status_timeout_secs) or status_timeout_secs <= 0.0 134 ): 135 msg = "'status_timeout_secs' must be finite > 0" 136 raise ValueError(msg) 137 138 self._url: Final[str] = url 139 self._user_agent: Final[str] = user_agent 140 self._concurrency: Final[int] = concurrency 141 self._status_timeout_secs: Final[float | None] = status_timeout_secs 142 self._runner: Final[QueryRunner] = runner or DefaultQueryRunner() 143 144 self._maybe_session: aiohttp.ClientSession | None = None 145 146 def _session(self) -> aiohttp.ClientSession: 147 """The session used for all requests of this client.""" 148 if not self._maybe_session or self._maybe_session.closed: 149 headers = {"User-Agent": self._user_agent} 150 connector = aiohttp.TCPConnector(limit=self._concurrency) 151 self._maybe_session = aiohttp.ClientSession(headers=headers, connector=connector) 152 153 return self._maybe_session 154 155 async def close(self) -> None: 156 """Cancel all running queries and close the underlying session.""" 157 if self._maybe_session and not self._maybe_session.closed: 158 # do not care if this fails 159 with suppress(CallError): 160 _ = await self.cancel_queries() 161 162 # is raised when there are still active queries. that's ok 163 with suppress(aiohttp.ServerDisconnectedError): 164 await self._maybe_session.close() 165 166 async def _status(self, timeout: ClientTimeout | None = None) -> "Status": 167 endpoint = urljoin(self._url, "status") 168 timeout = timeout or aiohttp.ClientTimeout(total=self._status_timeout_secs) 169 async with ( 170 _map_request_error(timeout), 171 self._session().get(url=endpoint, timeout=timeout) as response, 172 ): 173 return await _parse_status(response) 174 175 async def status(self) -> Status: 176 """ 177 Check the current API status. 178 179 The timeout of this request is configured with the ``status_timeout_secs`` argument. 180 181 Raises: 182 ClientError: if the status could not be looked up 183 """ 184 return await self._status() 185 186 async def cancel_queries(self, timeout_secs: float | None = None) -> int: 187 """ 188 Cancel all running queries. 189 190 This can be used to terminate runaway queries that prevent you from sending new ones. 
191 192 Returns: 193 the number of terminated queries 194 195 Raises: 196 ClientError: if the request to cancel queries failed 197 """ 198 if timeout_secs is not None and (not math.isfinite(timeout_secs) or timeout_secs <= 0.0): 199 msg = "'timeout_secs' must be finite > 0" 200 raise ValueError(msg) 201 202 timeout = aiohttp.ClientTimeout(total=timeout_secs) if timeout_secs else None 203 headers = {"User-Agent": self._user_agent} 204 endpoint = urljoin(self._url, "kill_my_queries") 205 206 # use a new session here to get around our concurrency limit 207 async with ( 208 aiohttp.ClientSession(headers=headers) as session, 209 _map_request_error(timeout), 210 session.get(endpoint, timeout=timeout) as response, 211 ): 212 body = await response.text() 213 killed_pids = re.findall("\\(pid (\\d+)\\)", body) 214 return len(set(killed_pids)) 215 216 async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None: 217 """ 218 Send a query to the API, and await its completion. 219 220 "Running" the query entails acquiring a connection from the pool, the query requests 221 themselves (which may be retried), status requests when the server is busy, 222 and cooldown periods. 223 224 The query runner is invoked before every try, and once after the last try. 225 226 To run multiple queries concurrently, wrap the returned coroutines in an ``asyncio`` task, 227 f.e. with ``asyncio.create_task()`` and subsequent ``asyncio.gather()``. 228 229 Args: 230 query: the query to run on this API instance 231 raise_on_failure: if ``True``, raises ``query.error`` if the query failed 232 233 Raises: 234 ClientError: when query or status requests fail. If the query was retried, the error 235 of the last try will be raised. The same exception is also captured in 236 ``query.error``. Raising can be prevented by setting ``raise_on_failure`` 237 to ``False``. 238 RunnerError: when a call to the query runner raises. This exception is raised 239 even if ``raise_on_failure` is ``False``, since it is likely an error 240 that is not just specific to this query. 241 AlreadyRunningError: when another ``run_query()`` call on this query has not finished 242 yet. This is not affected by ``raise_on_failure``. 243 """ 244 if query.done: 245 return # nothing to do 246 247 if not query._run_lock.acquire(blocking=False): 248 raise AlreadyRunningError(kwargs=query.kwargs) 249 250 try: 251 if query.nb_tries > 0: 252 query.reset() # reset failed queries 253 254 # query runner is invoked before every try, and once after the last try 255 while True: 256 await self._invoke_runner(query, raise_on_failure=raise_on_failure) 257 if query.done: 258 return 259 await self._try_query_once(query) 260 finally: 261 query._run_lock.release() 262 263 async def _invoke_runner(self, query: Query, *, raise_on_failure: bool) -> None: 264 """ 265 Invoke the query runner. 
266 267 Raises: 268 ClientError: if the runner raises ``query.error`` 269 ValueError: if the runner raises a different ``ClientError`` than ``query.error`` 270 RunnerError: if the runner raises any other exception (which it shouldn't) 271 """ 272 try: 273 await self._runner(query) 274 except ClientError as err: 275 if err is not query.error: 276 msg = "query runner raised a ClientError other than 'query.error'" 277 raise ValueError(msg) from err 278 if raise_on_failure: 279 raise 280 except AssertionError: 281 raise 282 except BaseException as err: 283 raise RunnerError(cause=err) from err 284 285 async def _try_query_once(self, query: Query) -> None: 286 """A single iteration of running a query.""" 287 query._begin_try() 288 289 try: 290 await self._cooldown(query) 291 292 req_timeout = _next_query_req_timeout(query) 293 294 # pick the timeout we will use for the next try 295 # TODO: not sure if this should also update the timeout setting in the Query state; 296 # for now, pass it as parameter to the _code() function 297 next_timeout_secs = _next_timeout_secs(query) 298 299 data = query._code(next_timeout_secs) 300 301 query._begin_request() 302 303 query.logger.info(f"call api for {query}") 304 305 async with ( 306 _map_request_error(req_timeout), 307 self._session().post( 308 url=urljoin(self._url, "interpreter"), 309 data=data, 310 timeout=req_timeout, 311 ) as response, 312 ): 313 query._succeed_try( 314 response=await _result_or_raise(response, query.kwargs, query.logger), 315 response_bytes=response.content.total_bytes, 316 ) 317 318 except CallTimeoutError as err: 319 fail_with: ClientError = err 320 if query.run_timeout_elapsed: 321 assert query.run_duration_secs is not None 322 fail_with = GiveupError( 323 cause=GiveupCause.RUN_TIMEOUT_DURING_QUERY_CALL, 324 kwargs=query.kwargs, 325 after_secs=query.run_duration_secs, 326 ) 327 query._fail_try(fail_with) 328 329 except ClientError as err: 330 query._fail_try(err) 331 332 finally: 333 query._end_try() 334 335 async def _cooldown(self, query: Query) -> None: 336 """ 337 If the given query failed with ``TOO_MANY_QUERIES``, check for a cooldown period. 338 339 Raises: 340 ClientError: if the status request to find out the cooldown period fails 341 GiveupError: if the cooldown is longer than the remaining run duration 342 """ 343 logger = query.logger 344 345 if not is_too_many_queries(query.error): 346 return 347 348 # If this client is running too many queries, we can check the status for a 349 # cooldown period. This request failing is a bit of an edge case. 350 # 'query.error' will be overwritten, which means we will not check for a 351 # cooldown in the next iteration. 
352 status = await self._status(timeout=self._next_status_req_timeout(query)) 353 354 if not status.cooldown_secs: 355 return 356 357 run_duration = query.run_duration_secs 358 assert run_duration is not None 359 360 if run_timeout := query.run_timeout_secs: 361 remaining = run_timeout - run_duration 362 363 if status.cooldown_secs > remaining: 364 logger.error(f"give up on {query} due to {status.cooldown_secs:.1f}s cooldown") 365 raise GiveupError( 366 cause=GiveupCause.RUN_TIMEOUT_BY_COOLDOWN, 367 kwargs=query.kwargs, 368 after_secs=run_duration, 369 ) 370 371 logger.info(f"{query} has cooldown for {status.cooldown_secs:.1f}s") 372 await sleep(status.cooldown_secs) 373 374 def _next_status_req_timeout(self, query: Query) -> aiohttp.ClientTimeout: 375 """Status request timeout; possibly limited by either the run or status timeout settings.""" 376 remaining = None 377 378 run_duration = query.run_duration_secs 379 assert run_duration is not None 380 381 if run_timeout := query.run_timeout_secs: 382 remaining = run_timeout - run_duration 383 384 if remaining <= 0.0: 385 raise GiveupError( 386 cause=GiveupCause.RUN_TIMEOUT_BEFORE_STATUS_CALL, 387 kwargs=query.kwargs, 388 after_secs=run_duration, 389 ) 390 391 if self._status_timeout_secs: 392 remaining = min(remaining, self._status_timeout_secs) # cap timeout if configured 393 394 return aiohttp.ClientTimeout(total=remaining) 395 396 397def _next_query_req_timeout(query: Query) -> aiohttp.ClientTimeout: 398 """Query request timeout; possibly limited by either the run or request timeout settings.""" 399 run_total = None # time left until "run_timeout_secs" exceeded 400 query_total = None # "[timeout:*]" setting plus "total_without_query_secs" 401 402 run_duration = query.run_duration_secs 403 assert run_duration is not None 404 405 if run_timeout := query.run_timeout_secs: 406 run_total = run_timeout - run_duration 407 if run_total <= 0.0: 408 raise GiveupError( 409 cause=GiveupCause.RUN_TIMEOUT_BEFORE_QUERY_CALL, 410 kwargs=query.kwargs, 411 after_secs=run_duration, 412 ) 413 414 if add_to_timeout_secs := query.request_timeout.total_without_query_secs: 415 query_total = float(query.timeout_secs) + add_to_timeout_secs 416 assert query_total > 0.0 417 418 total = min(run_total, query_total) if run_total and query_total else run_total or query_total 419 420 return aiohttp.ClientTimeout( 421 total=total, 422 connect=None, 423 sock_connect=query.request_timeout.sock_connect_secs, 424 sock_read=query.request_timeout.each_sock_read_secs, 425 ) 426 427 428async def _parse_status(response: aiohttp.ClientResponse) -> Status: 429 """Parses an /api/status response.""" 430 text = await response.text() 431 432 slots: int | None = 0 433 free_slots = None 434 cooldown_secs = 0 435 endpoint = None 436 nb_running_queries = 0 437 438 match_slots_overall = re.findall(r"Rate limit: (\d+)", text) 439 match_slots_available = re.findall(r"(\d+) slots available now", text) 440 match_cooldowns = re.findall(r"Slot available after: .+, in (\d+) seconds", text) 441 match_endpoint = re.findall(r"Announced endpoint: (.+)", text) 442 match_running_queries = re.findall(r"\d+\t\d+\t\d+\t\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", text) 443 444 try: 445 (slots_str,) = match_slots_overall 446 slots = int(slots_str) or None 447 448 endpoint = match_endpoint[0] if match_endpoint else None 449 endpoint = None if endpoint == "none" else endpoint 450 451 nb_running_queries = len(match_running_queries) 452 453 if slots: 454 cooldowns = [int(secs) for secs in match_cooldowns] 455 456 if 
match_slots_available: 457 free_slots_str = match_slots_available[0] 458 free_slots = int(free_slots_str) 459 else: 460 free_slots = slots - len(cooldowns) 461 462 cooldown_secs = 0 if free_slots > 0 else min(cooldowns) 463 except ValueError as err: 464 await _raise_for_response(response, cause=err) 465 466 return Status( 467 slots=slots, 468 free_slots=free_slots, 469 cooldown_secs=cooldown_secs, 470 endpoint=endpoint, 471 nb_running_queries=nb_running_queries, 472 ) 473 474 475@asynccontextmanager 476async def _map_request_error( 477 timeout: ClientTimeout | None = None, 478) -> AsyncIterator[None]: 479 """Context to make requests in; maps errors to our exception types.""" 480 try: 481 yield 482 except aiohttp.ClientError as err: 483 await _raise_for_request_error(err) 484 except TimeoutError as err: 485 assert timeout is not None 486 assert timeout.total 487 raise CallTimeoutError(cause=err, after_secs=timeout.total) from err 488 489 490def _next_timeout_secs(query: Query) -> int: 491 """ 492 Decide the ``[timeout:*]`` setting for the given query's next try. 493 494 - pick ``Query.timeout_secs`` 495 - cap it by ``Query.run_timeout_secs``, and the time that has elapsed so far 496 - if a previous try was cancelled with less or equal than the chosen time, 497 raise without trying again 498 499 Raises: 500 GiveupError: when the run timeout elapsed, or when a previous try timed out with 501 a lower timeout setting 502 """ 503 timeout_secs = query.timeout_secs 504 505 if (at_most := query._run_duration_left_secs) and at_most < timeout_secs: 506 query._logger.info(f"{query} has {at_most}s left to run") 507 at_most = math.floor(at_most) 508 query._logger.info(f"adjust {query} [timeout:{timeout_secs}] to [timeout:{at_most}]") 509 timeout_secs = at_most 510 else: 511 query._logger.info(f"{query} will use [timeout:{timeout_secs}]") 512 513 not_enough = query._max_timed_out_after_secs or 0 514 515 if timeout_secs <= not_enough: 516 query._logger.error(f"{query} previously timed out with [timeout:{not_enough}] - give up") 517 raise GiveupError( 518 cause=GiveupCause.EXPECTING_QUERY_TIMEOUT, 519 kwargs=query.kwargs, 520 after_secs=query.run_duration_secs or 0.0, 521 ) 522 523 return timeout_secs
class Client
A client for the Overpass API.
Requests are rate-limited according to the configured number of slots per IP for the specified API server. By default, queries are retried whenever the server is too busy or the rate limit was exceeded. Custom query runners can be used to implement your own retry strategy.
Arguments:
- url: The URL of an Overpass API instance. Defaults to the main Overpass API instance.
- user_agent: A string used for the User-Agent header. It is good practice to provide a string that identifies your application and includes a way to contact you (e.g. an e-mail address, or a link to a repository). This is important if you make too many requests, or queries that require a lot of resources.
- concurrency: The maximum number of simultaneous connections. In practice, the number of concurrent queries may be limited by the number of slots the server provides for each IP.
- status_timeout_secs: If set, status requests to the Overpass API will time out after this duration in seconds. Defaults to no timeout.
- runner: You can provide another query runner if you want to implement your own retry strategy.
References:
- https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances
Client(url: str = DEFAULT_INSTANCE, user_agent: str = DEFAULT_USER_AGENT, concurrency: int = 32, status_timeout_secs: float | None = None, runner: QueryRunner | None = None)
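For illustration, a minimal sketch of constructing and closing a client. The user agent string and the chosen settings are placeholders, not recommendations.

import asyncio

from aio_overpass.client import Client


async def main() -> None:
    # identify your application and a way to contact you (placeholder values)
    client = Client(
        user_agent="my-osm-tool/0.1 (mailto:me@example.com)",
        concurrency=8,             # at most 8 simultaneous connections
        status_timeout_secs=10.0,  # status requests time out after 10 seconds
    )
    try:
        ...  # run queries against the default instance here
    finally:
        # cancel running queries and close the underlying aiohttp session
        await client.close()


asyncio.run(main())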
Client.close() -> None
Cancel all running queries and close the underlying session.
Client.status() -> Status
Check the current API status.
The timeout of this request is configured with the status_timeout_secs argument.
Raises:
- ClientError: if the status could not be looked up
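A small usage sketch: it checks the current status and handles a failed lookup. ClientError is importable from aio_overpass.error, as shown in the module source above; the printed messages are illustrative.

from aio_overpass.client import Client
from aio_overpass.error import ClientError


async def print_status(client: Client) -> None:
    try:
        status = await client.status()
    except ClientError as err:
        print(f"status lookup failed: {err}")
        return

    # Status fields are documented further down on this page
    print(f"free slots: {status.free_slots}/{status.slots}")
    print(f"cooldown:   {status.cooldown_secs}s")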
Client.cancel_queries(timeout_secs: float | None = None) -> int
Cancel all running queries.
This can be used to terminate runaway queries that prevent you from sending new ones.
Returns:
the number of terminated queries
Raises:
- ClientError: if the request to cancel queries failed
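A sketch of terminating runaway queries before submitting new ones; the 5-second timeout and the log message are illustrative choices.

from aio_overpass.client import Client


async def clear_runaway_queries(client: Client) -> None:
    # give the kill_my_queries request at most 5 seconds
    n_killed = await client.cancel_queries(timeout_secs=5.0)
    print(f"terminated {n_killed} running queries")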
Client.run_query(query: Query, *, raise_on_failure: bool = True) -> None
Send a query to the API, and await its completion.
"Running" the query entails acquiring a connection from the pool, the query requests themselves (which may be retried), status requests when the server is busy, and cooldown periods.
The query runner is invoked before every try, and once after the last try.
To run multiple queries concurrently, wrap the returned coroutines in asyncio tasks, e.g. with asyncio.create_task() and subsequent asyncio.gather(); see the example below.
Arguments:
- query: the query to run on this API instance
- raise_on_failure: if True, raises query.error if the query failed
Raises:
- ClientError: when query or status requests fail. If the query was retried, the error of the last try will be raised. The same exception is also captured in query.error. Raising can be prevented by setting raise_on_failure to False.
- RunnerError: when a call to the query runner raises. This exception is raised even if raise_on_failure is False, since it is likely an error that is not just specific to this query.
- AlreadyRunningError: when another run_query() call on this query has not finished yet. This is not affected by raise_on_failure.
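A sketch of running two queries concurrently, as described above. It assumes that Query accepts an Overpass QL string as its first argument; check the aio_overpass.query documentation for the actual constructor. The QL snippets are placeholders.

import asyncio

from aio_overpass.client import Client
from aio_overpass.query import Query


async def run_two_queries(client: Client) -> None:
    # assumption: Query takes the Overpass QL code as its first argument
    q1 = Query("[out:json]; node[amenity=drinking_water](50.74,7.07,50.75,7.08); out;")
    q2 = Query("[out:json]; way[highway=cycleway](50.74,7.07,50.75,7.08); out;")

    # wrap the coroutines in tasks so both queries run at the same time;
    # raise_on_failure=False captures errors in query.error instead of raising
    await asyncio.gather(
        asyncio.create_task(client.run_query(q1, raise_on_failure=False)),
        asyncio.create_task(client.run_query(q2, raise_on_failure=False)),
    )

    for query in (q1, q2):
        if query.error is not None:
            print(f"query failed after {query.nb_tries} tries: {query.error}")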
class Status
Information about the API server's rate limit.
Attributes:
- slots: The maximum number of concurrent queries per IP (or None if there is no rate limit).
- free_slots: The number of slots open for this IP (or None if there is no rate limit).
- cooldown_secs: The number of seconds until a slot opens for this IP (or 0 if there is a free slot).
- endpoint: Announced endpoint. For example, there are two distinct servers that can both be reached through the main Overpass API instance. Depending on server load, a query may be sent to either of them. This value is the server name, e.g. "gall.openstreetmap.de/".
- nb_running_queries: Number of currently running queries for this IP.
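To illustrate how these attributes can be interpreted, a sketch that waits for a slot before sending another query — a simplified version of what the client already does for you internally.

import asyncio

from aio_overpass.client import Client


async def wait_for_free_slot(client: Client) -> None:
    status = await client.status()

    if status.slots is None or (status.free_slots or 0) > 0:
        return  # no rate limit, or a slot is already open

    # otherwise, sleep until a slot is expected to open for this IP
    await asyncio.sleep(status.cooldown_secs)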
DEFAULT_INSTANCE = "https://overpass-api.de/api/"
Main Overpass API instance.
DEFAULT_USER_AGENT
User agent that points to the aio-overpass repo.