x/async_channels/mod.ts>Channel

Inspired by Go & Clojure Channels, async_channels provides channels as an asynchronous communication method between asynchronous functions.
class Channel
implements AsyncIterable<T>, AsyncIterator<T, void, void>
import { Channel } from "https://deno.land/x/async_channels@v1.0.0-rc8/mod.ts";

Constructors

new Channel(bufferSize?: number, options?: ChannelOptions)

Constructs a new Channel with an optional buffer.

Type Parameters

T

The type of value that can be sent to or received by this channel.

Methods

Closes the channel.

Closing a channel that is already closed has no effect (positive or negative).

Sending a message to a closed channel will throw an AbortedError.

Receiving a message from a closed channel will resolve the promise immediately. See Channel.get for more information.

debug(...args: unknown[]): void

duplicate creates multiple channels (the number is determined by n) and consumes this channel. Each consumed value is then sent to all of the created channels.

error(...args: unknown[]): void

filter(fn: (val: T) => boolean | Promise<boolean>, pipeOpts?: ChannelPipeOptions): Receiver<T>

filter applies fn to each value in this channel, and returns a new channel that contains only the values for which fn returned true (or a promise that resolves to true).

The returned channel will close after this channel closes (or if the provided signal is triggered).
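As a rough illustration of these semantics, the sketch below filters a plain async iterable that stands in for the channel. It is not the library's implementation: `filterNumbers`, `oneToSix` and `collectFiltered` are hypothetical names, and only the predicate's shape, `(val) => boolean | Promise<boolean>`, is taken from the signature above.

```typescript
// Hypothetical stand-in for Channel.filter. It works on a plain
// AsyncIterable<number> so the example runs without the library.
async function* filterNumbers(
  source: AsyncIterable<number>,
  fn: (val: number) => boolean | Promise<boolean>,
): AsyncGenerator<number, void, void> {
  for await (const val of source) {
    // `await` accepts both a plain boolean and a promise of one.
    if (await fn(val)) yield val;
  }
  // The generator ends when the source ends, which mirrors the returned
  // channel closing after the original channel closes.
}

// Hypothetical source of values.
async function* oneToSix(): AsyncGenerator<number, void, void> {
  for (const n of [1, 2, 3, 4, 5, 6]) yield n;
}

async function collectFiltered(): Promise<number[]> {
  // Chain a synchronous predicate and an asynchronous one.
  const evens = filterNumbers(oneToSix(), (n) => n % 2 === 0);
  const bigEvens = filterNumbers(evens, (n) => Promise.resolve(n > 2));
  const out: number[] = [];
  for await (const n of bigEvens) out.push(n);
  return out; // [4, 6]
}
```

Because the predicate result is awaited, synchronous and asynchronous predicates can be used interchangeably.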

flat<K>(this: Receiver<Iterable<K> | AsyncIterable<K>>, pipeOpts?: ChannelPipeOptions): Receiver<K>

flat returns a receiver channel that contains the flattened (1 level) values of each value of this channel.

The receiver channel will close when the original channel closes (or if the provided signal is triggered).

flatMap<TOut>(
  fn: (val: T) =>
    | Iterable<TOut>
    | AsyncIterable<TOut>
    | Promise<Iterable<TOut>>
    | Promise<AsyncIterable<TOut>>,
  pipeOpts?: ChannelPipeOptions,
): Receiver<TOut>

flatMap returns a receiver channel that contains the flattened (1 level) results of applying fn to each value of this channel.

The receiver channel will close when the original channel closes (or if the provided signal is triggered).
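The one-level flattening can be sketched with plain async generators. This is an illustration under assumptions, not the library's code: `Many`, `flatMapStrings`, `lines`, `wordsAsync` and `collectWords` are hypothetical names, and a plain async iterable stands in for the channel. The sketch accepts the same result shapes that fn may return according to the signature above.

```typescript
// Hypothetical alias for the iterable shapes fn may return.
type Many<T> = Iterable<T> | AsyncIterable<T>;

// Hypothetical stand-in for Channel.flatMap over strings.
async function* flatMapStrings(
  source: AsyncIterable<string>,
  fn: (val: string) => Many<string> | Promise<Many<string>>,
): AsyncGenerator<string, void, void> {
  for await (const val of source) {
    // Awaiting unwraps a promised iterable; plain iterables pass through.
    const inner = await fn(val);
    // `for await` walks sync and async iterables alike, one level deep.
    for await (const item of inner) yield item;
  }
}

// Hypothetical source of values.
async function* lines(): AsyncGenerator<string, void, void> {
  yield "a b";
  yield "c";
  yield "d e";
}

// Returns an AsyncIterable of the words in a line.
async function* wordsAsync(line: string): AsyncGenerator<string, void, void> {
  for (const word of line.split(" ")) yield word;
}

async function collectWords(): Promise<string[]> {
  const words = flatMapStrings(lines(), (line) =>
    // Mix two result shapes: an async iterable and a promised array.
    line.length > 1 ? wordsAsync(line) : Promise.resolve([line])
  );
  const out: string[] = [];
  for await (const word of words) out.push(word);
  return out; // ["a", "b", "c", "d", "e"]
}
```

In this sketch, flat corresponds to the special case where fn returns its argument unchanged.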

forEach(fn: (val: T) => unknown | Promise<unknown>, pipeOpts?: ChannelPipeOptions): Receiver<void>

forEach applies fn to each value in this channel, and returns a channel that will contain the results. The returned channel will close after this channel closes (or if the provided signal is triggered).

get(abortCtrl?: AbortController): Promise<[T, true] | [undefined, false]>

get returns a promise that will be resolved with [T, true] when a value is available, or rejected if a provided AbortController is aborted.

If the channel is closed, then the promise will be resolved immediately with [undefined, false].

Receiving from a closed channel:

  import {Channel} from "./channel.ts";
  const ch = new Channel();
  ch.close();
  // A closed, empty channel resolves immediately with [undefined, false].
  const [val, ok] = await ch.get();
  console.assert(val === undefined);
  console.assert(ok === false);

Receiving from a buffered channel:

  import {Channel} from "./channel.ts";
  const ch = new Channel(1);
  await ch.send("Hello world!");
  ch.close();
  // A value buffered before the channel was closed can still be received.
  const [val, ok] = await ch.get();
  console.assert(val === "Hello world!");
  console.assert(ok === true);

Aborting a get request:

  import {Channel, AbortedError} from "./channel.ts";
  const ch = new Channel(1);
  await ch.send("Hello world!");
  ch.close();
  const abortCtrl = new AbortController();
  // Abort before calling get, so that the returned promise is rejected.
  abortCtrl.abort();
  try {
    await ch.get(abortCtrl);
    console.assert(false, "unreachable");
  } catch (e) {
    console.assert(e instanceof AbortedError);
  }

groupBy<TKey extends (string | symbol)>(fn: (val: T) => TKey | Promise<TKey>, pipeOpts?: ChannelPipeOptions): Record<TKey, Receiver<T>>

map<TOut>(fn: (val: T) => TOut | Promise<TOut>, pipeOpts?: ChannelPipeOptions): Receiver<TOut>

map returns a receiver channel that contains the results of applying fn to each value of this channel.

The receiver channel will close when the original channel closes (or if the provided signal is triggered).

next(): Promise<IteratorResult<T, void>>

Blocks until a value is available on the channel, or returns immediately if the channel is closed.

reduce(fn: (prev: T, current: T) => T | Promise<T>, pipeOpts?: ChannelPipeOptions): Receiver<T>

return closes the channel, and returns an empty result.

send(val: T, abortCtrl?: AbortController): Promise<void>

Sends a value on the channel, and returns a promise that will be resolved when the value is received (see Channel.get), or rejected if a provided AbortController is aborted.
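To make the "resolved when the value is received" behavior concrete, here is a minimal sketch of an unbuffered channel. It is a simplified, hypothetical stand-in (`MiniChannel`, `GetResult` and `demoRendezvous` are not part of async_channels). It ignores buffering and abort handling, and it rejects with a plain Error when sending on a closed channel.

```typescript
type GetResult<T> = [T, true] | [undefined, false];

// Hypothetical, unbuffered stand-in. This is not the library's Channel.
class MiniChannel<T> {
  private closed = false;
  // Receivers waiting for a value.
  private receivers: Array<(result: GetResult<T>) => void> = [];
  // Senders waiting for a receiver.
  private senders: Array<{ val: T; received: () => void }> = [];

  send(val: T): Promise<void> {
    if (this.closed) return Promise.reject(new Error("channel is closed"));
    const receiver = this.receivers.shift();
    if (receiver) {
      // A receiver is already waiting: hand the value over right away.
      receiver([val, true]);
      return Promise.resolve();
    }
    // Otherwise the promise stays pending until get() picks the value up.
    return new Promise<void>((resolve) => {
      this.senders.push({ val, received: () => resolve() });
    });
  }

  get(): Promise<GetResult<T>> {
    const sender = this.senders.shift();
    if (sender) {
      sender.received(); // resolves the pending send() promise
      return Promise.resolve<GetResult<T>>([sender.val, true]);
    }
    if (this.closed) return Promise.resolve<GetResult<T>>([undefined, false]);
    return new Promise<GetResult<T>>((resolve) => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    // Wake up receivers that can no longer get a value.
    for (const receiver of this.receivers.splice(0)) receiver([undefined, false]);
  }
}

async function demoRendezvous() {
  const ch = new MiniChannel<string>();
  let sendResolved = false;
  const sent = ch.send("hello").then(() => { sendResolved = true; });
  // Give the event loop a turn: with no receiver, send is still pending.
  await new Promise<void>((resolve) => setTimeout(resolve, 0));
  const pendingBeforeGet = !sendResolved;
  const [val, ok] = await ch.get();
  await sent; // settles because get() took the value
  ch.close();
  const [, okAfterClose] = await ch.get();
  return { pendingBeforeGet, val, ok, sendResolved, okAfterClose };
}
```

The sketch keeps two queues so that whichever side arrives first waits for the other, which is the rendezvous behavior the description implies for a channel without a buffer.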

If the channel is closed, then the promise will be rejected with an InvalidTransitionError.

  import {Channel, InvalidTransitionError} from "./channel.ts";

  const ch = new Channel();
  ch.close();
  try {
    // Sending on a closed channel rejects.
    await ch.send("should fail");
    console.assert(false, "unreachable");
  } catch (e) {
    console.assert(e instanceof InvalidTransitionError);
  }

subscribe<TObj>(
  fn: (_: T) => string | number | symbol,
  topics: (keyof TObj)[],
  options?: SubscribeOptions,
): SubscribeReturnType<T, TObj>

throw(e?: unknown)

Logs the error, closes the channel, and returns an empty result.

with<TOut, TThis extends Receiver<T>>(this: TThis, fn: (t: TThis) => TOut): TOut

Applies fn to this channel and returns the result.

[Symbol.asyncIterator](): AsyncGenerator<T, void, void>

Creates an AsyncGenerator that yields all values sent to this channel, and returns when the channel closes.
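The relationship between get and async iteration can be sketched as follows. This is an assumption about the general pattern, not the library's source: `Gettable`, `valuesOf`, `closedSourceOf` and `collectValues` are hypothetical names, and the only detail taken from this page is the result shape of get.

```typescript
// The documented result shape of Channel.get (abort argument omitted).
type GetResult<T> = [T, true] | [undefined, false];

interface Gettable<T> {
  get(): Promise<GetResult<T>>;
}

// Hypothetical helper: turns repeated get() calls into an async generator.
async function* valuesOf<T>(source: Gettable<T>): AsyncGenerator<T, void, void> {
  while (true) {
    const [val, ok] = await source.get();
    if (!ok) return; // closed and drained: end the iteration
    yield val as T; // ok is true here, so val holds a T
  }
}

// Hypothetical stand-in for a closed channel that still holds `items`.
function closedSourceOf<T>(items: T[]): Gettable<T> {
  const queue = [...items];
  return {
    get: async (): Promise<GetResult<T>> => {
      if (queue.length === 0) return [undefined, false];
      return [queue.shift() as T, true];
    },
  };
}

async function collectValues(): Promise<string[]> {
  const out: string[] = [];
  for await (const value of valuesOf(closedSourceOf(["a", "b", "c"]))) {
    out.push(value);
  }
  return out; // ["a", "b", "c"]
}
```

With the real Channel, the equivalent consumer would be a `for await` loop over the channel itself, which ends once the channel closes.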

Static Methods

from<T>(input: Iterable<T> | AsyncIterable<T>, pipeOpts?: ChannelPipeOptions): Receiver<T>