Source code for aiogram.webhook.aiohttp_server

import asyncio
import secrets
from abc import ABC, abstractmethod
from asyncio import Transport
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, cast

from aiohttp import JsonPayload, MultipartWriter, Payload, web
from aiohttp.typedefs import Handler
from aiohttp.web_app import Application
from aiohttp.web_middlewares import middleware

from aiogram import Bot, Dispatcher, loggers
from aiogram.methods import TelegramMethod
from aiogram.methods.base import TelegramType
from aiogram.webhook.security import IPFilter

if TYPE_CHECKING:
    from aiogram.types import InputFile


def setup_application(app: Application, dispatcher: Dispatcher, /, **kwargs: Any) -> None:
    """
    This function helps to configure a startup-shutdown process

    :param app: aiohttp application
    :param dispatcher: aiogram dispatcher
    :param kwargs: additional data
    :return:
    """
    workflow_data = {
        "app": app,
        "dispatcher": dispatcher,
        **dispatcher.workflow_data,
        **kwargs,
    }

    async def on_startup(*a: Any, **kw: Any) -> None:  # pragma: no cover
        await dispatcher.emit_startup(**workflow_data)

    async def on_shutdown(*a: Any, **kw: Any) -> None:  # pragma: no cover
        await dispatcher.emit_shutdown(**workflow_data)

    app.on_startup.append(on_startup)
    app.on_shutdown.append(on_shutdown)


def check_ip(ip_filter: IPFilter, request: web.Request) -> tuple[str, bool]:
    # Try to resolve client IP over reverse proxy
    if forwarded_for := request.headers.get("X-Forwarded-For", ""):
        # Get the left-most ip when there is multiple ips
        # (request got through multiple proxy/load balancers)
        # https://github.com/aiogram/aiogram/issues/672
        forwarded_for, *_ = forwarded_for.split(",", maxsplit=1)
        return forwarded_for, forwarded_for in ip_filter

    # When reverse proxy is not configured IP address can be resolved from incoming connection
    if peer_name := cast(Transport, request.transport).get_extra_info("peername"):
        host, _ = peer_name
        return host, host in ip_filter

    # Potentially impossible case
    return "", False  # pragma: no cover


[docs] def ip_filter_middleware( ip_filter: IPFilter, ) -> Callable[[web.Request, Handler], Awaitable[Any]]: """ :param ip_filter: :return: """ @middleware async def _ip_filter_middleware(request: web.Request, handler: Handler) -> Any: ip_address, accept = check_ip(ip_filter=ip_filter, request=request) if not accept: loggers.webhook.warning("Blocking request from an unauthorized IP: %s", ip_address) raise web.HTTPUnauthorized() return await handler(request) return _ip_filter_middleware
[docs] class BaseRequestHandler(ABC):
[docs] def __init__( self, dispatcher: Dispatcher, handle_in_background: bool = False, **data: Any, ) -> None: """ Base handler that helps to handle incoming request from aiohttp and propagate it to the Dispatcher :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` :param handle_in_background: immediately responds to the Telegram instead of a waiting end of a handler process """ self.dispatcher = dispatcher self.handle_in_background = handle_in_background self.data = data self._background_feed_update_tasks: set[asyncio.Task[Any]] = set()
[docs] def register(self, app: Application, /, path: str, **kwargs: Any) -> None: """ Register route and shutdown callback :param app: instance of aiohttp Application :param path: route path :param kwargs: """ app.on_shutdown.append(self._handle_close) app.router.add_route("POST", path, self.handle, **kwargs)
async def _handle_close(self, *a: Any, **kw: Any) -> None: await self.close() @abstractmethod async def close(self) -> None: pass
[docs] @abstractmethod async def resolve_bot(self, request: web.Request) -> Bot: """ This method should be implemented in subclasses of this class. Resolve Bot instance from request. :param request: :return: Bot instance """
@abstractmethod def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool: pass async def _background_feed_update(self, bot: Bot, update: dict[str, Any]) -> None: result = await self.dispatcher.feed_raw_update(bot=bot, update=update, **self.data) if isinstance(result, TelegramMethod): await self.dispatcher.silent_call_request(bot=bot, result=result) async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response: feed_update_task = asyncio.create_task( self._background_feed_update( bot=bot, update=await request.json(loads=bot.session.json_loads), ), ) self._background_feed_update_tasks.add(feed_update_task) feed_update_task.add_done_callback(self._background_feed_update_tasks.discard) return web.json_response({}, dumps=bot.session.json_dumps) def _build_response_writer( self, bot: Bot, result: TelegramMethod[TelegramType] | None, ) -> Payload: if not result: # we need to return something "empty" # and "empty" form doesn't work # since it's sending only "end" boundary w/o "start" return JsonPayload({}) writer = MultipartWriter( "form-data", boundary=f"webhookBoundary{secrets.token_urlsafe(16)}", ) payload = writer.append(result.__api_method__) payload.set_content_disposition("form-data", name="method") files: dict[str, InputFile] = {} for key, value in result.model_dump(warnings=False).items(): value = bot.session.prepare_value(value, bot=bot, files=files) if not value: continue payload = writer.append(value) payload.set_content_disposition("form-data", name=key) for key, value in files.items(): payload = writer.append(value.read(bot)) payload.set_content_disposition( "form-data", name=key, filename=value.filename or key, ) return writer async def _handle_request(self, bot: Bot, request: web.Request) -> web.Response: result: TelegramMethod[Any] | None = await self.dispatcher.feed_webhook_update( bot, await request.json(loads=bot.session.json_loads), **self.data, ) return web.Response(body=self._build_response_writer(bot=bot, result=result)) async def handle(self, request: web.Request) -> web.Response: bot = await self.resolve_bot(request) if not self.verify_secret(request.headers.get("X-Telegram-Bot-Api-Secret-Token", ""), bot): return web.Response(body="Unauthorized", status=401) if self.handle_in_background: return await self._handle_request_background(bot=bot, request=request) return await self._handle_request(bot=bot, request=request) __call__ = handle
[docs] class SimpleRequestHandler(BaseRequestHandler):
[docs] def __init__( self, dispatcher: Dispatcher, bot: Bot, handle_in_background: bool = True, secret_token: str | None = None, **data: Any, ) -> None: """ Handler for single Bot instance :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` :param handle_in_background: immediately responds to the Telegram instead of a waiting end of handler process :param bot: instance of :class:`aiogram.client.bot.Bot` """ super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background, **data) self.bot = bot self.secret_token = secret_token
def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool: if self.secret_token: return secrets.compare_digest(telegram_secret_token, self.secret_token) return True
[docs] async def close(self) -> None: """ Close bot session """ await self.bot.session.close()
[docs] async def resolve_bot(self, request: web.Request) -> Bot: return self.bot
[docs] class TokenBasedRequestHandler(BaseRequestHandler):
[docs] def __init__( self, dispatcher: Dispatcher, handle_in_background: bool = True, bot_settings: dict[str, Any] | None = None, **data: Any, ) -> None: """ Handler that supports multiple bots the context will be resolved from path variable 'bot_token' .. note:: This handler is not recommended in due to token is available in URL and can be logged by reverse proxy server or other middleware. :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` :param handle_in_background: immediately responds to the Telegram instead of a waiting end of handler process :param bot_settings: kwargs that will be passed to new Bot instance """ super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background, **data) if bot_settings is None: bot_settings = {} self.bot_settings = bot_settings self.bots: dict[str, Bot] = {}
def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool: return True async def close(self) -> None: for bot in self.bots.values(): await bot.session.close()
[docs] def register(self, app: Application, /, path: str, **kwargs: Any) -> None: """ Validate path, register route and shutdown callback :param app: instance of aiohttp Application :param path: route path :param kwargs: """ if "{bot_token}" not in path: msg = "Path should contains '{bot_token}' substring" raise ValueError(msg) super().register(app, path=path, **kwargs)
[docs] async def resolve_bot(self, request: web.Request) -> Bot: """ Get bot token from a path and create or get from cache Bot instance :param request: :return: """ token = request.match_info["bot_token"] if token not in self.bots: self.bots[token] = Bot(token=token, **self.bot_settings) return self.bots[token]