from __future__ import annotations
from abc import abstractmethod
from collections.abc import Hashable
from time import monotonic as _now
from typing import (TYPE_CHECKING, Optional, Protocol, TypeVar,
runtime_checkable)
if TYPE_CHECKING:
from .rate import Rate
from .ratelimit import RateLimit
__all__ = (
"BaseStore",
"MemoryStore"
)
T = TypeVar("T", contravariant=True)
"""An unbound and unconstrained contravariant TypeVar"""
H = TypeVar("H", contravariant=True, bound=Hashable)
"""A contravariant TypeVar bound to :class:`collections.abc.Hashable`"""
[docs]@runtime_checkable
class BaseStore(Protocol[T]):
"""A protocol defining the design of async stores.
This is generic in TypeVar :data:`uprate.store.T` which is unbound and unconstrained.
Subclasses implementing this protocol should inherit from this class.
Attributes
----------
limit : :class:`uprate.ratelimit.RateLimit`
The RateLimit to which this store is bound to.
"""
limit: RateLimit
[docs] def setup(self, ratelimit: RateLimit):
"""Adds the ratelimit that this store is bound to as an
attribute under :attr:`.BaseStore.limit`. This method exists
only to create a circular reference between the store and ratelimit.
:attr:`uprate.ratelimit.RateLimit.rates` attribute allows the store to access
all implemented rates.
Parameters
----------
ratelimit : :class:`uprate.ratelimit.RateLimit`
The ratelimit which this store is bound to.
"""
self.limit = ratelimit
[docs] @abstractmethod
async def acquire(self, key: T) -> tuple[bool, float, Optional[Rate]]:
"""Try to acquire a usage token for given key.
.. note::
To get all the rates that this key follows use
.. code-block:: python
self.limit.rates
.. note::
If a HashMap like data-structure is being nested, then it's best that it is nested by
the rates instead of the keys, since the number of keys may not exceed 1 in most cases,
while the number of keys could grow upto 100k or more fairly quickly.
Parameters
----------
key : :data:`uprate.store.T`
The key to acquire a ratelimit for.
Returns
-------
tuple[:class:`bool`, :class:`float`, :class:`~uprate.rate.Rate` | :data:`None`]
A three element tuple, the first element of type :class:`bool` depicting success.
Second element :class:`float` which is the amount of time to retry in, If a usage
token was acquired this should return ``0`` other-wise the time in which a
usage token will be available. If the store does not support retry time then it
should return a negative value like ``-1`` (negative values shall be returned only on
failure if the retry time cannot be determined).
The last element is the :class:`uprate.rate.Rate` object which was violated,
this must be the rate which will take the longest to reset. The last element is
expected to be :data:`None` if acquiring was successfull.
"""
...
[docs] @abstractmethod
async def reset(self, key: T) -> None:
"""Reset the usage tokens for given key.
Implementation wise, deleting all the records for the given key should be enough.
Parameters
----------
key : :data:`uprate.store.T`
The key to acquire a ratelimit for.
"""
...
[docs] @abstractmethod
async def clear(self) -> None:
"""Reset all the keys in the store.
"""
...
[docs]class MemoryStore(BaseStore[H]):
"""An implementation of :class:`.BaseStore` protocol.
This implementation uses :class:`dict` and ejects stale buckets/keys
periodically only when :meth:`.MemoryStore.acquire` is called.
This is a generic in TypeVar :data:`.H`
Attributes
----------
limit : :class:`uprate.ratelimit.RateLimit`
The RateLimit to which this store is bound to.
"""
_data: dict[H, tuple[list[int | float], ...]]
def __init__(self):
self._data = {}
self._last_verified = 0.0
def setup(self, ratelimit: RateLimit):
super().setup(ratelimit)
self._max_period = self.limit.rates[-1].period
async def acquire(self, key: H) -> tuple[bool, float, Optional[Rate]]:
now = _now()
# Would using loop.call_at be a better idea?
# or per key scheduled callback maybe?
self.verify_cache() # Evict stale keys
record = self._data.get(key, None)
if record is None:
# 1st insert
self._data[key] = tuple([i.uses - 1, now] for i in self.limit.rates)
return True, 0.0, None
else:
worst: float = False
worst_rate: Optional[Rate] = None
# Optimisations: We do not need to update every rate
# that expires only the ones that don't have usage tokens.
for use_dt, rate in zip(record, self.limit.rates):
if use_dt[0] == 0:
if (then := (use_dt[1] + rate.period)) <= now:
# We have no tokens left but the rate has expired
# so we reset it and acquire a token.
use_dt[:] = [rate.uses - 1, now]
elif (retry := then - now) > worst:
# no tokens and the rate has time left to expire.
worst = retry
worst_rate = rate
else:
use_dt[0] -= 1
if worst is False:
return True, 0.0, None
return False, worst, worst_rate
async def reset(self, key: H) -> None:
del self._data[key]
async def clear(self) -> None:
self._data.clear()
def verify_cache(self) -> None:
# There is no way something has expired since the last
# check if enough time hasn't passed.
now = _now()
if (now - self._last_verified) < self._max_period:
return
delete = list[H]()
for k, v in self._data.items():
if self._max_period < (now - v[-1][1]):
delete.append(k)
for i in delete:
del self._data[i]
self._last_verified = _now()