Utilities for async operations.
npm install @thermopylae/lib.async
This package contains utilities for async operations. More details can be found in the API documentation.
Below is an example of how LabeledConditionalVariableManager can be used to build a cache whose keys are guarded by a read-write lock.
import {
LabeledConditionalVariableManager,
LockedOperationType,
AwaiterRole
} from '@thermopylae/lib.async';
import {
AsyncFunction
} from '@thermopylae/core.declarations';
class Cache {
// acts as Read-Write Lock, i.e. shared for Reads, exclusive for Writes
private readonly conditionalVariable: LabeledConditionalVariableManager<string, string>;
private readonly entries: Map<string, string>; // assuming they will expire somehow
private readonly storageReader: AsyncFunction<string, string>;
private readonly storageWriter: AsyncFunction<string, void>;
public constructor(storageReader: AsyncFunction<string, string>, storageWriter: AsyncFunction<string, void>) {
this.conditionalVariable = new LabeledConditionalVariableManager();
this.entries = new Map<string, string>();
this.storageReader = storageReader;
this.storageWriter = storageWriter;
}
public async get(key: string): Promise<string | undefined> {
// acquire read-write lock
// in case it is acquired already by `set` operation, wait will throw
const lock = await this.conditionalVariable.wait(key, LockedOperationType.READ);
if (lock.role === AwaiterRole.CONSUMER) {
// the lock has already been acquired by someone who initiated a `get` operation for this key
// just return the promise and wait until the PRODUCER resolves/rejects it
return lock.promise;
}
let valueOrError = this.entries.get(key);
if (valueOrError === undefined) {
try {
valueOrError = await this.storageReader(key);
this.entries.set(key, valueOrError);
} catch (e) {
valueOrError = e;
}
}
// we are the PRODUCER, so we need to notify ourselves and the other consumers with the value of the key or the error that occurred
// the lock also needs to be released, so that a `set` operation can acquire it
this.conditionalVariable.notifyAll(key, valueOrError);
return lock.promise;
}
public async set(key: string, value: string): Promise<void> {
// acquire exclusive lock
// in case it is acquired already by `set` or `get` cache operations, wait will throw
await this.conditionalVariable.wait(key, LockedOperationType.WRITE);
// there is no need to check for producer consumer roles,
// because WRITE lock is either acquired, or an error is thrown
try {
await this.storageWriter(key, value);
this.entries.set(key, value);
} finally {
// release exclusive lock, so it can be used by either `set` or `get` cache operations
this.conditionalVariable.notifyAll(key);
}
}
}
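The PRODUCER/CONSUMER split used by `get` above can be illustrated without the library. The following is a minimal sketch of the underlying idea only (concurrent reads of the same key share one in-flight promise); it is not how LabeledConditionalVariableManager is implemented, and `SingleFlight` and `run` are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper, NOT part of @thermopylae/lib.async.
// Coalesces concurrent loads of the same key into a single loader call.
class SingleFlight<K, V> {
	// promises of loads that have started but not settled yet
	private readonly inFlight = new Map<K, Promise<V>>();

	public run(key: K, loader: (key: K) => Promise<V>): Promise<V> {
		const pending = this.inFlight.get(key);
		if (pending !== undefined) {
			// "consumer": a load for this key is already running, share its promise
			return pending;
		}

		// "producer": start the load and free the slot once it settles
		const promise = loader(key).then(
			(value) => {
				this.inFlight.delete(key);
				return value;
			},
			(error) => {
				this.inFlight.delete(key);
				throw error;
			}
		);
		this.inFlight.set(key, promise);
		return promise;
	}
}
```

With this sketch, two overlapping `run('k', loader)` calls trigger a single `loader` invocation and both callers receive the same result or the same rejection. Unlike the `Cache` example, it has no notion of an exclusive WRITE lock.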
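Below is an example of how PromiseExecutor can be used to process a list of inputs with different levels of concurrency.
import { PromiseExecutor } from '@thermopylae/lib.async';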
function fetchUserDetails(accountId: number): Promise<object> {
return fetch(`http://localhost:8080/user/${accountId}`);
}
// will process ids in batches of 2 elements,
// i.e. will make up to 2 network calls simultaneously
let results = PromiseExecutor.run(
fetchUserDetails,
[1, 2, 3, 4, 5],
2
);
console.log(results);
// will process all ids in parallel,
// i.e. will make 5 network calls simultaneously
results = PromiseExecutor.run(
fetchUserDetails,
[1, 2, 3, 4, 5],
PromiseExecutor.PARALLEL
);
console.log(results);
// will process ids in sequential order,
// i.e. will make only 1 network call at a time
results = PromiseExecutor.run(
fetchUserDetails,
[1, 2, 3, 4, 5],
PromiseExecutor.SEQUENTIAL
);
console.log(results);
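To make the "batches of 2 elements" wording concrete, here is a minimal, library-free sketch of batch processing. It is an assumption about the general technique, not the actual PromiseExecutor implementation, and `runInBatches` is a hypothetical name. In terms of this sketch, a batch size of 1 corresponds to sequential processing and a batch size equal to the input length corresponds to fully parallel processing.

```typescript
// Hypothetical sketch, NOT the PromiseExecutor implementation.
// Processes inputs in consecutive batches; results keep the input order.
async function runInBatches<I, O>(
	processor: (input: I) => Promise<O>,
	inputs: ReadonlyArray<I>,
	batchSize: number
): Promise<O[]> {
	if (!Number.isInteger(batchSize) || batchSize < 1) {
		throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
	}

	const results: O[] = [];
	for (let start = 0; start < inputs.length; start += batchSize) {
		const batch = inputs.slice(start, start + batchSize);
		// all calls of a batch start together; the next batch waits until they all finish
		const batchResults = await Promise.all(batch.map((input) => processor(input)));
		for (const result of batchResults) {
			results.push(result);
		}
	}
	return results;
}
```

Note that in this sketch a rejected call rejects the whole run and later batches are never started; whether PromiseExecutor behaves the same way should be checked in the API documentation.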
API documentation is available here.
It can also be generated by issuing the following commands:
git clone git@github.com:marinrusu1997/thermopylae.git
cd thermopylae
yarn install
yarn workspace @thermopylae/lib.async run doc
👤 Rusu Marin
Copyright © 2021 Rusu Marin.
This project is MIT licensed.