Исходный код tbank.delivery.aio

from __future__ import annotations

from typing import Any, Dict, Optional

import httpx

from tbank.core.auth import BearerAuth
from tbank.core.client import BaseAsyncClient
from tbank.core.client import ensure_idempotency_key as _idem
from tbank.core.errors import TBankAPIError
from tbank.core.retry import RetryPolicy
from tbank.core.transport import AsyncTransport
from tbank.core.urls import PROD_URL, SANDBOX_URL
from tbank.delivery._endpoints import DOCUMENTS as _DOCUMENTS
from tbank.delivery._endpoints import MEETINGS as _MEETINGS
from tbank.delivery._endpoints import TASKS as _TASKS
from tbank.delivery.errors import error_from_delivery_response
from tbank.delivery.models import (
    CancelTaskRequest,
    CreateMeetingRequest,
    CreateMeetingResult,
    CreateTaskRequest,
    CreateTaskResult,
    DeliveryTask,
    GetIntervalsRequest,
    GetIntervalsResult,
    UpdateTaskRequest,
    UploadDocumentResult,
)


[документация] class DeliveryClient(BaseAsyncClient): """Асинхронный клиент партнёрской доставки: задания, встречи и интервалы, документы (загрузка и скачивание). Работает на обычном хосте по **Bearer**-токену. Провод — `camelCase`. """ def __init__( self, token: str, *, base_url: Optional[str] = None, sandbox: bool = False, retry: Optional[RetryPolicy] = None, transport: Optional[AsyncTransport] = None, ) -> None: resolved = base_url or (SANDBOX_URL if sandbox else PROD_URL) transport = transport or AsyncTransport( base_url=resolved, auth=BearerAuth(token), retry=retry ) super().__init__(transport) def _error_from_response(self, response: httpx.Response) -> TBankAPIError: return error_from_delivery_response(response) # --- Задания ---
[документация] async def create_task( self, request: CreateTaskRequest, *, idempotency_key: Optional[str] = None ) -> CreateTaskResult: """Создать задание на доставку/встречу.""" return await self._send( "POST", _TASKS, CreateTaskResult, body=request, idempotency_key=_idem(idempotency_key), )
[документация] async def get_task(self, task_id: str) -> DeliveryTask: """Карточка задания.""" return await self._get(f"{_TASKS}/{task_id}", DeliveryTask)
[документация] async def update_task(self, task_id: str, request: UpdateTaskRequest) -> None: """Обновить задание.""" await self._send("PUT", f"{_TASKS}/{task_id}", body=request)
[документация] async def cancel_task(self, task_id: str, request: CancelTaskRequest) -> None: """Отменить задание.""" await self._send("POST", f"{_TASKS}/{task_id}/cancel", body=request)
# --- Встречи ---
[документация] async def get_intervals(self, request: GetIntervalsRequest) -> GetIntervalsResult: """Доступные интервалы встречи по адресу.""" return await self._send( "POST", f"{_MEETINGS}/intervals", GetIntervalsResult, body=request )
[документация] async def create_meeting( self, request: CreateMeetingRequest ) -> CreateMeetingResult: """Назначить встречу на выбранный интервал.""" return await self._send("POST", _MEETINGS, CreateMeetingResult, body=request)
# --- Документы ---
[документация] async def upload_document( self, task_id: str, document_type: str, content: bytes, *, prev_doc_id: Optional[str] = None, filename: str = "document", ) -> UploadDocumentResult: """Загрузить документ и связать с заданием (multipart).""" data: Dict[str, Any] = {"taskId": task_id, "type": document_type} if prev_doc_id is not None: data["prevDocId"] = prev_doc_id response = await self._transport.request( "POST", _DOCUMENTS, data=data, files={"content": (filename, content)} ) self._raise_for_http(response) return UploadDocumentResult.model_validate(self._parse_body(response))
[документация] async def download_document(self, document_id: str) -> bytes: """Скачать документ задания (бинарное содержимое).""" response = await self._transport.request("GET", f"{_DOCUMENTS}/{document_id}") self._raise_for_http(response) return response.content