class DecayingDeque
Re-export
import { DecayingDeque } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";

A decaying deque is a special kind of doubly linked list that serves as a queue for a special kind of node, called a drift.

A decaying deque has a worker function that spawns a task for each element that is added to the queue. This task then gets wrapped into a drift. The drifts are the actual elements (a.k.a. links) in the queue.

In addition, the decaying deque runs a timer that purges old elements from the queue. How long an element may stay in the queue before it is purged is determined by taskTimeout.

When a task completes or exceeds its timeout, the corresponding drift is removed from the queue. As a result, only drifts with pending tasks are contained in the queue at all times.

When a task completes with failure (rejects or exceeds the timeout), the respective handler (catchError or catchTimeout) is called.

The decaying deque gets its name from the observation that new elements are appended to the tail, while old elements are removed at arbitrary positions in the queue whenever a task completes. Hence, the queue seems to decay.
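The append-at-tail, remove-anywhere behaviour described above can be illustrated with a minimal sketch. This is not the library's implementation: MiniDeque, Link, push and toArray are hypothetical names chosen for this example, and the real Drift<Y> carries more information than the link shown here.

```typescript
// Hypothetical minimal link type; the real Drift<Y> holds more fields.
interface Link<Y> {
  elem: Y;
  prev: Link<Y> | null;
  next: Link<Y> | null;
}

class MiniDeque<Y> {
  private head: Link<Y> | null = null; // oldest element
  private tail: Link<Y> | null = null; // newest element
  private len = 0;

  // Appends a new link at the tail and returns it so it can be removed later.
  push(elem: Y): Link<Y> {
    const link: Link<Y> = { elem, prev: this.tail, next: null };
    if (this.tail === null) this.head = link;
    else this.tail.next = link;
    this.tail = link;
    this.len++;
    return link;
  }

  // Unlinks a node from any position in constant time.
  // Must be called at most once per link.
  remove(link: Link<Y>): void {
    if (link.prev === null) this.head = link.next;
    else link.prev.next = link.next;
    if (link.next === null) this.tail = link.prev;
    else link.next.prev = link.prev;
    link.prev = null;
    link.next = null;
    this.len--;
  }

  get length(): number {
    return this.len;
  }

  // Snapshot of the elements, oldest first.
  toArray(): Y[] {
    const out: Y[] = [];
    for (let l = this.head; l !== null; l = l.next) out.push(l.elem);
    return out;
  }
}

const deque = new MiniDeque<string>();
deque.push("a");
const middle = deque.push("b");
deque.push("c");
deque.remove(middle); // the task for "b" finished first, so the middle link decays
console.log(deque.toArray()); // "a" and "c" remain
```

A doubly linked list fits this access pattern because removal from the middle needs no shifting or searching once the link itself is known.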

Constructors

new
DecayingDeque(
taskTimeout: number,
worker: (t: Y) => Promise<void>,
concurrency: boolean | number,
catchError: (err: R, elem: Y) => void | Promise<void>,
catchTimeout: (t: Y, task: Promise<void>) => void,
)

Creates a new decaying queue with the given parameters.

Type Parameters

Y
optional
R = unknown
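As a rough illustration of the three callbacks the constructor expects, the following sketch defines them for a hypothetical Job element type. The names Job, failed and timedOut are assumptions made for this example. The constructor call is shown only as a comment so that the block runs without the import, and the value 30_000 assumes that taskTimeout is given in milliseconds.

```typescript
// Hypothetical element type for this example.
type Job = { id: number };

const failed: number[] = [];
const timedOut: number[] = [];

// worker: starts the task for a single element.
const worker = async (job: Job): Promise<void> => {
  if (job.id < 0) throw new Error(`invalid job ${job.id}`);
};

// catchError: invoked when a task rejects.
const catchError = (_err: unknown, job: Job): void => {
  failed.push(job.id);
};

// catchTimeout: invoked when a task exceeds taskTimeout.
const catchTimeout = (job: Job, _task: Promise<void>): void => {
  timedOut.push(job.id);
};

// With the import shown at the top of this page, the call could look like this
// (assumes taskTimeout is in milliseconds):
// const queue = new DecayingDeque<Job>(30_000, worker, 4, catchError, catchTimeout);
```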

Properties

private
emptySubscribers: Array<() => void>
private
head: Drift<Y> | null

Head element (oldest), null iff the queue is empty

private
len: number

Number of drifts in the queue. Equivalent to the number of currently pending tasks.

private
subscribers: Array<(capacity: number) => void>

List of subscribers that wait for the queue to have capacity again. All functions in this array will be called as soon as new capacity is available, i.e. the number of pending tasks falls below concurrency.

private
tail: Drift<Y> | null

Tail element (newest), null iff the queue is empty

private
timer: ReturnType<typeof setTimeout> | undefined

Timer that waits for the head element to time out, will be rescheduled whenever the head element changes. It is undefined iff the queue is empty.

readonly
concurrency: number

Number of currently pending tasks that we strive for (add calls will resolve only after the number of pending tasks falls below this value).

In the context of grammY, it is possible to await calls to add to determine when to fetch more updates.

readonly
length

Number of pending tasks in the queue. Equivalent to this.pendingTasks().length (but much more efficient).

Methods

private
decay(node: Drift<Y>): void

Called when a node has completed its lifecycle and should be removed from the queue. Effectively wraps the remove call and takes care of the timer.

private
remove(node: Drift<Y>): void

Removes an element from the queue. Calls subscribers if there is capacity after performing this operation.

private
startTimer(ms?): void

Starts a timer that fires off a timeout after the given period of time.

private
timeout(): void

Performs a timeout event. This removes the head element as well as all subsequent drifts with the same date (added in the same millisecond).

The timeout handler is called in sequence for every removed drift.
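The batching rule for a timeout event can be sketched on a plain array. This is an illustrative model, not the library's code: MiniDrift and timeoutBatch are hypothetical names, and the array stands in for the linked list, with index 0 as the head.

```typescript
// Hypothetical simplified drift: an element and the time it was added.
interface MiniDrift<Y> {
  elem: Y;
  date: number;
}

// Removes the head and every directly following drift that shares its date,
// calling onTimeout for each removed drift in order.
// Returns the removed elements.
function timeoutBatch<Y>(
  queue: MiniDrift<Y>[],
  onTimeout: (elem: Y) => void,
): Y[] {
  const removed: Y[] = [];
  const head = queue[0];
  if (head === undefined) return removed; // nothing to time out
  let next = queue[0];
  while (next !== undefined && next.date === head.date) {
    queue.shift();
    onTimeout(next.elem);
    removed.push(next.elem);
    next = queue[0];
  }
  return removed;
}

const pending: MiniDrift<string>[] = [
  { elem: "a", date: 100 },
  { elem: "b", date: 100 },
  { elem: "c", date: 105 },
];
const expired = timeoutBatch(pending, (elem) => console.log(`timed out: ${elem}`));
// expired holds "a" and "b"; only "c" is still pending
```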

private
toDrift(elem: Y, date: number): Drift<Y>

Takes a source element and starts the task for it by calling the worker function. Then wraps this task into a drift. Also makes sure that the drift removes itself from the queue once it completes, and that the error handler is invoked if it fails (rejects).
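One way to picture this wrapping step is the sketch below. It is an assumption-laden illustration rather than the actual method: TaskDrift, wrapTask, onDone and onError are hypothetical names, and the real Drift<Y> and its bookkeeping may look different.

```typescript
// Hypothetical stand-in for the real Drift<Y>.
interface TaskDrift<Y> {
  elem: Y;
  date: number;
  task: Promise<void>;
}

// Starts the task via worker, routes rejections to onError, and calls onDone
// afterwards so the caller can remove the drift from the queue.
function wrapTask<Y>(
  elem: Y,
  date: number,
  worker: (t: Y) => Promise<void>,
  onDone: (drift: TaskDrift<Y>) => void,
  onError: (err: unknown, elem: Y) => void | Promise<void>,
): TaskDrift<Y> {
  const drift: TaskDrift<Y> = {
    elem,
    date,
    task: worker(elem)
      .catch((err) => onError(err, elem)) // error handler runs on rejection
      .finally(() => onDone(drift)), // the drift removes itself once settled
  };
  return drift;
}

const events: string[] = [];
const failing = wrapTask(
  "x",
  Date.now(),
  async () => {
    throw new Error("boom");
  },
  (d) => {
    events.push(`done:${d.elem}`);
  },
  (_err, elem) => {
    events.push(`error:${elem}`);
  },
);
```

In this sketch the error handler runs before the removal callback, so events ends up as error:x followed by done:x once failing.task has settled.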

add(elems: Y[]): Promise<number>

Adds the provided elements to the queue and starts tasks for all of them immediately. Returns a Promise that resolves with concurrency - length once this value becomes positive.

capacity(): Promise<number>

Returns a Promise that resolves with concurrency - length once this value becomes positive. Use await queue.capacity() to wait until the queue has free space again.
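The waiting behaviour of capacity() can be modelled with a list of subscribers, as described for the subscribers property. The following is a minimal sketch under that assumption: CapacityGate, acquire and release are hypothetical names, and the real class ties this logic to adding and removing drifts.

```typescript
class CapacityGate {
  readonly concurrency: number;
  private len = 0; // number of pending tasks
  private subscribers: Array<(capacity: number) => void> = [];

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  // Resolves with concurrency - len as soon as that value is positive.
  capacity(): Promise<number> {
    const free = this.concurrency - this.len;
    if (free > 0) return Promise.resolve(free);
    return new Promise((resolve) => this.subscribers.push(resolve));
  }

  // Called when a task starts.
  acquire(): void {
    this.len++;
  }

  // Called when a task completes; notifies all waiting subscribers
  // if there is capacity again.
  release(): void {
    this.len--;
    const free = this.concurrency - this.len;
    if (free > 0) {
      const waiting = this.subscribers;
      this.subscribers = [];
      for (const notify of waiting) notify(free);
    }
  }
}

const gate = new CapacityGate(2);
gate.acquire();
gate.acquire(); // the gate is now full
const waitingForSlot = gate.capacity(); // stays pending until a task completes
gate.release();
waitingForSlot.then((free) => console.log(`free slots: ${free}`)); // logs "free slots: 1"
```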

empty(): Promise<void>

Returns a Promise that resolves once the queue is empty.

pendingTasks(): Y[]

Creates a snapshot of the queue by computing a list of those elements that are currently being processed.