Bucket

A bucket is a client aimed utility to aid concurrency and ratelimit combinations, it is also async context manager syntatic sugar over RateLimit

class uprate.bucket.Bucket(rate: Rate | RateGroup, store: BaseStore[T] = None, concurrency: int = 0)[source]

A high level ratelimit construct to obey both ratelimits and concurrency.

Note

Unlike other constructs in uprate, Bucket does not have a sync counterpart for threaded applications, a sync counterpart maybe added with enough interest.

Parameters
  • rate (Rate, RateGroup, (Rate | RateGroup)) – The rate(s) to enforce on keys in this bucket.

  • store (BaseStore, (BaseStore | None)) – The store to use for the under lying RateLimit. By default, None.

  • concurrency (int) – The number of concurrently executing acquires/context managers. If 0 or lower then only the ratelimit is enforced. By default, 0

classmethod from_limit(limit: uprate.ratelimit.RateLimit[uprate.bucket.T]) uprate.bucket.Bucket[uprate.bucket.T][source]

Create a bucket from a RateLimit

Parameters

limit (RateLimit) – The ratelimit to create a Bucket from. The provided ratelimit is used as it is, hence mutating it wil also affect the bucket

Returns

The created bucket

Return type

Bucket

acquire(key: uprate.bucket.T) uprate.bucket.BucketCM[uprate.bucket.T][source]

Return an async context manager which tries to acquire a usage token upon entering while respecting the concurrency limit.

Parameters

key (T) – The key to acquire.

Returns

The async context manager for the key.

Return type

uprate.bucket.BucketCM

Example

bucket: Bucket[str] = Bucket(30 / Seconds(1), concurrency=1)
...
# Wait until concurrency and ratelimit are satisfied.
async with bucket.acquire("key"):
    ...
async reset(key: Optional[uprate.bucket.T] = None) None[source]

Reset the given key.

Parameters

key (T, None, (T | None)) – The key to reset ratelimit for. If None, then resets all ratelimits, by default None.

property rates: tuple[Rate, ...]

Same as RateLimit.rates

Type

tuple[Rate]

property store: BaseStore

Same as RateLimit.store

Type

BaseStore

Example

An example of concurrency management with buckets
import uprate as up
import asyncio

async def worker(id: int, bucket: up.Bucket[str]):
    async with bucket.acquire("GLOBAL_KEY"):
        # Lower the id longer the sleep
        await asyncio.sleep(0.4 + (4 - id)/100)
        print("hello from ", id)

async def main():
    # Bucket uses a queue so during creation a running loop must be present.
    bucket = up.Bucket[str](2 / up.Seconds(1), concurrency=2)
    await asyncio.gather(*(asyncio.create_task(worker(i, bucket)) for i in range(5)))

asyncio.run(main())

# Output:
# hello from  1
# hello from  0
# hello from  3
# hello from  2
# hello from  4