Blocking Promise-driven queues with ECMAScript's async/await and generators

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
3
down vote

favorite












I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.



  • Full code + unit tests: https://github.com/ComFreek/async-playground

  • Generated documentation: https://comfreek.github.io/async-playground

Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrain this question to the queue implementations.



Usage example



Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:



async function* readInput() null> = new AsyncQueue();
// readline is an NPM package simplifying reading lines from stdio
const rl = readline.createInterface( /* ... */ );

rl.on('line', (line: string) => queue.queue(line));
rl.on('close', () => queue.queue(null));

yield* queue;


for await (const line of readInput())
if (line === null)
break;

console.log(line);



Questions



  • Have I followed best practices?


  • Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?



  • Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this a right decision?



    The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking contrary to IAsyncLimitedQueue#queue, which is asynchronous.

    This difference is also manifested in their types, void vs Promise<void>. Making AsyncQueue#queue returning a Promise as well would violate the principle of least astonishment, namely that it is synchronous despite its return type.



Code



Online: repo, docs



AsyncQueue.ts



import ISemaphore, Semaphore from '../semaphore/index';

/**
* Asynchronous FIFO queue with a Promise-driven dequeue operation.
*
* All element values are allowed, especially falsy ones, e.g.
* false, 0, undefined, null, , are all valid elements which
* can be queued and dequeued.
*
* The @link AsyncIterable interface iterates the queue's (future) contents
* ad infinitum. Users are advised to signal the end by manual insertion of a
* special value (a so-called poison pill):
*
* ```
* const queue = new AsyncQueue<string|null>();
* file.on('data', (data) => queue.queue(data));
* file.on('close', () => queue.queue(null));
*
* for await (const data of queue)
* if (data === null)
* break;
*
* // Otherwise, process data
*
* ```
*/
export interface IAsyncQueue<T> extends AsyncIterable<T>
/**
* Queue an element immediately.
*/
queue(data: T): void;

/**
* Queue all elements of an iterable, e.g. an array or a generator function.
*
* @example `queue.queueAll(['myArray', 'of', 'strings'])`
*
* @example If one has a generator function f:
* `function *f(): Iterable<string> ... `
* then you can call `queue.queueAll(f())`.
*/
queueAll(iterable: Iterable<T>): void;

/**
* Queue all elements of an asynchronous iterable, e.g. an asynchronous
* generator functions.
*
* @example Using an asynchronous generator function:
* ```
* async function *f(): AsyncIterable<string>
* yield* ['Array', 'of', 'strings'];
*
*
* const previousSize = queue.size();
* queue.queueAllAsync(f());
* // ^ We do not await the queueing!
* // Therefore: queue.size() === previousSize here!
* // This is indeed guaranteed by JS' execution model. There is
* // no way queueAllAsync could have queried an element from f()
* // asynchronously using a promise before this code gives up
* // the "CPU power" by await or yield.
*
* await queue.dequeue(); // 'Array'
* await queue.dequeue(); // 'of'
* await queue.dequeue(); // 'strings'
*
* // queue.size() === 0 and queue.dequeue() would block
* // ad infinitum
*
* await queue.queueAllAsync(f());
* // We now await the queueing!
* // Therefore: queue.size() === 3 here!
* ```
*
* @example AsyncQueue instances are also asynchronous iterables,
* meaning that you can stack multiple queues together:
* ```
* const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
* const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
*
* setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
*
* foregroundQueue.queueAllAsync(backgroundQueue);
* const retrievedString = await foregroundQueue.dequeue();
*
* // retrievedString === 'Hello World!'
* ```
*/
queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

/**
* Dequeue an element, waiting for data to be available if necessary.
*
* @returns A promise which is fulfilled when an element (as queued by
* queue()) becomes available.
* If multiple dequeus() are issued sequentially, it is
* implementation-defined whether they are fulfilled in the same
* order or not. However, the data is still retrieved in FIFO
* fashion, meaning the first fulfilled promise gets the first
* element, the second fulfilled the second one and so forth.
*/
dequeue(): Promise<T>;

/**
* Dequeue an element if available or throw an exception otherwise.
*
* @returns The first element of the queue.
* @throws An exception if the queue is empty at the time of the call.
*/
poll(): T;

/**
* Return the current size at the moment of the call.
*
* Even though code like
* ```
* if (queue.size() >= 1)
* const element = queue.poll();
*
* ```
* is technically not wrong (due to JS' execution model), users are
* advised to avoid this pattern. Instead, users are encouraged to
*
* - in cases where waiting for a promise is impossible, to use
* @link poll and catch the exception,
* - or to use @link dequeue with JS' `await` or
* `queue.dequeue().then(...)`.
*/
size(): number;


export class NoElementError extends Error


export class AsyncQueue<T> implements IAsyncQueue<T>
private buffer: T = ;
private elementSem: ISemaphore = new Semaphore(0);

public queue(data: T): void
this.buffer.push(data);
this.elementSem.free();


public queueAll(iterable: Iterable<T>): void
for (const element of iterable)
this.queue(element);



public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
for await (const element of iterable)
this.queue(element);



public async dequeue(): Promise<T>
await this.elementSem.take();

try
return this.poll();

catch (err)
if (err instanceof NoElementError)
throw new Error('AsyncQueue dequeue: poll() threw an exception
even though dequeue() waited for its element semaphore to be available via take().');

else
throw err;




public poll(): T
if (this.buffer.length >= 1)
const dequeuedElement = this.buffer.shift();

// Force-cast the element since we know that the buffer contains
// at least one element and JS' execution model prohibits other
// interleaving fibers to modify the buffer (=> no race condition).
//
// Also, we cannot check for shift() returning undefined as the queue
// might well contain "undefined" as such.
return (dequeuedElement as T);

else
throw new NoElementError();



public size(): number
return this.buffer.length;


public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
while (true)
yield this.dequeue();





AsyncLimitedQueue:



import IAsyncQueue, AsyncQueue from './AsyncQueue';
import ISemaphore, Semaphore from '../semaphore/index';

/**
* Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
*
* Contrary to @link IAsyncQueue, the queue operation is Promise-driven as well,
* e.g. implementations might delay entrance into the queue, e.g. to enforce a
* limit on the number of elements stored in the queue at the same time, cf.
* @link AsyncLimitedQueue.
* Other types of entrance limitations are conceivable as well, such as a
* restriction on the sum of contained elements in case of a number queue.
*
* All element values are allowed, especially falsy ones, e.g.
* false, 0, undefined, null, , are all valid elements which
* can be queued and dequeued.
*
* @link queue IAsyncLimitedQueue#queue operations are possibly delayed and
* executed in implementation-dependenent order.
*
* @example Issueing multiple @link queue operations without awaiting the
* previous ones may result in implementation-defined insertion order.
* ```
* queue.queue(1);
* queue.queue(2);
*
* await queue.dequeue(); // can be 1 or 2
* await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
* ```
*
* @example If you would like to retain the order, await the @link queue
* operations, use @link queueAll IAsyncLimitedQueue#queueAll or
* @link queueAllAsync IAsyncLimitedQueue#queueAllAsync.
* ```
* await queue.queue(1);
* await queue.queue(2);
* ```
* ```
* queue.queueAll([1, 2]);
* ```
*
* The @link AsyncIterable interface iterates the queue's (future) contents
* ad infinitum. Users are advised to signal the end by manual insertion of a
* special value (a so-called poison pill), see @link IAsyncQueue.
*/
export interface IAsyncLimitedQueue<T> extends AsyncIterable<T>
/**
* Queue an element, waiting for entrance if necessary.
*
* @example
* ```
* queue.queue(42).then(() =>
* // 42 is now stored within the queue
* );
* ```
*/
queue(data: T): Promise<void>;

/**
* Queue all elements of an iterable, e.g. an array or a generator function.
* @see IAsyncQueue#queueAll
*/
queueAll(iterable: Iterable<T>): Promise<void>;

/**
* Queue all elements of an asynchronous iterable, e.g. an asynchronous
* generator functions.
*
* @see IAsyncQueue#queueAllAsync
*/
queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

/**
* Offer an element, only queueing it if entrance is available at the time
* of the call.
*
* @returns True if the element could be inserted right away. False
* otherwise.
*/
offer(data: T): boolean;

/**
* Offer all elements of an iterable for in-order insertion.
*
* @param iterable An iterable whose first (limit - queue.size()) elements
* will be inserted. Iterables which iterate an infinite
* number of elements can also be passed and will *not*
* result in an endless loop.
*
* @returns The number of elements, which could be inserted right away.
* Possibly 0 when the queue was full at the time of the call.
*/
offerAll(iterable: Iterable<T>): number;

/**
* Offer all elements of an asynchronous iterable for in-order insertion.
*
* @param iterable An iterable whose elements will be @link offered
* in-order for this queue.
* The method will stop querying and offering further
* elements upon the first @link offer call, which
* returns `false`.
* <br>
* Contrary to @link offerAll, iterables iterating an
* infinite number of elements might prevent the Promise,
* which @link offerAllAsync returns, from ever resolving.
* <br>
* This depends on @link dequeue operations which could
* get scheduled by the JS VM while elements from the passed
* asynchronous iterator are accessed.
*
* @returns A promise resolving to the number of elements, which could be
* inserted (offered successfully) consecutively without waiting.
* Possibly 0 when the queue was full at the time of the call.
* Fulfillment of this promise is not guaranteed in case of infinite
* iterables.
*/
offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;

/**
* Dequeue an element if available or throw an exception otherwise.
*
* @returns The first element of the queue.
* @throws An exception if the queue is empty at the time of the call.
*/
poll(): T;

/**
* Dequeue an element, waiting for data to be available if necessary.
*
* @returns A promise which is fulfilled when an element (as queued by
* queue()) becomes available.
* If multiple dequeus() are issued sequentially, it is
* implementation-defined whether they are fulfilled in the same
* order or not. However, the data is still retrieved in FIFO
* fashion, meaning the first fulfilled promise gets the first
* element, the second fulfilled the second one and so forth.
*/
dequeue(): Promise<T>;

/**
* Return the current size at the moment of the call.
*
* Even though code like
* ```
* if (queue.size() >= 1)
* const element = queue.poll();
*
* ```
* is technically not wrong (due to JS' execution model), users are
* advised to avoid this pattern. Instead, users are encouraged to
*
* - in cases where waiting for a promise is impossible, to use
* @link poll and catch the exception,
* - or to use @link dequeue with JS' `await` or
* `queue.dequeue().then(...)`.
*/
size(): number;


/**
* Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
*
* @link AsyncLimitedQueue#queue operations are delayed (in unspecified order)
* until space becomes available through dequeue operations.
*/
export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T>
private limitSem: ISemaphore;

/**
* Initialize the queue.
* @param limit A integer >= 1 specifying the number of elements after which
* queue() effectively blocks (i.e. the promise returned by it
* does not get "immediately" fulfilled for some informal value
* of immediately).
* @param storageQueue An asynchronous (non-limiting) queue backing the data.
* It defaults to a AsyncQueue.
*
* @throws An exception in case the limit is not an integer or is <= 0.
*/
public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue())

public async queue(data: T): Promise<void>
await this.limitSem.take();
this.storageQueue.queue(data);


public async queueAll(iterable: Iterable<T>): Promise<void>
for (const element of iterable)
await this.queue(element);



public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
for await (const element of iterable)
await this.queue(element);



public offer(data: T): boolean
if (this.limitSem.tryTake())
this.storageQueue.queue(data);
return true;

else
return false;



public offerAll(iterable: Iterable<T>): number
let insertedElements = 0;

for (const element of iterable)
if (!this.offer(element))
return insertedElements;


insertedElements++;


return insertedElements;


public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number>
let insertedElements = 0;

for await (const element of iterable)
if (!this.offer(element))
return insertedElements;


insertedElements++;


return insertedElements;


public async dequeue(): Promise<T>
return this.storageQueue.dequeue().then(element =>
this.limitSem.free();
return element;
);


public poll(): T
return this.storageQueue.poll();


public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
while (true)
yield this.dequeue();



public size(): number
return this.storageQueue.size();








share|improve this question



























    up vote
    3
    down vote

    favorite












    I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.



    • Full code + unit tests: https://github.com/ComFreek/async-playground

    • Generated documentation: https://comfreek.github.io/async-playground

    Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrain this question to the queue implementations.



    Usage example



    Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:



    async function* readInput() null> = new AsyncQueue();
    // readline is an NPM package simplifying reading lines from stdio
    const rl = readline.createInterface( /* ... */ );

    rl.on('line', (line: string) => queue.queue(line));
    rl.on('close', () => queue.queue(null));

    yield* queue;


    for await (const line of readInput())
    if (line === null)
    break;

    console.log(line);



    Questions



    • Have I followed best practices?


    • Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?



    • Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this a right decision?



      The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking contrary to IAsyncLimitedQueue#queue, which is asynchronous.

      This difference is also manifested in their types, void vs Promise<void>. Making AsyncQueue#queue returning a Promise as well would violate the principle of least astonishment, namely that it is synchronous despite its return type.



    Code



    Online: repo, docs



    AsyncQueue.ts



    import ISemaphore, Semaphore from '../semaphore/index';

    /**
    * Asynchronous FIFO queue with a Promise-driven dequeue operation.
    *
    * All element values are allowed, especially falsy ones, e.g.
    * false, 0, undefined, null, , are all valid elements which
    * can be queued and dequeued.
    *
    * The @link AsyncIterable interface iterates the queue's (future) contents
    * ad infinitum. Users are advised to signal the end by manual insertion of a
    * special value (a so-called poison pill):
    *
    * ```
    * const queue = new AsyncQueue<string|null>();
    * file.on('data', (data) => queue.queue(data));
    * file.on('close', () => queue.queue(null));
    *
    * for await (const data of queue)
    * if (data === null)
    * break;
    *
    * // Otherwise, process data
    *
    * ```
    */
    export interface IAsyncQueue<T> extends AsyncIterable<T>
    /**
    * Queue an element immediately.
    */
    queue(data: T): void;

    /**
    * Queue all elements of an iterable, e.g. an array or a generator function.
    *
    * @example `queue.queueAll(['myArray', 'of', 'strings'])`
    *
    * @example If one has a generator function f:
    * `function *f(): Iterable<string> ... `
    * then you can call `queue.queueAll(f())`.
    */
    queueAll(iterable: Iterable<T>): void;

    /**
    * Queue all elements of an asynchronous iterable, e.g. an asynchronous
    * generator functions.
    *
    * @example Using an asynchronous generator function:
    * ```
    * async function *f(): AsyncIterable<string>
    * yield* ['Array', 'of', 'strings'];
    *
    *
    * const previousSize = queue.size();
    * queue.queueAllAsync(f());
    * // ^ We do not await the queueing!
    * // Therefore: queue.size() === previousSize here!
    * // This is indeed guaranteed by JS' execution model. There is
    * // no way queueAllAsync could have queried an element from f()
    * // asynchronously using a promise before this code gives up
    * // the "CPU power" by await or yield.
    *
    * await queue.dequeue(); // 'Array'
    * await queue.dequeue(); // 'of'
    * await queue.dequeue(); // 'strings'
    *
    * // queue.size() === 0 and queue.dequeue() would block
    * // ad infinitum
    *
    * await queue.queueAllAsync(f());
    * // We now await the queueing!
    * // Therefore: queue.size() === 3 here!
    * ```
    *
    * @example AsyncQueue instances are also asynchronous iterables,
    * meaning that you can stack multiple queues together:
    * ```
    * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
    * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
    *
    * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
    *
    * foregroundQueue.queueAllAsync(backgroundQueue);
    * const retrievedString = await foregroundQueue.dequeue();
    *
    * // retrievedString === 'Hello World!'
    * ```
    */
    queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

    /**
    * Dequeue an element, waiting for data to be available if necessary.
    *
    * @returns A promise which is fulfilled when an element (as queued by
    * queue()) becomes available.
    * If multiple dequeus() are issued sequentially, it is
    * implementation-defined whether they are fulfilled in the same
    * order or not. However, the data is still retrieved in FIFO
    * fashion, meaning the first fulfilled promise gets the first
    * element, the second fulfilled the second one and so forth.
    */
    dequeue(): Promise<T>;

    /**
    * Dequeue an element if available or throw an exception otherwise.
    *
    * @returns The first element of the queue.
    * @throws An exception if the queue is empty at the time of the call.
    */
    poll(): T;

    /**
    * Return the current size at the moment of the call.
    *
    * Even though code like
    * ```
    * if (queue.size() >= 1)
    * const element = queue.poll();
    *
    * ```
    * is technically not wrong (due to JS' execution model), users are
    * advised to avoid this pattern. Instead, users are encouraged to
    *
    * - in cases where waiting for a promise is impossible, to use
    * @link poll and catch the exception,
    * - or to use @link dequeue with JS' `await` or
    * `queue.dequeue().then(...)`.
    */
    size(): number;


    export class NoElementError extends Error


    export class AsyncQueue<T> implements IAsyncQueue<T>
    private buffer: T = ;
    private elementSem: ISemaphore = new Semaphore(0);

    public queue(data: T): void
    this.buffer.push(data);
    this.elementSem.free();


    public queueAll(iterable: Iterable<T>): void
    for (const element of iterable)
    this.queue(element);



    public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
    for await (const element of iterable)
    this.queue(element);



    public async dequeue(): Promise<T>
    await this.elementSem.take();

    try
    return this.poll();

    catch (err)
    if (err instanceof NoElementError)
    throw new Error('AsyncQueue dequeue: poll() threw an exception
    even though dequeue() waited for its element semaphore to be available via take().');

    else
    throw err;




    public poll(): T
    if (this.buffer.length >= 1)
    const dequeuedElement = this.buffer.shift();

    // Force-cast the element since we know that the buffer contains
    // at least one element and JS' execution model prohibits other
    // interleaving fibers to modify the buffer (=> no race condition).
    //
    // Also, we cannot check for shift() returning undefined as the queue
    // might well contain "undefined" as such.
    return (dequeuedElement as T);

    else
    throw new NoElementError();



    public size(): number
    return this.buffer.length;


    public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
    while (true)
    yield this.dequeue();





    AsyncLimitedQueue:



    import IAsyncQueue, AsyncQueue from './AsyncQueue';
    import ISemaphore, Semaphore from '../semaphore/index';

    /**
    * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
    *
    * Contrary to @link IAsyncQueue, the queue operation is Promise-driven as well,
    * e.g. implementations might delay entrance into the queue, e.g. to enforce a
    * limit on the number of elements stored in the queue at the same time, cf.
    * @link AsyncLimitedQueue.
    * Other types of entrance limitations are conceivable as well, such as a
    * restriction on the sum of contained elements in case of a number queue.
    *
    * All element values are allowed, especially falsy ones, e.g.
    * false, 0, undefined, null, , are all valid elements which
    * can be queued and dequeued.
    *
    * @link queue IAsyncLimitedQueue#queue operations are possibly delayed and
    * executed in implementation-dependenent order.
    *
    * @example Issueing multiple @link queue operations without awaiting the
    * previous ones may result in implementation-defined insertion order.
    * ```
    * queue.queue(1);
    * queue.queue(2);
    *
    * await queue.dequeue(); // can be 1 or 2
    * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
    * ```
    *
    * @example If you would like to retain the order, await the @link queue
    * operations, use @link queueAll IAsyncLimitedQueue#queueAll or
    * @link queueAllAsync IAsyncLimitedQueue#queueAllAsync.
    * ```
    * await queue.queue(1);
    * await queue.queue(2);
    * ```
    * ```
    * queue.queueAll([1, 2]);
    * ```
    *
    * The @link AsyncIterable interface iterates the queue's (future) contents
    * ad infinitum. Users are advised to signal the end by manual insertion of a
    * special value (a so-called poison pill), see @link IAsyncQueue.
    */
    export interface IAsyncLimitedQueue<T> extends AsyncIterable<T>
    /**
    * Queue an element, waiting for entrance if necessary.
    *
    * @example
    * ```
    * queue.queue(42).then(() =>
    * // 42 is now stored within the queue
    * );
    * ```
    */
    queue(data: T): Promise<void>;

    /**
    * Queue all elements of an iterable, e.g. an array or a generator function.
    * @see IAsyncQueue#queueAll
    */
    queueAll(iterable: Iterable<T>): Promise<void>;

    /**
    * Queue all elements of an asynchronous iterable, e.g. an asynchronous
    * generator functions.
    *
    * @see IAsyncQueue#queueAllAsync
    */
    queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

    /**
    * Offer an element, only queueing it if entrance is available at the time
    * of the call.
    *
    * @returns True if the element could be inserted right away. False
    * otherwise.
    */
    offer(data: T): boolean;

    /**
    * Offer all elements of an iterable for in-order insertion.
    *
    * @param iterable An iterable whose first (limit - queue.size()) elements
    * will be inserted. Iterables which iterate an infinite
    * number of elements can also be passed and will *not*
    * result in an endless loop.
    *
    * @returns The number of elements, which could be inserted right away.
    * Possibly 0 when the queue was full at the time of the call.
    */
    offerAll(iterable: Iterable<T>): number;

    /**
    * Offer all elements of an asynchronous iterable for in-order insertion.
    *
    * @param iterable An iterable whose elements will be @link offered
    * in-order for this queue.
    * The method will stop querying and offering further
    * elements upon the first @link offer call, which
    * returns `false`.
    * <br>
    * Contrary to @link offerAll, iterables iterating an
    * infinite number of elements might prevent the Promise,
    * which @link offerAllAsync returns, from ever resolving.
    * <br>
    * This depends on @link dequeue operations which could
    * get scheduled by the JS VM while elements from the passed
    * asynchronous iterator are accessed.
    *
    * @returns A promise resolving to the number of elements, which could be
    * inserted (offered successfully) consecutively without waiting.
    * Possibly 0 when the queue was full at the time of the call.
    * Fulfillment of this promise is not guaranteed in case of infinite
    * iterables.
    */
    offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;

    /**
    * Dequeue an element if available or throw an exception otherwise.
    *
    * @returns The first element of the queue.
    * @throws An exception if the queue is empty at the time of the call.
    */
    poll(): T;

    /**
    * Dequeue an element, waiting for data to be available if necessary.
    *
    * @returns A promise which is fulfilled when an element (as queued by
    * queue()) becomes available.
    * If multiple dequeus() are issued sequentially, it is
    * implementation-defined whether they are fulfilled in the same
    * order or not. However, the data is still retrieved in FIFO
    * fashion, meaning the first fulfilled promise gets the first
    * element, the second fulfilled the second one and so forth.
    */
    dequeue(): Promise<T>;

    /**
    * Return the current size at the moment of the call.
    *
    * Even though code like
    * ```
    * if (queue.size() >= 1)
    * const element = queue.poll();
    *
    * ```
    * is technically not wrong (due to JS' execution model), users are
    * advised to avoid this pattern. Instead, users are encouraged to
    *
    * - in cases where waiting for a promise is impossible, to use
    * @link poll and catch the exception,
    * - or to use @link dequeue with JS' `await` or
    * `queue.dequeue().then(...)`.
    */
    size(): number;


    /**
    * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
    *
    * @link AsyncLimitedQueue#queue operations are delayed (in unspecified order)
    * until space becomes available through dequeue operations.
    */
    export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T>
    private limitSem: ISemaphore;

    /**
    * Initialize the queue.
    * @param limit A integer >= 1 specifying the number of elements after which
    * queue() effectively blocks (i.e. the promise returned by it
    * does not get "immediately" fulfilled for some informal value
    * of immediately).
    * @param storageQueue An asynchronous (non-limiting) queue backing the data.
    * It defaults to a AsyncQueue.
    *
    * @throws An exception in case the limit is not an integer or is <= 0.
    */
    public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue())

    public async queue(data: T): Promise<void>
    await this.limitSem.take();
    this.storageQueue.queue(data);


    public async queueAll(iterable: Iterable<T>): Promise<void>
    for (const element of iterable)
    await this.queue(element);



    public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
    for await (const element of iterable)
    await this.queue(element);



    public offer(data: T): boolean
    if (this.limitSem.tryTake())
    this.storageQueue.queue(data);
    return true;

    else
    return false;



    public offerAll(iterable: Iterable<T>): number
    let insertedElements = 0;

    for (const element of iterable)
    if (!this.offer(element))
    return insertedElements;


    insertedElements++;


    return insertedElements;


    public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number>
    let insertedElements = 0;

    for await (const element of iterable)
    if (!this.offer(element))
    return insertedElements;


    insertedElements++;


    return insertedElements;


    public async dequeue(): Promise<T>
    return this.storageQueue.dequeue().then(element =>
    this.limitSem.free();
    return element;
    );


    public poll(): T
    return this.storageQueue.poll();


    public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
    while (true)
    yield this.dequeue();



    public size(): number
    return this.storageQueue.size();








    share|improve this question























      up vote
      3
      down vote

      favorite









      up vote
      3
      down vote

      favorite











      I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.



      • Full code + unit tests: https://github.com/ComFreek/async-playground

      • Generated documentation: https://comfreek.github.io/async-playground

      Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrain this question to the queue implementations.



      Usage example



      Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:



      async function* readInput() null> = new AsyncQueue();
      // readline is an NPM package simplifying reading lines from stdio
      const rl = readline.createInterface( /* ... */ );

      rl.on('line', (line: string) => queue.queue(line));
      rl.on('close', () => queue.queue(null));

      yield* queue;


      for await (const line of readInput())
      if (line === null)
      break;

      console.log(line);



      Questions



      • Have I followed best practices?


      • Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?



      • Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this a right decision?



        The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking contrary to IAsyncLimitedQueue#queue, which is asynchronous.

        This difference is also manifested in their types, void vs Promise<void>. Making AsyncQueue#queue returning a Promise as well would violate the principle of least astonishment, namely that it is synchronous despite its return type.



      Code



      Online: repo, docs



      AsyncQueue.ts



      import ISemaphore, Semaphore from '../semaphore/index';

      /**
      * Asynchronous FIFO queue with a Promise-driven dequeue operation.
      *
      * All element values are allowed, especially falsy ones, e.g.
      * false, 0, undefined, null, , are all valid elements which
      * can be queued and dequeued.
      *
      * The @link AsyncIterable interface iterates the queue's (future) contents
      * ad infinitum. Users are advised to signal the end by manual insertion of a
      * special value (a so-called poison pill):
      *
      * ```
      * const queue = new AsyncQueue<string|null>();
      * file.on('data', (data) => queue.queue(data));
      * file.on('close', () => queue.queue(null));
      *
      * for await (const data of queue)
      * if (data === null)
      * break;
      *
      * // Otherwise, process data
      *
      * ```
      */
      export interface IAsyncQueue<T> extends AsyncIterable<T>
      /**
      * Queue an element immediately.
      */
      queue(data: T): void;

      /**
      * Queue all elements of an iterable, e.g. an array or a generator function.
      *
      * @example `queue.queueAll(['myArray', 'of', 'strings'])`
      *
      * @example If one has a generator function f:
      * `function *f(): Iterable<string> ... `
      * then you can call `queue.queueAll(f())`.
      */
      queueAll(iterable: Iterable<T>): void;

      /**
      * Queue all elements of an asynchronous iterable, e.g. an asynchronous
      * generator functions.
      *
      * @example Using an asynchronous generator function:
      * ```
      * async function *f(): AsyncIterable<string>
      * yield* ['Array', 'of', 'strings'];
      *
      *
      * const previousSize = queue.size();
      * queue.queueAllAsync(f());
      * // ^ We do not await the queueing!
      * // Therefore: queue.size() === previousSize here!
      * // This is indeed guaranteed by JS' execution model. There is
      * // no way queueAllAsync could have queried an element from f()
      * // asynchronously using a promise before this code gives up
      * // the "CPU power" by await or yield.
      *
      * await queue.dequeue(); // 'Array'
      * await queue.dequeue(); // 'of'
      * await queue.dequeue(); // 'strings'
      *
      * // queue.size() === 0 and queue.dequeue() would block
      * // ad infinitum
      *
      * await queue.queueAllAsync(f());
      * // We now await the queueing!
      * // Therefore: queue.size() === 3 here!
      * ```
      *
      * @example AsyncQueue instances are also asynchronous iterables,
      * meaning that you can stack multiple queues together:
      * ```
      * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
      * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
      *
      * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
      *
      * foregroundQueue.queueAllAsync(backgroundQueue);
      * const retrievedString = await foregroundQueue.dequeue();
      *
      * // retrievedString === 'Hello World!'
      * ```
      */
      queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

      /**
      * Dequeue an element, waiting for data to be available if necessary.
      *
      * @returns A promise which is fulfilled when an element (as queued by
      * queue()) becomes available.
      * If multiple dequeus() are issued sequentially, it is
      * implementation-defined whether they are fulfilled in the same
      * order or not. However, the data is still retrieved in FIFO
      * fashion, meaning the first fulfilled promise gets the first
      * element, the second fulfilled the second one and so forth.
      */
      dequeue(): Promise<T>;

      /**
      * Dequeue an element if available or throw an exception otherwise.
      *
      * @returns The first element of the queue.
      * @throws An exception if the queue is empty at the time of the call.
      */
      poll(): T;

      /**
      * Return the current size at the moment of the call.
      *
      * Even though code like
      * ```
      * if (queue.size() >= 1)
      * const element = queue.poll();
      *
      * ```
      * is technically not wrong (due to JS' execution model), users are
      * advised to avoid this pattern. Instead, users are encouraged to
      *
      * - in cases where waiting for a promise is impossible, to use
      * @link poll and catch the exception,
      * - or to use @link dequeue with JS' `await` or
      * `queue.dequeue().then(...)`.
      */
      size(): number;


      export class NoElementError extends Error


      export class AsyncQueue<T> implements IAsyncQueue<T>
      private buffer: T = ;
      private elementSem: ISemaphore = new Semaphore(0);

      public queue(data: T): void
      this.buffer.push(data);
      this.elementSem.free();


      public queueAll(iterable: Iterable<T>): void
      for (const element of iterable)
      this.queue(element);



      public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
      for await (const element of iterable)
      this.queue(element);



      public async dequeue(): Promise<T>
      await this.elementSem.take();

      try
      return this.poll();

      catch (err)
      if (err instanceof NoElementError)
      throw new Error('AsyncQueue dequeue: poll() threw an exception
      even though dequeue() waited for its element semaphore to be available via take().');

      else
      throw err;




      public poll(): T
      if (this.buffer.length >= 1)
      const dequeuedElement = this.buffer.shift();

      // Force-cast the element since we know that the buffer contains
      // at least one element and JS' execution model prohibits other
      // interleaving fibers to modify the buffer (=> no race condition).
      //
      // Also, we cannot check for shift() returning undefined as the queue
      // might well contain "undefined" as such.
      return (dequeuedElement as T);

      else
      throw new NoElementError();



      public size(): number
      return this.buffer.length;


      public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
      while (true)
      yield this.dequeue();





      AsyncLimitedQueue:



      import IAsyncQueue, AsyncQueue from './AsyncQueue';
      import ISemaphore, Semaphore from '../semaphore/index';

      /**
      * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
      *
      * Contrary to @link IAsyncQueue, the queue operation is Promise-driven as well,
      * e.g. implementations might delay entrance into the queue, e.g. to enforce a
      * limit on the number of elements stored in the queue at the same time, cf.
      * @link AsyncLimitedQueue.
      * Other types of entrance limitations are conceivable as well, such as a
      * restriction on the sum of contained elements in case of a number queue.
      *
      * All element values are allowed, especially falsy ones, e.g.
      * false, 0, undefined, null, , are all valid elements which
      * can be queued and dequeued.
      *
      * @link queue IAsyncLimitedQueue#queue operations are possibly delayed and
      * executed in implementation-dependenent order.
      *
      * @example Issueing multiple @link queue operations without awaiting the
      * previous ones may result in implementation-defined insertion order.
      * ```
      * queue.queue(1);
      * queue.queue(2);
      *
      * await queue.dequeue(); // can be 1 or 2
      * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
      * ```
      *
      * @example If you would like to retain the order, await the @link queue
      * operations, use @link queueAll IAsyncLimitedQueue#queueAll or
      * @link queueAllAsync IAsyncLimitedQueue#queueAllAsync.
      * ```
      * await queue.queue(1);
      * await queue.queue(2);
      * ```
      * ```
      * queue.queueAll([1, 2]);
      * ```
      *
      * The @link AsyncIterable interface iterates the queue's (future) contents
      * ad infinitum. Users are advised to signal the end by manual insertion of a
      * special value (a so-called poison pill), see @link IAsyncQueue.
      */
      export interface IAsyncLimitedQueue<T> extends AsyncIterable<T>
      /**
      * Queue an element, waiting for entrance if necessary.
      *
      * @example
      * ```
      * queue.queue(42).then(() =>
      * // 42 is now stored within the queue
      * );
      * ```
      */
      queue(data: T): Promise<void>;

      /**
      * Queue all elements of an iterable, e.g. an array or a generator function.
      * @see IAsyncQueue#queueAll
      */
      queueAll(iterable: Iterable<T>): Promise<void>;

      /**
      * Queue all elements of an asynchronous iterable, e.g. an asynchronous
      * generator functions.
      *
      * @see IAsyncQueue#queueAllAsync
      */
      queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

      /**
      * Offer an element, only queueing it if entrance is available at the time
      * of the call.
      *
      * @returns True if the element could be inserted right away. False
      * otherwise.
      */
      offer(data: T): boolean;

      /**
      * Offer all elements of an iterable for in-order insertion.
      *
      * @param iterable An iterable whose first (limit - queue.size()) elements
      * will be inserted. Iterables which iterate an infinite
      * number of elements can also be passed and will *not*
      * result in an endless loop.
      *
      * @returns The number of elements, which could be inserted right away.
      * Possibly 0 when the queue was full at the time of the call.
      */
      offerAll(iterable: Iterable<T>): number;

      /**
      * Offer all elements of an asynchronous iterable for in-order insertion.
      *
      * @param iterable An iterable whose elements will be @link offered
      * in-order for this queue.
      * The method will stop querying and offering further
      * elements upon the first @link offer call, which
      * returns `false`.
      * <br>
      * Contrary to @link offerAll, iterables iterating an
      * infinite number of elements might prevent the Promise,
      * which @link offerAllAsync returns, from ever resolving.
      * <br>
      * This depends on @link dequeue operations which could
      * get scheduled by the JS VM while elements from the passed
      * asynchronous iterator are accessed.
      *
      * @returns A promise resolving to the number of elements, which could be
      * inserted (offered successfully) consecutively without waiting.
      * Possibly 0 when the queue was full at the time of the call.
      * Fulfillment of this promise is not guaranteed in case of infinite
      * iterables.
      */
      offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;

      /**
      * Dequeue an element if available or throw an exception otherwise.
      *
      * @returns The first element of the queue.
      * @throws An exception if the queue is empty at the time of the call.
      */
      poll(): T;

      /**
      * Dequeue an element, waiting for data to be available if necessary.
      *
      * @returns A promise which is fulfilled when an element (as queued by
      * queue()) becomes available.
      * If multiple dequeus() are issued sequentially, it is
      * implementation-defined whether they are fulfilled in the same
      * order or not. However, the data is still retrieved in FIFO
      * fashion, meaning the first fulfilled promise gets the first
      * element, the second fulfilled the second one and so forth.
      */
      dequeue(): Promise<T>;

      /**
      * Return the current size at the moment of the call.
      *
      * Even though code like
      * ```
      * if (queue.size() >= 1)
      * const element = queue.poll();
      *
      * ```
      * is technically not wrong (due to JS' execution model), users are
      * advised to avoid this pattern. Instead, users are encouraged to
      *
      * - in cases where waiting for a promise is impossible, to use
      * @link poll and catch the exception,
      * - or to use @link dequeue with JS' `await` or
      * `queue.dequeue().then(...)`.
      */
      size(): number;


      /**
      * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
      *
      * @link AsyncLimitedQueue#queue operations are delayed (in unspecified order)
      * until space becomes available through dequeue operations.
      */
      export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T>
      private limitSem: ISemaphore;

      /**
      * Initialize the queue.
      * @param limit A integer >= 1 specifying the number of elements after which
      * queue() effectively blocks (i.e. the promise returned by it
      * does not get "immediately" fulfilled for some informal value
      * of immediately).
      * @param storageQueue An asynchronous (non-limiting) queue backing the data.
      * It defaults to a AsyncQueue.
      *
      * @throws An exception in case the limit is not an integer or is <= 0.
      */
      public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue())

      public async queue(data: T): Promise<void>
      await this.limitSem.take();
      this.storageQueue.queue(data);


      public async queueAll(iterable: Iterable<T>): Promise<void>
      for (const element of iterable)
      await this.queue(element);



      public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
      for await (const element of iterable)
      await this.queue(element);



      public offer(data: T): boolean
      if (this.limitSem.tryTake())
      this.storageQueue.queue(data);
      return true;

      else
      return false;



      public offerAll(iterable: Iterable<T>): number
      let insertedElements = 0;

      for (const element of iterable)
      if (!this.offer(element))
      return insertedElements;


      insertedElements++;


      return insertedElements;


      public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number>
      let insertedElements = 0;

      for await (const element of iterable)
      if (!this.offer(element))
      return insertedElements;


      insertedElements++;


      return insertedElements;


      public async dequeue(): Promise<T>
      return this.storageQueue.dequeue().then(element =>
      this.limitSem.free();
      return element;
      );


      public poll(): T
      return this.storageQueue.poll();


      public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
      while (true)
      yield this.dequeue();



      public size(): number
      return this.storageQueue.size();








      share|improve this question













      I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.



      • Full code + unit tests: https://github.com/ComFreek/async-playground

      • Generated documentation: https://comfreek.github.io/async-playground

      Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrain this question to the queue implementations.



      Usage example



      Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:



      async function* readInput() null> = new AsyncQueue();
      // readline is an NPM package simplifying reading lines from stdio
      const rl = readline.createInterface( /* ... */ );

      rl.on('line', (line: string) => queue.queue(line));
      rl.on('close', () => queue.queue(null));

      yield* queue;


      for await (const line of readInput())
      if (line === null)
      break;

      console.log(line);



      Questions



      • Have I followed best practices?


      • Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?



      • Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this a right decision?



        The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking contrary to IAsyncLimitedQueue#queue, which is asynchronous.

        This difference is also manifested in their types, void vs Promise<void>. Making AsyncQueue#queue returning a Promise as well would violate the principle of least astonishment, namely that it is synchronous despite its return type.



      Code



      Online: repo, docs



      AsyncQueue.ts



      import ISemaphore, Semaphore from '../semaphore/index';

      /**
      * Asynchronous FIFO queue with a Promise-driven dequeue operation.
      *
      * All element values are allowed, especially falsy ones, e.g.
      * false, 0, undefined, null, , are all valid elements which
      * can be queued and dequeued.
      *
      * The @link AsyncIterable interface iterates the queue's (future) contents
      * ad infinitum. Users are advised to signal the end by manual insertion of a
      * special value (a so-called poison pill):
      *
      * ```
      * const queue = new AsyncQueue<string|null>();
      * file.on('data', (data) => queue.queue(data));
      * file.on('close', () => queue.queue(null));
      *
      * for await (const data of queue)
      * if (data === null)
      * break;
      *
      * // Otherwise, process data
      *
      * ```
      */
      export interface IAsyncQueue<T> extends AsyncIterable<T>
      /**
      * Queue an element immediately.
      */
      queue(data: T): void;

      /**
      * Queue all elements of an iterable, e.g. an array or a generator function.
      *
      * @example `queue.queueAll(['myArray', 'of', 'strings'])`
      *
      * @example If one has a generator function f:
      * `function *f(): Iterable<string> ... `
      * then you can call `queue.queueAll(f())`.
      */
      queueAll(iterable: Iterable<T>): void;

      /**
      * Queue all elements of an asynchronous iterable, e.g. an asynchronous
      * generator functions.
      *
      * @example Using an asynchronous generator function:
      * ```
      * async function *f(): AsyncIterable<string>
      * yield* ['Array', 'of', 'strings'];
      *
      *
      * const previousSize = queue.size();
      * queue.queueAllAsync(f());
      * // ^ We do not await the queueing!
      * // Therefore: queue.size() === previousSize here!
      * // This is indeed guaranteed by JS' execution model. There is
      * // no way queueAllAsync could have queried an element from f()
      * // asynchronously using a promise before this code gives up
      * // the "CPU power" by await or yield.
      *
      * await queue.dequeue(); // 'Array'
      * await queue.dequeue(); // 'of'
      * await queue.dequeue(); // 'strings'
      *
      * // queue.size() === 0 and queue.dequeue() would block
      * // ad infinitum
      *
      * await queue.queueAllAsync(f());
      * // We now await the queueing!
      * // Therefore: queue.size() === 3 here!
      * ```
      *
      * @example AsyncQueue instances are also asynchronous iterables,
      * meaning that you can stack multiple queues together:
      * ```
      * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
      * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
      *
      * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
      *
      * foregroundQueue.queueAllAsync(backgroundQueue);
      * const retrievedString = await foregroundQueue.dequeue();
      *
      * // retrievedString === 'Hello World!'
      * ```
      */
      queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

      /**
      * Dequeue an element, waiting for data to be available if necessary.
      *
      * @returns A promise which is fulfilled when an element (as queued by
      * queue()) becomes available.
      * If multiple dequeus() are issued sequentially, it is
      * implementation-defined whether they are fulfilled in the same
      * order or not. However, the data is still retrieved in FIFO
      * fashion, meaning the first fulfilled promise gets the first
      * element, the second fulfilled the second one and so forth.
      */
      dequeue(): Promise<T>;

      /**
      * Dequeue an element if available or throw an exception otherwise.
      *
      * @returns The first element of the queue.
      * @throws An exception if the queue is empty at the time of the call.
      */
      poll(): T;

      /**
      * Return the current size at the moment of the call.
      *
      * Even though code like
      * ```
      * if (queue.size() >= 1)
      * const element = queue.poll();
      *
      * ```
      * is technically not wrong (due to JS' execution model), users are
      * advised to avoid this pattern. Instead, users are encouraged to
      *
      * - in cases where waiting for a promise is impossible, to use
      * @link poll and catch the exception,
      * - or to use @link dequeue with JS' `await` or
      * `queue.dequeue().then(...)`.
      */
      size(): number;


      export class NoElementError extends Error


      export class AsyncQueue<T> implements IAsyncQueue<T>
      private buffer: T = ;
      private elementSem: ISemaphore = new Semaphore(0);

      public queue(data: T): void
      this.buffer.push(data);
      this.elementSem.free();


      public queueAll(iterable: Iterable<T>): void
      for (const element of iterable)
      this.queue(element);



      public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
      for await (const element of iterable)
      this.queue(element);



      public async dequeue(): Promise<T>
      await this.elementSem.take();

      try
      return this.poll();

      catch (err)
      if (err instanceof NoElementError)
      throw new Error('AsyncQueue dequeue: poll() threw an exception
      even though dequeue() waited for its element semaphore to be available via take().');

      else
      throw err;




      public poll(): T
      if (this.buffer.length >= 1)
      const dequeuedElement = this.buffer.shift();

      // Force-cast the element since we know that the buffer contains
      // at least one element and JS' execution model prohibits other
      // interleaving fibers to modify the buffer (=> no race condition).
      //
      // Also, we cannot check for shift() returning undefined as the queue
      // might well contain "undefined" as such.
      return (dequeuedElement as T);

      else
      throw new NoElementError();



      public size(): number
      return this.buffer.length;


      public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
      while (true)
      yield this.dequeue();





      AsyncLimitedQueue:



      import IAsyncQueue, AsyncQueue from './AsyncQueue';
      import ISemaphore, Semaphore from '../semaphore/index';

      /**
      * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
      *
      * Contrary to @link IAsyncQueue, the queue operation is Promise-driven as well,
      * e.g. implementations might delay entrance into the queue, e.g. to enforce a
      * limit on the number of elements stored in the queue at the same time, cf.
      * @link AsyncLimitedQueue.
      * Other types of entrance limitations are conceivable as well, such as a
      * restriction on the sum of contained elements in case of a number queue.
      *
      * All element values are allowed, especially falsy ones, e.g.
      * false, 0, undefined, null, , are all valid elements which
      * can be queued and dequeued.
      *
      * @link queue IAsyncLimitedQueue#queue operations are possibly delayed and
      * executed in implementation-dependenent order.
      *
      * @example Issueing multiple @link queue operations without awaiting the
      * previous ones may result in implementation-defined insertion order.
      * ```
      * queue.queue(1);
      * queue.queue(2);
      *
      * await queue.dequeue(); // can be 1 or 2
      * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
      * ```
      *
      * @example If you would like to retain the order, await the @link queue
      * operations, use @link queueAll IAsyncLimitedQueue#queueAll or
      * @link queueAllAsync IAsyncLimitedQueue#queueAllAsync.
      * ```
      * await queue.queue(1);
      * await queue.queue(2);
      * ```
      * ```
      * queue.queueAll([1, 2]);
      * ```
      *
      * The @link AsyncIterable interface iterates the queue's (future) contents
      * ad infinitum. Users are advised to signal the end by manual insertion of a
      * special value (a so-called poison pill), see @link IAsyncQueue.
      */
      export interface IAsyncLimitedQueue<T> extends AsyncIterable<T>
      /**
      * Queue an element, waiting for entrance if necessary.
      *
      * @example
      * ```
      * queue.queue(42).then(() =>
      * // 42 is now stored within the queue
      * );
      * ```
      */
      queue(data: T): Promise<void>;

      /**
      * Queue all elements of an iterable, e.g. an array or a generator function.
      * @see IAsyncQueue#queueAll
      */
      queueAll(iterable: Iterable<T>): Promise<void>;

      /**
      * Queue all elements of an asynchronous iterable, e.g. an asynchronous
      * generator functions.
      *
      * @see IAsyncQueue#queueAllAsync
      */
      queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;

      /**
      * Offer an element, only queueing it if entrance is available at the time
      * of the call.
      *
      * @returns True if the element could be inserted right away. False
      * otherwise.
      */
      offer(data: T): boolean;

      /**
      * Offer all elements of an iterable for in-order insertion.
      *
      * @param iterable An iterable whose first (limit - queue.size()) elements
      * will be inserted. Iterables which iterate an infinite
      * number of elements can also be passed and will *not*
      * result in an endless loop.
      *
      * @returns The number of elements, which could be inserted right away.
      * Possibly 0 when the queue was full at the time of the call.
      */
      offerAll(iterable: Iterable<T>): number;

      /**
      * Offer all elements of an asynchronous iterable for in-order insertion.
      *
      * @param iterable An iterable whose elements will be @link offered
      * in-order for this queue.
      * The method will stop querying and offering further
      * elements upon the first @link offer call, which
      * returns `false`.
      * <br>
      * Contrary to @link offerAll, iterables iterating an
      * infinite number of elements might prevent the Promise,
      * which @link offerAllAsync returns, from ever resolving.
      * <br>
      * This depends on @link dequeue operations which could
      * get scheduled by the JS VM while elements from the passed
      * asynchronous iterator are accessed.
      *
      * @returns A promise resolving to the number of elements, which could be
      * inserted (offered successfully) consecutively without waiting.
      * Possibly 0 when the queue was full at the time of the call.
      * Fulfillment of this promise is not guaranteed in case of infinite
      * iterables.
      */
      offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;

      /**
      * Dequeue an element if available or throw an exception otherwise.
      *
      * @returns The first element of the queue.
      * @throws An exception if the queue is empty at the time of the call.
      */
      poll(): T;

      /**
      * Dequeue an element, waiting for data to be available if necessary.
      *
      * @returns A promise which is fulfilled when an element (as queued by
      * queue()) becomes available.
      * If multiple dequeus() are issued sequentially, it is
      * implementation-defined whether they are fulfilled in the same
      * order or not. However, the data is still retrieved in FIFO
      * fashion, meaning the first fulfilled promise gets the first
      * element, the second fulfilled the second one and so forth.
      */
      dequeue(): Promise<T>;

      /**
      * Return the current size at the moment of the call.
      *
      * Even though code like
      * ```
      * if (queue.size() >= 1)
      * const element = queue.poll();
      *
      * ```
      * is technically not wrong (due to JS' execution model), users are
      * advised to avoid this pattern. Instead, users are encouraged to
      *
      * - in cases where waiting for a promise is impossible, to use
      * @link poll and catch the exception,
      * - or to use @link dequeue with JS' `await` or
      * `queue.dequeue().then(...)`.
      */
      size(): number;


      /**
      * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
      *
      * @link AsyncLimitedQueue#queue operations are delayed (in unspecified order)
      * until space becomes available through dequeue operations.
      */
      export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T>
      private limitSem: ISemaphore;

      /**
      * Initialize the queue.
      * @param limit A integer >= 1 specifying the number of elements after which
      * queue() effectively blocks (i.e. the promise returned by it
      * does not get "immediately" fulfilled for some informal value
      * of immediately).
      * @param storageQueue An asynchronous (non-limiting) queue backing the data.
      * It defaults to a AsyncQueue.
      *
      * @throws An exception in case the limit is not an integer or is <= 0.
      */
      public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue())

      public async queue(data: T): Promise<void>
      await this.limitSem.take();
      this.storageQueue.queue(data);


      public async queueAll(iterable: Iterable<T>): Promise<void>
      for (const element of iterable)
      await this.queue(element);



      public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void>
      for await (const element of iterable)
      await this.queue(element);



      public offer(data: T): boolean
      if (this.limitSem.tryTake())
      this.storageQueue.queue(data);
      return true;

      else
      return false;



      public offerAll(iterable: Iterable<T>): number
      let insertedElements = 0;

      for (const element of iterable)
      if (!this.offer(element))
      return insertedElements;


      insertedElements++;


      return insertedElements;


      public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number>
      let insertedElements = 0;

      for await (const element of iterable)
      if (!this.offer(element))
      return insertedElements;


      insertedElements++;


      return insertedElements;


      public async dequeue(): Promise<T>
      return this.storageQueue.dequeue().then(element =>
      this.limitSem.free();
      return element;
      );


      public poll(): T
      return this.storageQueue.poll();


      public async *[Symbol.asyncIterator](): AsyncIterableIterator<T>
      while (true)
      yield this.dequeue();



      public size(): number
      return this.storageQueue.size();










      share|improve this question












      share|improve this question




      share|improve this question








      edited Mar 6 at 14:28
























      asked Mar 4 at 16:13









      ComFreek

      608617




      608617

























          active

          oldest

          votes











          Your Answer




          StackExchange.ifUsing("editor", function ()
          return StackExchange.using("mathjaxEditing", function ()
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          );
          );
          , "mathjax-editing");

          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "196"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: false,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );








           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188795%2fblocking-promise-driven-queues-with-ecmascripts-async-await-and-generators%23new-answer', 'question_page');

          );

          Post as a guest



































          active

          oldest

          votes













          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes










           

          draft saved


          draft discarded


























           


          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188795%2fblocking-promise-driven-queues-with-ecmascripts-async-await-and-generators%23new-answer', 'question_page');

          );

          Post as a guest