Исходный код tbank.core.transport

from __future__ import annotations

import asyncio
import math
import ssl
import time
from datetime import timezone
from email.utils import parsedate_to_datetime
from typing import Any, Dict, Optional, Tuple, Union

import httpx

from tbank.core.auth import AuthStrategy, BearerAuth, Body, Headers, NoAuth
from tbank.core.errors import TBankNetworkError, TBankTimeoutError
from tbank.core.retry import (
    IDEMPOTENT_METHODS,
    RetryPolicy,
    compute_delay,
    should_retry,
)

# Timeout used when a transport is created without an explicit one:
# 30 seconds by default, with a shorter 5-second limit for connecting.
DEFAULT_TIMEOUT = httpx.Timeout(30.0, connect=5.0)

# mTLS client certificate: a path to a PEM file, a (cert, key) pair,
# or a (cert, key, password) triple.
CertTypes = Union[str, Tuple[str, str], Tuple[str, str, str]]
# Accepted forms of the `verify` option that the transports forward to httpx.
VerifyTypes = Union[bool, str, ssl.SSLContext]


def _drop_json_content_type(
    headers: "Headers",
    data: Optional[Dict[str, Any]],
    files: Optional[Any],
) -> "Headers":
    """Для multipart/form-запросов убирает дефолтный JSON-Content-Type, чтобы httpx
    выставил свой (с boundary). Для `content` (сырое тело) Content-Type задаёт сам
    вызывающий, поэтому его не трогаем."""
    if data is None and files is None:
        return headers
    return {
        k: v
        for k, v in headers.items()
        if not (
            k.lower() == "content-type" and v.lower().startswith("application/json")
        )
    }


def _is_idempotent(method: str, headers: "Headers") -> bool:
    """Tell whether a request may be repeated safely.

    A request counts as safe to repeat when its method (upper-cased) is listed
    in ``IDEMPOTENT_METHODS`` or when the headers contain an
    ``Idempotency-Key`` entry (header name compared case-insensitively).
    """
    if method.upper() in IDEMPOTENT_METHODS:
        return True
    for name in headers:
        if name.lower() == "idempotency-key":
            return True
    return False


class _TransportBase:
    def __init__(
        self,
        *,
        base_url: str,
        auth: Optional[AuthStrategy] = None,
        retry: Optional[RetryPolicy] = None,
        timeout: Optional[httpx.Timeout] = None,
        user_agent: str = "tbank",
        cert: Optional[CertTypes] = None,
        verify: VerifyTypes = True,
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._auth: AuthStrategy = auth or NoAuth()
        self._retry = retry or RetryPolicy()
        self._timeout = timeout or DEFAULT_TIMEOUT
        self._cert = cert
        self._verify = verify
        self._base_headers: Headers = {
            "Content-Type": "application/json",
            "User-Agent": user_agent,
        }

    def _prepare(
        self, json: Body, headers: Optional[Dict[str, str]]
    ) -> Tuple[Body, Headers]:
        merged: Headers = {**self._base_headers, **(headers or {})}
        return self._auth.apply(json, merged)

    def _url(self, path: str) -> str:
        return f"{self._base_url}{path}"

    @staticmethod
    def _retry_after(response: httpx.Response) -> Optional[float]:
        """Retry-After: секунды либо HTTP-дата."""
        value = response.headers.get("Retry-After")
        if value is None:
            return None
        try:
            return float(value)
        except ValueError:
            pass
        try:
            dt = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return None
        delay = dt.timestamp() - time.time()
        return max(delay, 0.0)


class AsyncTransport(_TransportBase):
    """Asynchronous HTTP transport built on ``httpx.AsyncClient``."""

    def __init__(
        self, *, client: Optional[httpx.AsyncClient] = None, **kwargs: Any
    ) -> None:
        """Configure the transport.

        Args:
            client: Ready-made httpx client to use. When omitted (or falsy),
                a new ``httpx.AsyncClient`` is created from the stored
                timeout, cert and verify settings.
            **kwargs: Forwarded to ``_TransportBase.__init__``.
        """
        super().__init__(**kwargs)
        if not client:
            client = httpx.AsyncClient(
                timeout=self._timeout, cert=self._cert, verify=self._verify
            )
        self._client = client

    async def request(
        self,
        method: str,
        path: str,
        *,
        json: Body = None,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
        files: Optional[Any] = None,
        content: Optional[Any] = None,
    ) -> httpx.Response:
        """Send a request, repeating it as the retry policy allows.

        Whether to retry is decided by ``should_retry`` and how long to wait
        by ``compute_delay``; for HTTP error responses the parsed
        ``Retry-After`` header is passed to ``compute_delay`` as well.

        Returns:
            The response of the last attempt. Responses with status >= 400
            are returned as-is once no further retry is made.

        Raises:
            TBankTimeoutError: An httpx timeout occurred and is not retried.
            TBankNetworkError: Another ``httpx.HTTPError`` occurred and is
                not retried.
        """
        body, merged = self._prepare(json, headers)
        merged = _drop_json_content_type(merged, data, files)
        idempotent = _is_idempotent(method, merged)
        url = self._url(path)
        # The same arguments are sent on every attempt.
        send_kwargs = {
            "json": body,
            "params": params,
            "headers": merged,
            "data": data,
            "files": files,
            "content": content,
        }

        attempt = 0
        while True:
            attempt += 1
            try:
                response = await self._client.request(method, url, **send_kwargs)
            except httpx.HTTPError as exc:
                # httpx.TimeoutException is a subclass of httpx.HTTPError, so
                # this single handler covers both timeout and network errors.
                if should_retry(
                    self._retry, status=None, attempt=attempt, idempotent=idempotent
                ):
                    await asyncio.sleep(compute_delay(self._retry, attempt=attempt))
                    continue
                if isinstance(exc, httpx.TimeoutException):
                    raise TBankTimeoutError(str(exc)) from exc
                raise TBankNetworkError(str(exc)) from exc

            status = response.status_code
            if status < 400:
                return response
            if not should_retry(
                self._retry, status=status, attempt=attempt, idempotent=idempotent
            ):
                return response

            pause = compute_delay(
                self._retry,
                attempt=attempt,
                retry_after=self._retry_after(response),
            )
            await asyncio.sleep(pause)

    async def aclose(self) -> None:
        """Close the underlying httpx client."""
        await self._client.aclose()

    async def __aenter__(self) -> "AsyncTransport":
        """Enter the async context; returns the transport itself."""
        return self

    async def __aexit__(self, *exc: Any) -> None:
        """Leave the async context and close the client."""
        await self.aclose()
class SyncTransport(_TransportBase):
    """Synchronous HTTP transport built on ``httpx.Client``."""

    def __init__(self, *, client: Optional[httpx.Client] = None, **kwargs: Any) -> None:
        """Configure the transport.

        Args:
            client: Ready-made httpx client to use. When omitted (or falsy),
                a new ``httpx.Client`` is created from the stored timeout,
                cert and verify settings.
            **kwargs: Forwarded to ``_TransportBase.__init__``.
        """
        super().__init__(**kwargs)
        if not client:
            client = httpx.Client(
                timeout=self._timeout, cert=self._cert, verify=self._verify
            )
        self._client = client

    def request(
        self,
        method: str,
        path: str,
        *,
        json: Body = None,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
        files: Optional[Any] = None,
        content: Optional[Any] = None,
    ) -> httpx.Response:
        """Send a request, repeating it as the retry policy allows.

        Whether to retry is decided by ``should_retry`` and how long to wait
        by ``compute_delay``; for HTTP error responses the parsed
        ``Retry-After`` header is passed to ``compute_delay`` as well.
        Waiting between attempts blocks the calling thread (``time.sleep``).

        Returns:
            The response of the last attempt. Responses with status >= 400
            are returned as-is once no further retry is made.

        Raises:
            TBankTimeoutError: An httpx timeout occurred and is not retried.
            TBankNetworkError: Another ``httpx.HTTPError`` occurred and is
                not retried.
        """
        body, merged = self._prepare(json, headers)
        merged = _drop_json_content_type(merged, data, files)
        idempotent = _is_idempotent(method, merged)
        url = self._url(path)
        # The same arguments are sent on every attempt.
        send_kwargs = {
            "json": body,
            "params": params,
            "headers": merged,
            "data": data,
            "files": files,
            "content": content,
        }

        attempt = 0
        while True:
            attempt += 1
            try:
                response = self._client.request(method, url, **send_kwargs)
            except httpx.HTTPError as exc:
                # httpx.TimeoutException is a subclass of httpx.HTTPError, so
                # this single handler covers both timeout and network errors.
                if should_retry(
                    self._retry, status=None, attempt=attempt, idempotent=idempotent
                ):
                    time.sleep(compute_delay(self._retry, attempt=attempt))
                    continue
                if isinstance(exc, httpx.TimeoutException):
                    raise TBankTimeoutError(str(exc)) from exc
                raise TBankNetworkError(str(exc)) from exc

            status = response.status_code
            if status < 400:
                return response
            if not should_retry(
                self._retry, status=status, attempt=attempt, idempotent=idempotent
            ):
                return response

            pause = compute_delay(
                self._retry,
                attempt=attempt,
                retry_after=self._retry_after(response),
            )
            time.sleep(pause)

    def close(self) -> None:
        """Close the underlying httpx client."""
        self._client.close()

    def __enter__(self) -> "SyncTransport":
        """Enter the context; returns the transport itself."""
        return self

    def __exit__(self, *exc: Any) -> None:
        """Leave the context and close the client."""
        self.close()
def build_sync_transports(
    token: str,
    *,
    base_url: str,
    secured_base_url: Optional[str] = None,
    cert: Optional[CertTypes] = None,
    verify: VerifyTypes = True,
    retry: Optional[RetryPolicy] = None,
    transport: Optional[SyncTransport] = None,
    secured_transport: Optional[SyncTransport] = None,
) -> Tuple[SyncTransport, Optional[SyncTransport]]:
    """Build the (regular, secured) pair of Bearer-authenticated transports.

    Transports passed in by the caller are returned unchanged. A secured
    transport is created only when none was passed and both ``cert`` and
    ``secured_base_url`` are given; ``cert`` and ``verify`` are applied to
    that secured transport only.

    Returns:
        ``(transport, secured_transport)``; the second item is None when no
        secured transport was passed or created.
    """
    if not transport:
        transport = SyncTransport(
            base_url=base_url, auth=BearerAuth(token), retry=retry
        )

    if secured_transport is not None:
        return transport, secured_transport
    if cert is None or secured_base_url is None:
        return transport, secured_transport

    secured_transport = SyncTransport(
        base_url=secured_base_url,
        auth=BearerAuth(token),
        retry=retry,
        cert=cert,
        verify=verify,
    )
    return transport, secured_transport
def build_async_transports(
    token: str,
    *,
    base_url: str,
    secured_base_url: Optional[str] = None,
    cert: Optional[CertTypes] = None,
    verify: VerifyTypes = True,
    retry: Optional[RetryPolicy] = None,
    transport: Optional[AsyncTransport] = None,
    secured_transport: Optional[AsyncTransport] = None,
) -> Tuple[AsyncTransport, Optional[AsyncTransport]]:
    """Build the (regular, secured) pair of Bearer-authenticated async transports.

    Transports passed in by the caller are returned unchanged. A secured
    transport is created only when none was passed and both ``cert`` and
    ``secured_base_url`` are given; ``cert`` and ``verify`` are applied to
    that secured transport only.

    Returns:
        ``(transport, secured_transport)``; the second item is None when no
        secured transport was passed or created.
    """
    if not transport:
        transport = AsyncTransport(
            base_url=base_url, auth=BearerAuth(token), retry=retry
        )

    if secured_transport is not None:
        return transport, secured_transport
    if cert is None or secured_base_url is None:
        return transport, secured_transport

    secured_transport = AsyncTransport(
        base_url=secured_base_url,
        auth=BearerAuth(token),
        retry=retry,
        cert=cert,
        verify=verify,
    )
    return transport, secured_transport