Source code for uprate.bucket

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Generic, Literal, TypeVar

from .errors import RateLimitError
from .ratelimit import RateLimit

if TYPE_CHECKING:
    from .store import BaseStore
    from .rate import Rate, RateGroup

    from types import TracebackType

__all__ = (
    "Bucket",
)

T = TypeVar("T")

class BucketCM(Generic[T]):
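    """Async context manager returned by :meth:`Bucket.acquire`.

    Entering waits for a concurrency slot (if the bucket enforces one) and a
    usage token for ``key``; exiting releases the concurrency slot.
    """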
    key: T
    bucket: Bucket[T]

    def __init__(self, bucket: Bucket[T], key: T):
        self.bucket = bucket
        self.key = key

    async def __aenter__(self):
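        # Claim a concurrency slot if the bucket enforces one; this blocks
        # until another context manager exits and releases a slot.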
        if self.bucket._queue:
            await self.bucket._queue.get()

        await self.__wait()
        return None

    async def __aexit__(self,
                        exc_type: type[BaseException] | None,
                        exc: BaseException | None,
                        tb: TracebackType | None) -> Literal[False]:
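        # Release the concurrency slot so the next waiter can proceed.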
        if self.bucket._queue:
            self.bucket._queue.put_nowait(None)

        return False

    async def __wait(self) -> None:
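        # Keep trying to acquire a usage token for the key, sleeping for the
        # retry-after duration reported by the RateLimitError between attempts.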
        while True:
            try:
                await self.bucket._limit.acquire(self.key)
            except RateLimitError as err:
                await asyncio.sleep(float(err))
            else:
                return None

class Bucket(Generic[T]):
    """A high level ratelimit construct to obey both ratelimits and concurrency.

    .. note::
        Unlike other constructs in uprate, :class:`.Bucket` does not have a sync
        counterpart for threaded applications; a sync counterpart may be added
        with enough interest.

    Parameters
    ----------
    rate : :class:`~uprate.rate.Rate`, :class:`~uprate.rate.RateGroup`, (``Rate | RateGroup``)
        The rate(s) to enforce on keys in this bucket.

    store : :class:`~uprate.store.BaseStore`, (``BaseStore | None``)
        The store to use for the underlying :class:`~uprate.ratelimit.RateLimit`.
        By default, :data:`None`.

    concurrency : :class:`int`
        The number of concurrently executing acquires/context managers.
        If ``0`` or lower, only the ratelimit is enforced.
        By default, ``0``.
    """
    _limit: RateLimit[T]
    _queue: asyncio.Queue | None

    def __init__(self, rate: Rate | RateGroup, store: BaseStore[T] | None = None, concurrency: int = 0) -> None:
        self._limit = RateLimit(rate, store)

        if concurrency > 0:
            self._queue = asyncio.Queue(maxsize=concurrency)

            # Pre-fill the queue with one token per allowed concurrent acquire.
            for _ in range(concurrency):
                self._queue.put_nowait(None)
        else:
            self._queue = None

    @classmethod
    def from_limit(cls, limit: RateLimit[T]) -> Bucket[T]:
        """Create a bucket from a :class:`~uprate.ratelimit.RateLimit`.

        Parameters
        ----------
        limit : :class:`~uprate.ratelimit.RateLimit`
            The ratelimit to create a :class:`.Bucket` from. The provided ratelimit
            is used as-is, hence mutating it will also affect the bucket.

        Returns
        -------
        :class:`.Bucket`
            The created bucket.
        """
        self = object.__new__(cls)
        self._limit = limit
        # No concurrency limit is applied to buckets created this way.
        self._queue = None
        return self

    def acquire(self, key: T) -> BucketCM[T]:
        """Return an async context manager which tries to acquire a usage token
        upon entering, while respecting the concurrency limit.

        Parameters
        ----------
        key : :data:`.T`
            The key to acquire.

        Returns
        -------
        :class:`uprate.bucket.BucketCM`
            The async context manager for the key.

        Example
        -------
        .. code-block:: python3

            bucket: Bucket[str] = Bucket(30 / Seconds(1), concurrency=1)
            ...
            # Wait until concurrency and ratelimit are satisfied.
            async with bucket.acquire("key"):
                ...
        """
        return BucketCM(self, key)

    async def reset(self, key: T | None = None) -> None:
        """Reset the given key.

        Parameters
        ----------
        key : :data:`.T`, :data:`None`, (``T | None``)
            The key to reset the ratelimit for. If :data:`None`, then resets all
            ratelimits. By default :data:`None`.
        """
        await self._limit.reset(key)

    @property
    def rates(self) -> tuple[Rate, ...]:
        """tuple[:class:`~uprate.rate.Rate`] : Same as :attr:`.RateLimit.rates`"""
        return self._limit.rates

    @property
    def store(self) -> BaseStore:
        """:class:`~uprate.store.BaseStore` : Same as :attr:`.RateLimit.store`"""
        return self._limit.store
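
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of uprate itself): it shows how the
# pieces defined above fit together: build a Bucket from a Rate, enter the
# context manager returned by acquire(), then reset the key. The `Seconds`
# import path and the "user:123" key are assumptions made for this example.
if __name__ == "__main__":
    from uprate.rate import Seconds

    async def _demo() -> None:
        # At most 5 uses per 10 seconds per key, with at most 2 concurrent holders.
        bucket: Bucket[str] = Bucket(5 / Seconds(10), concurrency=2)

        # Blocks until both a concurrency slot and a usage token are available.
        async with bucket.acquire("user:123"):
            ...  # rate limited work goes here

        # Clear the recorded usage for this key (None clears every key).
        await bucket.reset("user:123")

    asyncio.run(_demo())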