import { DecayingDeque } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
A decaying deque is a special kind of doubly linked list that serves as a queue for a special kind of node, called a drift.
A decaying deque has a worker function that spawns a task for each element that is added to the queue. This task is then wrapped into a drift. The drifts are the actual elements (also known as links) in the queue.
In addition, the decaying deque runs a timer that purges old elements from the queue. The timeout period is determined by taskTimeout.
When a task completes or exceeds its timeout, the corresponding drift is removed from the queue. As a result, only drifts with pending tasks are contained in the queue at all times.
When a task completes with failure (rejects or exceeds the timeout), the respective handler (catchError or catchTimeout) is called.
The decaying deque gets its name from the observation that new elements are appended to the tail, while old elements are removed at arbitrary positions in the queue whenever a task completes. Hence, the queue seems to decay.
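
The "decay" behaviour described above can be illustrated with a small doubly linked list. This is only a sketch of the general idea, not the library's implementation: the names ListNode, DecayingList, append, remove and snapshot are hypothetical, and tasks and timeouts are left out entirely.

```typescript
// Hypothetical, simplified link type (the real drift carries more data).
interface ListNode<T> {
  value: T;
  prev: ListNode<T> | null;
  next: ListNode<T> | null;
}

// Sketch of a doubly linked list that grows at the tail and "decays"
// at arbitrary positions.
class DecayingList<T> {
  private head: ListNode<T> | null = null;
  private tail: ListNode<T> | null = null;
  length = 0;

  // New elements are always appended to the tail.
  append(value: T): ListNode<T> {
    const node: ListNode<T> = { value, prev: this.tail, next: null };
    if (this.tail === null) this.head = node;
    else this.tail.next = node;
    this.tail = node;
    this.length++;
    return node;
  }

  // Unlinks a node at any position in constant time.
  remove(node: ListNode<T>): void {
    if (node.prev === null) this.head = node.next;
    else node.prev.next = node.next;
    if (node.next === null) this.tail = node.prev;
    else node.next.prev = node.prev;
    node.prev = null;
    node.next = null;
    this.length--;
  }

  // Walks from head to tail and collects the values still in the list.
  snapshot(): T[] {
    const values: T[] = [];
    for (let n = this.head; n !== null; n = n.next) values.push(n.value);
    return values;
  }
}
```

Appending "a", "b" and "c" and then removing the node for "b" leaves "a" and "c" in the snapshot, which mirrors a task in the middle of the queue completing before its neighbours.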
Properties
List of subscribers that wait for the queue to have capacity again. All functions in this array will be called as soon as new capacity is available, i.e. the number of pending tasks falls below concurrency.
Timer that waits for the head element to time out. It is rescheduled whenever the head element changes, and it is undefined if and only if the queue is empty.
Methods
Called when a node has completed its lifecycle and should be removed from the queue. Effectively wraps the remove call and takes care of the timer.
Removes an element from the queue. Calls subscribers if there is capacity after performing this operation.
Performs a timeout event. This removes the head element as well as all subsequent drifts with the same date (added in the same millisecond).
The timeout handler is called in sequence for every removed drift.
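
The timeout event described above can be sketched as follows. This is an illustration under simplifying assumptions, not the library's code: the queue is modelled as a plain array ordered from oldest to newest, and SketchDrift, purgeHead and onTimeout are hypothetical names.

```typescript
// Hypothetical, simplified drift: only the fields this sketch needs.
interface SketchDrift<T> {
  date: number; // creation time in milliseconds
  elem: T; // source element the task was started for
}

// Removes the head drift and every directly following drift that shares
// its date, calling `onTimeout` for each removed drift in order.
// Returns the removed drifts; `queue` is modified in place.
function purgeHead<T>(
  queue: SketchDrift<T>[],
  onTimeout: (elem: T) => void,
): SketchDrift<T>[] {
  const removed: SketchDrift<T>[] = [];
  const head = queue[0];
  if (head === undefined) return removed; // nothing to time out
  while (queue.length > 0 && queue[0]?.date === head.date) {
    const drift = queue.shift() as SketchDrift<T>;
    removed.push(drift);
    onTimeout(drift.elem);
  }
  return removed;
}
```

Because drifts are appended in creation order, all drifts created in the same millisecond as the head sit directly behind it, so a single pass from the front is enough.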
Takes a source element and starts the task for it by calling the worker function. Then wraps this task into a drift. Also makes sure that the drift removes itself from the queue once it completes, and that the error handler is invoked if it fails (rejects).
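
A minimal sketch of this wrapping step is shown below. It assumes a worker that returns a promise and uses a Set in place of the linked list. The names TaskDrift, DriftPool and pending are hypothetical, and timeout handling is omitted, so it should not be read as the library's actual implementation.

```typescript
// Hypothetical, simplified drift: a pending task plus its metadata.
interface TaskDrift<T> {
  elem: T;
  date: number;
  task: Promise<void>;
}

class DriftPool<T> {
  // Stand-in for the queue: holds only drifts whose task is still pending.
  readonly pending = new Set<TaskDrift<T>>();
  private readonly worker: (elem: T) => Promise<void>;
  private readonly catchError: (err: unknown, elem: T) => void;

  constructor(
    worker: (elem: T) => Promise<void>,
    catchError: (err: unknown, elem: T) => void,
  ) {
    this.worker = worker;
    this.catchError = catchError;
  }

  // Starts the task for `elem` and wraps it into a drift that removes
  // itself once the task settles. Rejections go to the error handler.
  toDrift(elem: T): TaskDrift<T> {
    const drift: TaskDrift<T> = {
      elem,
      date: Date.now(),
      task: Promise.resolve(), // placeholder, replaced right below
    };
    drift.task = this.worker(elem)
      .catch((err: unknown) => {
        this.catchError(err, elem);
      })
      .finally(() => {
        this.pending.delete(drift);
      });
    this.pending.add(drift);
    return drift;
  }
}
```

The drift is created before the task is chained so that the cleanup callback can refer to the drift it has to remove.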
Adds the provided elements to the queue and starts tasks for all of them immediately. Returns a Promise that resolves with concurrency - length once this value becomes positive.
Returns a Promise that resolves with concurrency - length once this value becomes positive. Use await queue.capacity() to wait until the queue has free space again.
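
The interplay between the subscriber list and the capacity promise can be sketched like this. It is a simplified model under stated assumptions, not the library's code: CapacityGate is a hypothetical name, pending stands in for the queue length, and add and remove only count tasks instead of running them.

```typescript
// Sketch of capacity tracking with a subscriber list.
class CapacityGate {
  readonly concurrency: number;
  pending = 0; // stand-in for the queue length
  // Functions waiting for capacity; each receives the number of free slots.
  subscribers: Array<(capacity: number) => void> = [];

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  // Registers `count` new pending tasks.
  add(count: number): void {
    this.pending += count;
  }

  // Marks one task as finished and notifies all subscribers if there is
  // free capacity afterwards.
  remove(): void {
    this.pending--;
    const free = this.concurrency - this.pending;
    if (free > 0) {
      const waiting = this.subscribers;
      this.subscribers = [];
      for (const notify of waiting) notify(free);
    }
  }

  // Resolves with `concurrency - pending` once that value is positive.
  capacity(): Promise<number> {
    return new Promise<number>((resolve) => {
      const free = this.concurrency - this.pending;
      if (free > 0) resolve(free);
      else this.subscribers.push(resolve);
    });
  }
}
```

A producer can await gate.capacity() before adding more work, which gives simple backpressure: it is suspended while the gate is full and resumes as soon as one task is removed.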
Creates a snapshot of the queue by computing a list of those elements that are currently being processed.