Blocking Promise-driven queues with ECMAScript's async/await and generators

I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.
- Full code + unit tests: https://github.com/ComFreek/async-playground
- Generated documentation: https://comfreek.github.io/async-playground
Both classes use a Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrict this question to the queue implementations.
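Since the Semaphore itself is not shown here, the following is only a minimal sketch of what a Promise-based semaphore with `take()`, `tryTake()` and `free()` might look like. The class name `SketchSemaphore` and all of its internals are assumptions made for illustration; the actual implementation is in the linked repository and may differ.

```typescript
// Hypothetical sketch, not the repository's Semaphore.
class SketchSemaphore {
    // Resolvers of take() calls that are still waiting for a permit.
    private waiters: Array<() => void> = [];
    private permits: number;

    constructor(permits: number) {
        this.permits = permits;
    }

    /** Take a permit, waiting until one is freed if none is available. */
    take(): Promise<void> {
        if (this.permits > 0) {
            this.permits--;
            return Promise.resolve();
        }
        return new Promise<void>((resolve) => this.waiters.push(resolve));
    }

    /** Take a permit only if one is available right now. */
    tryTake(): boolean {
        if (this.permits > 0) {
            this.permits--;
            return true;
        }
        return false;
    }

    /** Return a permit, handing it directly to the oldest waiter if any. */
    free(): void {
        const next = this.waiters.shift();
        if (next !== undefined) {
            next();
        } else {
            this.permits++;
        }
    }
}
```

In this sketch a freed permit goes straight to the oldest waiting `take()`, so waiters are served in FIFO order; the queues below only rely on `take()`, `tryTake()` and `free()` existing.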
Usage example
Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:
async function* readInput() {
    const queue: IAsyncQueue<string | null> = new AsyncQueue();

    // readline is an NPM package simplifying reading lines from stdio
    const rl = readline.createInterface( /* ... */ );
    rl.on('line', (line: string) => queue.queue(line));
    rl.on('close', () => queue.queue(null));

    yield* queue;
}

for await (const line of readInput()) {
    if (line === null) {
        break;
    }
    console.log(line);
}
Questions
Have I followed best practices?
Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?
Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this the right decision? The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking, contrary to IAsyncLimitedQueue#queue, which is asynchronous.
This difference is also manifested in their return types, void vs. Promise<void>. Making AsyncQueue#queue return a Promise as well would violate the principle of least astonishment, because the method would still be synchronous despite its return type.
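To make the trade-off concrete, one conceivable alternative is to share only the read side, whose signatures are identical in both interfaces. The sketch below is hypothetical: `ReadableQueueSketch`, `SyncWritableSketch`, `AsyncWritableSketch`, `drainAvailable` and `ArrayBackedQueue` are illustrative names that do not exist in the repository.

```typescript
// Hypothetical read-only superinterface: only the operations whose
// signatures are the same in IAsyncQueue and IAsyncLimitedQueue.
interface ReadableQueueSketch<T> {
    dequeue(): Promise<T>;
    poll(): T;
    size(): number;
}

// The write side stays separate because the return types differ.
interface SyncWritableSketch<T> extends ReadableQueueSketch<T> {
    queue(data: T): void;
}
interface AsyncWritableSketch<T> extends ReadableQueueSketch<T> {
    queue(data: T): Promise<void>;
}

// A consumer that only reads can accept either kind of queue.
function drainAvailable<T>(queue: ReadableQueueSketch<T>): T[] {
    const drained: T[] = [];
    while (queue.size() > 0) {
        drained.push(queue.poll());
    }
    return drained;
}

// Tiny array-backed stand-in, just enough to exercise drainAvailable().
class ArrayBackedQueue<T> implements SyncWritableSketch<T> {
    private buffer: T[] = [];

    queue(data: T): void {
        this.buffer.push(data);
    }

    poll(): T {
        if (this.buffer.length === 0) {
            throw new Error('queue is empty');
        }
        return this.buffer.shift() as T;
    }

    dequeue(): Promise<T> {
        // Simplification: rejects instead of waiting when the queue is empty.
        try {
            return Promise.resolve(this.poll());
        } catch (err) {
            return Promise.reject(err);
        }
    }

    size(): number {
        return this.buffer.length;
    }
}
```

Whether such a split is worth an extra interface is exactly the open question; the sketch only shows that the read side could be shared without touching the differing `queue` signatures.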
Code
Online: repo, docs
AsyncQueue.ts
import { ISemaphore, Semaphore } from '../semaphore/index';

/**
 * Asynchronous FIFO queue with a Promise-driven dequeue operation.
 *
 * All element values are allowed, especially falsy ones, e.g.
 * false, 0, undefined and null are all valid elements which
 * can be queued and dequeued.
 *
 * The {@link AsyncIterable} interface iterates the queue's (future) contents
 * ad infinitum. Users are advised to signal the end by manual insertion of a
 * special value (a so-called poison pill):
 *
 * ```
 * const queue = new AsyncQueue<string|null>();
 * file.on('data', (data) => queue.queue(data));
 * file.on('close', () => queue.queue(null));
 *
 * for await (const data of queue) {
 *   if (data === null) {
 *     break;
 *   }
 *   // Otherwise, process data
 * }
 * ```
 */
export interface IAsyncQueue<T> extends AsyncIterable<T> {
    /**
     * Queue an element immediately.
     */
    queue(data: T): void;

    /**
     * Queue all elements of an iterable, e.g. an array or a generator function.
     *
     * @example `queue.queueAll(['myArray', 'of', 'strings'])`
     *
     * @example If one has a generator function f:
     *          `function *f(): Iterable<string> { ... }`
     *          then you can call `queue.queueAll(f())`.
     */
    queueAll(iterable: Iterable<T>): void;

    /**
     * Queue all elements of an asynchronous iterable, e.g. an asynchronous
     * generator function.
     *
     * @example Using an asynchronous generator function:
     * ```
     * async function *f(): AsyncIterable<string> {
     *   yield* ['Array', 'of', 'strings'];
     * }
     *
     * const previousSize = queue.size();
     * queue.queueAllAsync(f());
     * // ^ We do not await the queueing!
     * // Therefore: queue.size() === previousSize here!
     * // This is indeed guaranteed by JS' execution model. There is
     * // no way queueAllAsync could have queried an element from f()
     * // asynchronously using a promise before this code gives up
     * // the "CPU power" by await or yield.
     *
     * await queue.dequeue(); // 'Array'
     * await queue.dequeue(); // 'of'
     * await queue.dequeue(); // 'strings'
     *
     * // queue.size() === 0 and queue.dequeue() would block
     * // ad infinitum
     *
     * await queue.queueAllAsync(f());
     * // We now await the queueing!
     * // Therefore: queue.size() === 3 here!
     * ```
     *
     * @example AsyncQueue instances are also asynchronous iterables,
     *          meaning that you can stack multiple queues together:
     * ```
     * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
     * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
     *
     * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
     *
     * foregroundQueue.queueAllAsync(backgroundQueue);
     * const retrievedString = await foregroundQueue.dequeue();
     *
     * // retrievedString === 'Hello World!'
     * ```
     */
    queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

    /**
     * Dequeue an element, waiting for data to be available if necessary.
     *
     * @returns A promise which is fulfilled when an element (as queued by
     *          queue()) becomes available.
     *          If multiple dequeue() calls are issued sequentially, it is
     *          implementation-defined whether they are fulfilled in the same
     *          order or not. However, the data is still retrieved in FIFO
     *          fashion, meaning the first fulfilled promise gets the first
     *          element, the second fulfilled promise the second one and so forth.
     */
    dequeue(): Promise<T>;

    /**
     * Dequeue an element if available or throw an exception otherwise.
     *
     * @returns The first element of the queue.
     * @throws An exception if the queue is empty at the time of the call.
     */
    poll(): T;

    /**
     * Return the current size at the moment of the call.
     *
     * Even though code like
     * ```
     * if (queue.size() >= 1) {
     *   const element = queue.poll();
     * }
     * ```
     * is technically not wrong (due to JS' execution model), users are
     * advised to avoid this pattern. Instead, users are encouraged to
     *
     * - in cases where waiting for a promise is impossible, to use
     *   {@link poll} and catch the exception,
     * - or to use {@link dequeue} with JS' `await` or
     *   `queue.dequeue().then(...)`.
     */
    size(): number;
}
export class NoElementError extends Error {}

export class AsyncQueue<T> implements IAsyncQueue<T> {
    private buffer: T[] = [];
    private elementSem: ISemaphore = new Semaphore(0);

    public queue(data: T): void {
        this.buffer.push(data);
        this.elementSem.free();
    }

    public queueAll(iterable: Iterable<T>): void {
        for (const element of iterable) {
            this.queue(element);
        }
    }

    public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
        for await (const element of iterable) {
            this.queue(element);
        }
    }

    public async dequeue(): Promise<T> {
        await this.elementSem.take();
        try {
            return this.poll();
        } catch (err) {
            if (err instanceof NoElementError) {
                throw new Error(
                    'AsyncQueue dequeue: poll() threw an exception even though ' +
                    'dequeue() waited for its element semaphore to be available via take().'
                );
            } else {
                throw err;
            }
        }
    }

    public poll(): T {
        if (this.buffer.length >= 1) {
            const dequeuedElement = this.buffer.shift();
            // Force-cast the element since we know that the buffer contains
            // at least one element and JS' execution model prohibits other
            // interleaving fibers to modify the buffer (=> no race condition).
            //
            // Also, we cannot check for shift() returning undefined as the queue
            // might well contain "undefined" as such.
            return (dequeuedElement as T);
        } else {
            throw new NoElementError();
        }
    }

    public size(): number {
        return this.buffer.length;
    }

    public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
        while (true) {
            yield this.dequeue();
        }
    }
}
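The documented behaviour (a dequeue() issued before any element exists, FIFO delivery and the poison pill) can be exercised with a compact, self-contained stand-in. This is only a sketch: `MiniAsyncQueue` and `demo` are hypothetical names, and the stand-in hands elements to waiting dequeue() calls directly instead of going through a Semaphore as the AsyncQueue class does.

```typescript
// Compact stand-in for AsyncQueue (hypothetical, for illustration only).
class MiniAsyncQueue<T> {
    private buffer: T[] = [];
    // Resolvers of dequeue() calls that are still waiting for an element.
    private waiters: Array<(value: T) => void> = [];

    queue(data: T): void {
        const waiter = this.waiters.shift();
        if (waiter !== undefined) {
            waiter(data); // a dequeue() is already pending: hand the element over
        } else {
            this.buffer.push(data);
        }
    }

    dequeue(): Promise<T> {
        if (this.buffer.length >= 1) {
            return Promise.resolve(this.buffer.shift() as T);
        }
        return new Promise<T>((resolve) => this.waiters.push(resolve));
    }

    size(): number {
        return this.buffer.length;
    }
}

async function demo(): Promise<Array<string | null>> {
    const queue = new MiniAsyncQueue<string | null>();
    const seen: Array<string | null> = [];

    const first = queue.dequeue(); // issued before any element exists
    queue.queue('a');
    queue.queue('b');
    queue.queue(null); // poison pill signalling the end

    seen.push(await first);
    for (;;) {
        const item = await queue.dequeue();
        seen.push(item);
        if (item === null) {
            break;
        }
    }
    return seen; // elements in FIFO order, ending with the poison pill
}
```

Calling `demo()` resolves to the three queued values in insertion order, with `null` last.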
AsyncLimitedQueue:
import { IAsyncQueue, AsyncQueue } from './AsyncQueue';
import { ISemaphore, Semaphore } from '../semaphore/index';

/**
 * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * Contrary to {@link IAsyncQueue}, the queue operation is Promise-driven as well:
 * implementations might delay entrance into the queue, e.g. to enforce a
 * limit on the number of elements stored in the queue at the same time, cf.
 * {@link AsyncLimitedQueue}.
 * Other types of entrance limitations are conceivable as well, such as a
 * restriction on the sum of contained elements in case of a number queue.
 *
 * All element values are allowed, especially falsy ones, e.g.
 * false, 0, undefined and null are all valid elements which
 * can be queued and dequeued.
 *
 * {@link queue IAsyncLimitedQueue#queue} operations are possibly delayed and
 * executed in implementation-dependent order.
 *
 * @example Issuing multiple {@link queue} operations without awaiting the
 *          previous ones may result in implementation-defined insertion order.
 * ```
 * queue.queue(1);
 * queue.queue(2);
 *
 * await queue.dequeue(); // can be 1 or 2
 * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
 * ```
 *
 * @example If you would like to retain the order, await the {@link queue}
 *          operations, or use {@link queueAll IAsyncLimitedQueue#queueAll} or
 *          {@link queueAllAsync IAsyncLimitedQueue#queueAllAsync}.
 * ```
 * await queue.queue(1);
 * await queue.queue(2);
 * ```
 * ```
 * queue.queueAll([1, 2]);
 * ```
 *
 * The {@link AsyncIterable} interface iterates the queue's (future) contents
 * ad infinitum. Users are advised to signal the end by manual insertion of a
 * special value (a so-called poison pill), see {@link IAsyncQueue}.
 */
export interface IAsyncLimitedQueue<T> extends AsyncIterable<T> {
    /**
     * Queue an element, waiting for entrance if necessary.
     *
     * @example
     * ```
     * queue.queue(42).then(() => {
     *   // 42 is now stored within the queue
     * });
     * ```
     */
    queue(data: T): Promise<void>;

    /**
     * Queue all elements of an iterable, e.g. an array or a generator function.
     * @see IAsyncQueue#queueAll
     */
    queueAll(iterable: Iterable<T>): Promise<void>;

    /**
     * Queue all elements of an asynchronous iterable, e.g. an asynchronous
     * generator function.
     *
     * @see IAsyncQueue#queueAllAsync
     */
    queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

    /**
     * Offer an element, only queueing it if entrance is available at the time
     * of the call.
     *
     * @returns True if the element could be inserted right away. False
     *          otherwise.
     */
    offer(data: T): boolean;

    /**
     * Offer all elements of an iterable for in-order insertion.
     *
     * @param iterable An iterable whose first (limit - queue.size()) elements
     *                 will be inserted. Iterables which iterate an infinite
     *                 number of elements can also be passed and will *not*
     *                 result in an endless loop.
     *
     * @returns The number of elements which could be inserted right away.
     *          Possibly 0 when the queue was full at the time of the call.
     */
    offerAll(iterable: Iterable<T>): number;

    /**
     * Offer all elements of an asynchronous iterable for in-order insertion.
     *
     * @param iterable An iterable whose elements will be {@link offer offered}
     *                 in-order for this queue.
     *                 The method will stop querying and offering further
     *                 elements upon the first {@link offer} call which
     *                 returns `false`.
     *                 <br>
     *                 Contrary to {@link offerAll}, iterables iterating an
     *                 infinite number of elements might prevent the Promise
     *                 which {@link offerAllAsync} returns from ever resolving.
     *                 <br>
     *                 This depends on {@link dequeue} operations which could
     *                 get scheduled by the JS VM while elements from the passed
     *                 asynchronous iterator are accessed.
     *
     * @returns A promise resolving to the number of elements which could be
     *          inserted (offered successfully) consecutively without waiting.
     *          Possibly 0 when the queue was full at the time of the call.
     *          Fulfillment of this promise is not guaranteed in case of infinite
     *          iterables.
     */
    offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;

    /**
     * Dequeue an element if available or throw an exception otherwise.
     *
     * @returns The first element of the queue.
     * @throws An exception if the queue is empty at the time of the call.
     */
    poll(): T;

    /**
     * Dequeue an element, waiting for data to be available if necessary.
     *
     * @returns A promise which is fulfilled when an element (as queued by
     *          queue()) becomes available.
     *          If multiple dequeue() calls are issued sequentially, it is
     *          implementation-defined whether they are fulfilled in the same
     *          order or not. However, the data is still retrieved in FIFO
     *          fashion, meaning the first fulfilled promise gets the first
     *          element, the second fulfilled promise the second one and so forth.
     */
    dequeue(): Promise<T>;

    /**
     * Return the current size at the moment of the call.
     *
     * Even though code like
     * ```
     * if (queue.size() >= 1) {
     *   const element = queue.poll();
     * }
     * ```
     * is technically not wrong (due to JS' execution model), users are
     * advised to avoid this pattern. Instead, users are encouraged to
     *
     * - in cases where waiting for a promise is impossible, to use
     *   {@link poll} and catch the exception,
     * - or to use {@link dequeue} with JS' `await` or
     *   `queue.dequeue().then(...)`.
     */
    size(): number;
}
/**
 * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * {@link AsyncLimitedQueue#queue} operations are delayed (in unspecified order)
 * until space becomes available through dequeue operations.
 */
export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T> {
    private limitSem: ISemaphore;

    /**
     * Initialize the queue.
     * @param limit An integer >= 1 specifying the number of elements after which
     *              queue() effectively blocks (i.e. the promise returned by it
     *              does not get "immediately" fulfilled for some informal value
     *              of immediately).
     * @param storageQueue An asynchronous (non-limiting) queue backing the data.
     *                     It defaults to an AsyncQueue.
     *
     * @throws An exception in case the limit is not an integer or is <= 0.
     */
    public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue()) {
        // NOTE: the constructor body is missing from this listing. The lines
        // below are an assumed reconstruction based on the documentation above.
        if (!Number.isInteger(limit) || limit <= 0) {
            throw new Error('AsyncLimitedQueue: limit must be an integer >= 1.');
        }
        this.limitSem = new Semaphore(limit);
    }

    public async queue(data: T): Promise<void> {
        await this.limitSem.take();
        this.storageQueue.queue(data);
    }

    public async queueAll(iterable: Iterable<T>): Promise<void> {
        for (const element of iterable) {
            await this.queue(element);
        }
    }

    public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
        for await (const element of iterable) {
            await this.queue(element);
        }
    }

    public offer(data: T): boolean {
        if (this.limitSem.tryTake()) {
            this.storageQueue.queue(data);
            return true;
        } else {
            return false;
        }
    }

    public offerAll(iterable: Iterable<T>): number {
        let insertedElements = 0;
        for (const element of iterable) {
            if (!this.offer(element)) {
                return insertedElements;
            }
            insertedElements++;
        }
        return insertedElements;
    }

    public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number> {
        let insertedElements = 0;
        for await (const element of iterable) {
            if (!this.offer(element)) {
                return insertedElements;
            }
            insertedElements++;
        }
        return insertedElements;
    }

    public async dequeue(): Promise<T> {
        return this.storageQueue.dequeue().then(element => {
            this.limitSem.free();
            return element;
        });
    }

    public poll(): T {
        return this.storageQueue.poll();
    }

    public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
        while (true) {
            yield this.dequeue();
        }
    }

    public size(): number {
        return this.storageQueue.size();
    }
}
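How queue() is delayed once the limit is reached can be seen in a small, self-contained sketch. `PermitCounter`, `BoundedQueueSketch` and `backpressureDemo` are hypothetical, simplified stand-ins written for this illustration; they follow the same two-semaphore idea (one counter for free slots, one for stored elements) but are not the repository's classes.

```typescript
// Hypothetical minimal counting semaphore (illustration only).
class PermitCounter {
    private waiters: Array<() => void> = [];
    private permits: number;

    constructor(permits: number) {
        this.permits = permits;
    }

    take(): Promise<void> {
        if (this.permits > 0) {
            this.permits--;
            return Promise.resolve();
        }
        return new Promise<void>((resolve) => this.waiters.push(resolve));
    }

    free(): void {
        const next = this.waiters.shift();
        if (next !== undefined) {
            next();
        } else {
            this.permits++;
        }
    }
}

// Hypothetical bounded queue: "slots" counts free capacity,
// "items" counts stored elements.
class BoundedQueueSketch<T> {
    private buffer: T[] = [];
    private slots: PermitCounter;
    private items: PermitCounter = new PermitCounter(0);

    constructor(limit: number) {
        if (!Number.isInteger(limit) || limit < 1) {
            throw new RangeError('limit must be an integer >= 1');
        }
        this.slots = new PermitCounter(limit);
    }

    async queue(data: T): Promise<void> {
        await this.slots.take(); // waits while the queue is full
        this.buffer.push(data);
        this.items.free();
    }

    async dequeue(): Promise<T> {
        await this.items.take(); // waits while the queue is empty
        const element = this.buffer.shift() as T;
        this.slots.free(); // lets one waiting queue() proceed
        return element;
    }
}

async function backpressureDemo(): Promise<{ blockedWhileFull: boolean; first: number; next: number }> {
    const queue = new BoundedQueueSketch<number>(1);
    await queue.queue(1); // fills the only slot

    let secondQueued = false;
    const second = queue.queue(2).then(() => { secondQueued = true; });
    await Promise.resolve(); // give pending microtasks a chance to run
    const blockedWhileFull = !secondQueued;

    const first = await queue.dequeue(); // frees the slot for queue(2)
    await second;
    const next = await queue.dequeue();
    return { blockedWhileFull, first, next };
}
```

In this sketch the second queue() call only completes after the first element has been dequeued, which is the backpressure behaviour the limit is meant to provide.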
javascript queue promise async-await typescript
up vote
3
down vote
favorite
up vote
3
down vote
favorite
I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.
- Full code + unit tests: https://github.com/ComFreek/async-playground
- Generated documentation: https://comfreek.github.io/async-playground
Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrain this question to the queue implementations.
Usage example
Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:
async function* readInput() null> = new AsyncQueue();
// readline is an NPM package simplifying reading lines from stdio
const rl = readline.createInterface( /* ... */ );
rl.on('line', (line: string) => queue.queue(line));
rl.on('close', () => queue.queue(null));
yield* queue;
for await (const line of readInput())
if (line === null)
break;
console.log(line);
Questions
Have I followed best practices?
Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?
Having both interfaces
IAsyncQueueandIAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this a right decision?The reason is that
IAsyncQueue#queueand similar operations are synchronous and therefore non-blocking contrary toIAsyncLimitedQueue#queue, which is asynchronous.
This difference is also manifested in their types,voidvsPromise<void>. MakingAsyncQueue#queuereturning a Promise as well would violate the principle of least astonishment, namely that it is synchronous despite its return type.
Code
Online: repo, docs
AsyncQueue.ts
import ISemaphore, Semaphore from '../semaphore/index';
/**
* Asynchronous FIFO queue with a Promise-driven dequeue operation.
*
* All element values are allowed, especially falsy ones, e.g.
* false, 0, undefined, null, , are all valid elements which
* can be queued and dequeued.
*
* The @link AsyncIterable interface iterates the queue's (future) contents
* ad infinitum. Users are advised to signal the end by manual insertion of a
* special value (a so-called poison pill):
*
* ```
* const queue = new AsyncQueue<string|null>();
* file.on('data', (data) => queue.queue(data));
* file.on('close', () => queue.queue(null));
*
* for await (const data of queue)
* if (data === null)
* break;
*
* // Otherwise, process data
*
* ```
*/
export interface IAsyncQueue<T> extends AsyncIterable<T> {
  /**
   * Queue an element immediately.
   */
  queue(data: T): void;
  /**
   * Queue all elements of an iterable, e.g. an array or a generator function.
   *
   * @example `queue.queueAll(['myArray', 'of', 'strings'])`
   *
   * @example If one has a generator function f:
   * `function *f(): Iterable<string> { ... }`
   * then you can call `queue.queueAll(f())`.
   */
  queueAll(iterable: Iterable<T>): void;
  /**
   * Queue all elements of an asynchronous iterable, e.g. an asynchronous
   * generator function.
   *
   * @example Using an asynchronous generator function:
   * ```
   * async function *f(): AsyncIterable<string> {
   *   yield* ['Array', 'of', 'strings'];
   * }
   *
   * const previousSize = queue.size();
   * queue.queueAllAsync(f());
   * // ^ We do not await the queueing!
   * // Therefore: queue.size() === previousSize here!
   * // This is indeed guaranteed by JS' execution model. There is
   * // no way queueAllAsync could have queried an element from f()
   * // asynchronously using a promise before this code gives up
   * // the "CPU power" by await or yield.
   *
   * await queue.dequeue(); // 'Array'
   * await queue.dequeue(); // 'of'
   * await queue.dequeue(); // 'strings'
   *
   * // queue.size() === 0 and queue.dequeue() would block
   * // ad infinitum
   *
   * await queue.queueAllAsync(f());
   * // We now await the queueing!
   * // Therefore: queue.size() === 3 here!
   * ```
   *
   * @example AsyncQueue instances are also asynchronous iterables,
   * meaning that you can stack multiple queues together:
   * ```
   * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
   * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
   *
   * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
   *
   * foregroundQueue.queueAllAsync(backgroundQueue);
   * const retrievedString = await foregroundQueue.dequeue();
   *
   * // retrievedString === 'Hello World!'
   * ```
   */
  queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;
  /**
   * Dequeue an element, waiting for data to be available if necessary.
   *
   * @returns A promise which is fulfilled when an element (as queued by
   *          queue()) becomes available.
   *          If multiple dequeue() calls are issued sequentially, it is
   *          implementation-defined whether they are fulfilled in the same
   *          order or not. However, the data is still retrieved in FIFO
   *          fashion, meaning the first fulfilled promise gets the first
   *          element, the second fulfilled the second one and so forth.
   */
  dequeue(): Promise<T>;
  /**
   * Dequeue an element if available or throw an exception otherwise.
   *
   * @returns The first element of the queue.
   * @throws An exception if the queue is empty at the time of the call.
   */
  poll(): T;
  /**
   * Return the current size at the moment of the call.
   *
   * Even though code like
   * ```
   * if (queue.size() >= 1) {
   *   const element = queue.poll();
   * }
   * ```
   * is technically not wrong (due to JS' execution model), users are
   * advised to avoid this pattern. Instead, users are encouraged to
   *
   * - in cases where waiting for a promise is impossible, to use
   *   {@link poll} and catch the exception,
   * - or to use {@link dequeue} with JS' `await` or
   *   `queue.dequeue().then(...)`.
   */
  size(): number;
}
export class NoElementError extends Error {}
export class AsyncQueue<T> implements IAsyncQueue<T> {
  private buffer: T[] = [];
  private elementSem: ISemaphore = new Semaphore(0);
  public queue(data: T): void {
    this.buffer.push(data);
    this.elementSem.free();
  }
  public queueAll(iterable: Iterable<T>): void {
    for (const element of iterable) {
      this.queue(element);
    }
  }
  public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
    for await (const element of iterable) {
      this.queue(element);
    }
  }
  public async dequeue(): Promise<T> {
    await this.elementSem.take();
    try {
      return this.poll();
    } catch (err) {
      if (err instanceof NoElementError) {
        throw new Error('AsyncQueue dequeue: poll() threw an exception ' +
          'even though dequeue() waited for its element semaphore to be available via take().');
      } else {
        throw err;
      }
    }
  }
  public poll(): T {
    if (this.buffer.length >= 1) {
      const dequeuedElement = this.buffer.shift();
      // Force-cast the element since we know that the buffer contains
      // at least one element and JS' execution model prohibits other
      // interleaving fibers from modifying the buffer (=> no race condition).
      //
      // Also, we cannot check for shift() returning undefined as the queue
      // might well contain "undefined" as such.
      return (dequeuedElement as T);
    } else {
      throw new NoElementError();
    }
  }
  public size(): number {
    return this.buffer.length;
  }
  public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      yield this.dequeue();
    }
  }
}
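Since Semaphore itself is left out of the question, here is a minimal sketch of a semaphore that would satisfy the three calls the queues make: take(), tryTake() and free(). It is an assumption about the contract, inferred from those calls, and not the implementation from the repository, which may differ, for example in the order in which waiting take() calls are woken (this sketch wakes them in FIFO order).

```typescript
// Assumed contract, inferred from how the queue classes use the semaphore.
interface ISemaphore {
  take(): Promise<void>;
  tryTake(): boolean;
  free(): void;
}

class Semaphore implements ISemaphore {
  // Number of permits that can be taken without waiting.
  private permits: number;
  // Resolvers of take() calls that are still waiting for a permit.
  private waiters: Array<() => void> = [];

  constructor(initialPermits: number) {
    this.permits = initialPermits;
  }

  // Take a permit, waiting for a free() call if none is available.
  take(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    return new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }

  // Take a permit only if one is available right now.
  tryTake(): boolean {
    if (this.permits > 0) {
      this.permits--;
      return true;
    }
    return false;
  }

  // Release a permit: hand it to the longest-waiting taker if there is one,
  // otherwise return it to the pool.
  free(): void {
    const next = this.waiters.shift();
    if (next !== undefined) {
      next();
    } else {
      this.permits++;
    }
  }
}
```

With this sketch, new Semaphore(0) starts without permits, so the first take() waits until free() is called, which is the behaviour AsyncQueue#dequeue relies on.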
AsyncLimitedQueue:
import { IAsyncQueue, AsyncQueue } from './AsyncQueue';
import { ISemaphore, Semaphore } from '../semaphore/index';
/**
 * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * Contrary to {@link IAsyncQueue}, the queue operation is Promise-driven as well,
 * i.e. implementations might delay entrance into the queue, e.g. to enforce a
 * limit on the number of elements stored in the queue at the same time, cf.
 * {@link AsyncLimitedQueue}.
 * Other types of entrance limitations are conceivable as well, such as a
 * restriction on the sum of contained elements in case of a number queue.
 *
 * All element values are allowed, especially falsy ones, e.g.
 * false, 0, undefined and null are all valid elements which
 * can be queued and dequeued.
 *
 * {@link queue IAsyncLimitedQueue#queue} operations are possibly delayed and
 * executed in implementation-dependent order.
 *
 * @example Issuing multiple {@link queue} operations without awaiting the
 * previous ones may result in implementation-defined insertion order.
 * ```
 * queue.queue(1);
 * queue.queue(2);
 *
 * await queue.dequeue(); // can be 1 or 2
 * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
 * ```
 *
 * @example If you would like to retain the order, either await the
 * {@link queue} operations or use {@link queueAll IAsyncLimitedQueue#queueAll}
 * or {@link queueAllAsync IAsyncLimitedQueue#queueAllAsync}.
 * ```
 * await queue.queue(1);
 * await queue.queue(2);
 * ```
 * ```
 * queue.queueAll([1, 2]);
 * ```
 *
 * The {@link AsyncIterable} interface iterates the queue's (future) contents
 * ad infinitum. Users are advised to signal the end by manual insertion of a
 * special value (a so-called poison pill), see {@link IAsyncQueue}.
 */
export interface IAsyncLimitedQueue<T> extends AsyncIterable<T> {
  /**
   * Queue an element, waiting for entrance if necessary.
   *
   * @example
   * ```
   * queue.queue(42).then(() => {
   *   // 42 is now stored within the queue
   * });
   * ```
   */
  queue(data: T): Promise<void>;
  /**
   * Queue all elements of an iterable, e.g. an array or a generator function.
   * @see IAsyncQueue#queueAll
   */
  queueAll(iterable: Iterable<T>): Promise<void>;
  /**
   * Queue all elements of an asynchronous iterable, e.g. an asynchronous
   * generator function.
   *
   * @see IAsyncQueue#queueAllAsync
   */
  queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;
  /**
   * Offer an element, only queueing it if entrance is available at the time
   * of the call.
   *
   * @returns True if the element could be inserted right away. False
   *          otherwise.
   */
  offer(data: T): boolean;
  /**
   * Offer all elements of an iterable for in-order insertion.
   *
   * @param iterable An iterable whose first (limit - queue.size()) elements
   *                 will be inserted. Iterables which iterate an infinite
   *                 number of elements can also be passed and will *not*
   *                 result in an endless loop.
   *
   * @returns The number of elements which could be inserted right away.
   *          Possibly 0 when the queue was full at the time of the call.
   */
  offerAll(iterable: Iterable<T>): number;
  /**
   * Offer all elements of an asynchronous iterable for in-order insertion.
   *
   * @param iterable An iterable whose elements will be offered in-order
   *                 to this queue (see {@link offer}).
   *                 The method will stop querying and offering further
   *                 elements upon the first {@link offer} call that
   *                 returns `false`.
   *                 <br>
   *                 Contrary to {@link offerAll}, iterables iterating an
   *                 infinite number of elements might prevent the Promise
   *                 which {@link offerAllAsync} returns from ever resolving.
   *                 <br>
   *                 This depends on {@link dequeue} operations which could
   *                 get scheduled by the JS VM while elements from the passed
   *                 asynchronous iterator are accessed.
   *
   * @returns A promise resolving to the number of elements which could be
   *          inserted (offered successfully) consecutively without waiting.
   *          Possibly 0 when the queue was full at the time of the call.
   *          Fulfillment of this promise is not guaranteed in case of infinite
   *          iterables.
   */
  offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;
  /**
   * Dequeue an element if available or throw an exception otherwise.
   *
   * @returns The first element of the queue.
   * @throws An exception if the queue is empty at the time of the call.
   */
  poll(): T;
  /**
   * Dequeue an element, waiting for data to be available if necessary.
   *
   * @returns A promise which is fulfilled when an element (as queued by
   *          queue()) becomes available.
   *          If multiple dequeue() calls are issued sequentially, it is
   *          implementation-defined whether they are fulfilled in the same
   *          order or not. However, the data is still retrieved in FIFO
   *          fashion, meaning the first fulfilled promise gets the first
   *          element, the second fulfilled the second one and so forth.
   */
  dequeue(): Promise<T>;
  /**
   * Return the current size at the moment of the call.
   *
   * Even though code like
   * ```
   * if (queue.size() >= 1) {
   *   const element = queue.poll();
   * }
   * ```
   * is technically not wrong (due to JS' execution model), users are
   * advised to avoid this pattern. Instead, users are encouraged to
   *
   * - in cases where waiting for a promise is impossible, to use
   *   {@link poll} and catch the exception,
   * - or to use {@link dequeue} with JS' `await` or
   *   `queue.dequeue().then(...)`.
   */
  size(): number;
}
/**
 * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * {@link AsyncLimitedQueue#queue} operations are delayed (in unspecified order)
 * until space becomes available through dequeue operations.
 */
export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T> {
  private limitSem: ISemaphore;
  /**
   * Initialize the queue.
   * @param limit An integer >= 1 specifying the number of elements after which
   *              queue() effectively blocks (i.e. the promise returned by it
   *              does not get "immediately" fulfilled for some informal value
   *              of immediately).
   * @param storageQueue An asynchronous (non-limiting) queue backing the data.
   *                     It defaults to an AsyncQueue.
   *
   * @throws An exception in case the limit is not an integer or is <= 0.
   */
  public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue()) {
    // Assumed body, inferred from the documented contract above:
    // reject invalid limits, then create one permit per free slot.
    if (!Number.isInteger(limit) || limit <= 0) {
      throw new Error('AsyncLimitedQueue: limit must be an integer >= 1.');
    }
    this.limitSem = new Semaphore(limit);
  }
  public async queue(data: T): Promise<void> {
    await this.limitSem.take();
    this.storageQueue.queue(data);
  }
  public async queueAll(iterable: Iterable<T>): Promise<void> {
    for (const element of iterable) {
      await this.queue(element);
    }
  }
  public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
    for await (const element of iterable) {
      await this.queue(element);
    }
  }
  public offer(data: T): boolean {
    if (this.limitSem.tryTake()) {
      this.storageQueue.queue(data);
      return true;
    } else {
      return false;
    }
  }
  public offerAll(iterable: Iterable<T>): number {
    let insertedElements = 0;
    for (const element of iterable) {
      if (!this.offer(element)) {
        return insertedElements;
      }
      insertedElements++;
    }
    return insertedElements;
  }
  public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number> {
    let insertedElements = 0;
    for await (const element of iterable) {
      if (!this.offer(element)) {
        return insertedElements;
      }
      insertedElements++;
    }
    return insertedElements;
  }
  public async dequeue(): Promise<T> {
    return this.storageQueue.dequeue().then(element => {
      this.limitSem.free();
      return element;
    });
  }
  public poll(): T {
    return this.storageQueue.poll();
  }
  public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      yield this.dequeue();
    }
  }
  public size(): number {
    return this.storageQueue.size();
  }
}
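The offerAll contract documented above (an infinite iterable does not result in an endless loop) can be checked in isolation. The following is a minimal sketch with a hypothetical BoundedBuffer stand-in, not the AsyncLimitedQueue itself; it only reuses the same loop shape, and naturals() is an assumed infinite generator for illustration.

```typescript
// Hypothetical stand-in for a limited queue; only the offer side is modelled.
class BoundedBuffer<T> {
  private items: T[] = [];
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Insert only if there is room right now.
  offer(data: T): boolean {
    if (this.items.length >= this.limit) {
      return false;
    }
    this.items.push(data);
    return true;
  }

  // Same loop shape as offerAll above: stop at the first rejected offer.
  offerAll(iterable: Iterable<T>): number {
    let inserted = 0;
    for (const element of iterable) {
      if (!this.offer(element)) {
        return inserted;
      }
      inserted++;
    }
    return inserted;
  }

  size(): number {
    return this.items.length;
  }
}

// Infinite generator; the finally block records that for...of closed it.
const state = { generatorClosed: false };
function* naturals(): Generator<number> {
  try {
    for (let n = 0; ; n++) {
      yield n;
    }
  } finally {
    state.generatorClosed = true;
  }
}

const buffer = new BoundedBuffer<number>(3);
const accepted = buffer.offerAll(naturals());
console.log(accepted, buffer.size(), state.generatorClosed); // prints: 3 3 true
```

Returning from inside for...of closes the generator, so the loop terminates after the first rejected offer. In this sketch the rejected element (here 3) has already been pulled from the iterable and is discarded.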
javascript queue promise async-await typescript
edited Mar 6 at 14:28
asked Mar 4 at 16:13
ComFreek