Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added AbortablePromise and Deferred #10

Merged
merged 9 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2023 Savva Mikhalevski
Copyright (c) 2024 Savva Mikhalevski

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
240 changes: 131 additions & 109 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,58 @@ npm install --save-prod parallel-universe

🚀 [API documentation is available here.](https://smikhalevski.github.io/parallel-universe/)

- [`AbortablePromise`](#abortablepromise)
- [`Deferred`](#deferred)
- [`AsyncQueue`](#asyncqueue)
- [Acknowledgements](#acknowledgements)
- [Blocking vs non-blocking acknowledgements](#blocking-vs-non-blocking-acknowledgements)
- [`WorkPool`](#workpool)
- [`Executor`](#executor)
- [`Lock`](#lock)
- [`Blocker`](#blocker)
- [`PubSub`](#pubsub)
- [`repeatUntil`](#repeatuntil)
- [`repeat`](#repeat)
- [`waitFor`](#waitfor)
- [`sleep`](#sleep)
- [`raceTimeout`](#racetimeout)
- [`delay`](#delay)
- [`timeout`](#timeout)

# `AbortablePromise`

The promise that can be aborted.

```ts
const promise = new AbortablePromise((resolve, reject, signal) => {
signal.addEventListener('abort', () => {
// Listen to the signal being aborted
});

// Resolve or reject the promise
});

promises.abort();
```

When [`abort`](https://smikhalevski.github.io/parallel-universe/classes/AbortablePromise.html#abort) is called,
the promise is instantly rejected with
an [`AbortError`](https://developer.mozilla.org/en-US/docs/Web/API/DOMException#aborterror) if it isn't settled yet.

Provide a custom abort reason:

```ts
promise.abort(new Error('Operation aborted'));
```

# `Deferred`

The promise that can be resolved externally.

```ts
const promise = new Deferred<string>();

promise.then(value => {
doSomething(value);
});

promises.resolve('Mars');
```

# `AsyncQueue`

Expand All @@ -33,63 +73,71 @@ Asynchronous queue decouples value providers and value consumers.
const queue = new AsyncQueue();

// Provider adds a value
queue.add('Mars');
queue.append('Mars');

// Consumer takes a value
queue.take();
// ⮕ Promise { 'Mars' }
// ⮕ AbortablePromise { 'Mars' }
```

`add` appends the value to the queue, while `take` removes the value from the queue as soon as it is available. If there
are no values in the queue upon `take` call then the returned promise is resolved after the next `add` call.
`append` appends the value to the queue, while `take` removes the value from the queue as soon as it is available.
If there are no values in the queue upon `take` call then the returned promise is resolved after the next `append` call.

```ts
const queue = new AsyncQueue();

// The returned promise would be resolved after the add call
// The returned promise would be resolved after the append call
queue.take();
// ⮕ Promise { 'Mars' }
// ⮕ AbortablePromise { 'Mars' }

queue.add('Mars');
queue.append('Mars');
```

Consumers receive values from the queue in the same order they were added by providers:

```ts
const queue = new AsyncQueue();

queue.add('Mars');
queue.add('Venus');
queue.append('Mars');
queue.append('Venus');

queue.take();
// ⮕ Promise { 'Mars' }
// ⮕ AbortablePromise { 'Mars' }

queue.take();
// ⮕ Promise { 'Venus' }
// ⮕ AbortablePromise { 'Venus' }
```

## Acknowledgements

In some cases removing the value from the queue isn't the desirable behavior, since the consumer may not be able to
process the taken value. Use `takeAck` to examine available value and acknowledge that it can be processed.
process the taken value. Use `takeAck` to examine available value and acknowledge that it can be processed. `takeAck`
returns a tuple of the available value and the acknowledgement callback. The consumer should call `ack` to notify
the queue on weather to remove the value from the queue or to retain it.

```ts
queue.takeAck().then(([value, ack]) => {
if (doSomeChecks()) {
ack();
doSomething(value);
try {
if (doSomeChecks()) {
ack(true);
doSomething(value);
}
} finally {
ack(false);
}
});
```

`takeAck` returns a tuple of the available value and the acknowledgement callback. The consumer should call `ack` to
notify the queue on weather to remove the value from the queue or to retain it.
To guarantee that consumers receive values in the same order as they were provided, acknowledgements prevent subsequent
consumers from being fulfilled until `ack` is called. Be sure to call `ack` to prevent the queue from being stuck
indefinitely.

Calling `ack` multiple times is safe, since only the first call would have an effect.

To acknowledge that the consumer can process the value, and the value must be removed from the queue use:

```ts
ack();
// or ack(true)
ack(true);
```

To acknowledge that the value should be retained by the queue use:
Expand All @@ -103,64 +151,14 @@ The value that was retained in the queue becomes available for the subsequent co
```ts
const queue = new AsyncQueue();

queue.add('Pluto');
queue.append('Pluto');

queue.takeAck(([value, ack]) => {
ack(false); // Tells queue to retain the value
});

queue.take();
// ⮕ Promise { 'Pluto' }
```

## Blocking vs non-blocking acknowledgements

If you didn't call `ack`, the acknowledgement would be automatically revoked on _the next tick_ after the promise
returned by `takeAck` is resolved, and the value would remain in the queue.

If acknowledgement was revoked, the `ack` call would throw an error:

```ts
queue.takeAck()
.then(protocol => protocol) // Add an extra tick
.then(([value, ack]) => {
ack();
// ❌ Error: AsyncQueue acknowledgement was revoked
});
```

To prevent the acknowledgement from being revoked, request a blocking acknowledgement:

```ts
queue.takeBlockingAck()
.then(protocol => protocol) // Add an extra tick
.then(([value, ack]) => {
ack(); // Value successfully acknowledged
doSomething(value);
});
```

Blocking acknowledgement should be used if the consumer has to perform asynchronous actions _before_ processing the
value.

To guarantee that consumers receive values in the same order as they were provided, blocking acknowledgements prevent
subsequent consumers from being resolved until `ack` is called. Be sure to call `ack` to prevent the queue from being
stuck indefinitely.

```ts
async function blockingConsumer() {
const [value, ack] = queue.takeAck(true);

try {
if (await doSomeChecks()) {
ack(true);
doSomething(value);
}
} finally {
// It's safe to call ack multiple times since it's a no-op
ack(false);
}
}
// ⮕ AbortablePromise { 'Pluto' }
```

# `WorkPool`
Expand All @@ -172,8 +170,10 @@ wait in the queue.
// The pool that processes 5 callbacks in parallel at maximum
const pool = new WorkPool(5);

pool.submit(async signal => doSomething());
// ⮕ Promise<ReturnType<typeof doSomething>>
pool.submit(signal => {
return Promise.resolve('Mars');
});
// ⮕ AbortablePromise<string>
```

You can change how many callbacks can the pool process in parallel:
Expand Down Expand Up @@ -207,7 +207,7 @@ Create an `Executor` instance and submit a callback for execution:
const executor = new Executor();

executor.execute(doSomething);
// ⮕ Promise<void>
// ⮕ AbortablePromise<void>
```

The `execute` method returns a promise that is fulfilled when the promise returned from the callback is settled. If
Expand All @@ -225,7 +225,7 @@ executor.execute(async signal => {
executor.abort();
```

When execution is aborted the current `result` and `reason` remain intact.
When execution is aborted the current `value` and `reason` remain intact.

To reset the executor to the initial state use:

Expand Down Expand Up @@ -319,39 +319,54 @@ pubSub.subscribe(message => {
pubSub.publish('Pluto');
```

# `repeatUntil`
# `repeat`

Invokes a callback periodically with the given delay between settlements of returned promises until the condition is
met. By default, the callback is invoked indefinitely with no delay between settlements:

```ts
repeat(async () => {
await doSomething();
});
// ⮕ AbortablePromise<void>
```

Specify a delay between invocations:

```ts
repeat(doSomething, 3000);
// ⮕ AbortablePromise<void>
```

Invokes a callback multiple times until the condition is met.
Abort the loop:

```ts
repeatUntil(
// The callback that is invoked repeatedly
async () => doSomething(),
const promise = repeat(doSomething, 3000);

// The condition clause must return a truthy value to stop
// the loop
result => result.isFulfilled,
promise.abort();
```

// An optional callback that returns a delay in milliseconds
// between iterations
result => 100,
Specify the condition when the loop must be stopped. The example below randomly picks a planet name once every 3 seconds
and fulfills the returned promise when `'Mars'` is picked:

```ts
repeat(
() => ['Mars', 'Pluto', 'Venus'][Math.floor(Math.random() * 3)],
3000,
value => value === 'Mars'
);
// ⮕ Promise<ReturnType<typeof doSomething>>
// ⮕ AbortablePromise<string>
```

You can combine `repeatUntil` with [`raceTimeout`](#racetimeout) to limit the repeat duration:
You can combine `repeat` with [`timeout`](#timeout) to limit the repeat duration:

```ts
raceTimeout(
signal =>
repeatUntil(
() => doSomething(),
result => signal.aborted || result.isFulfilled,
100
),
timeout(
repeat(async () => {
await doSomething();
}),
5000
);
// ⮕ Promise<ReturnType<typeof doSomething>>
```

# `waitFor`
Expand All @@ -360,7 +375,7 @@ Returns a promise that is fulfilled when a callback returns a truthy value:

```ts
waitFor(async () => doSomething());
// ⮕ Promise<ReturnType<typeof doSomething>>
// ⮕ AbortablePromise<ReturnType<typeof doSomething>>
```

If you don't want `waitFor` to invoke the callback too frequently, provide a delay in milliseconds:
Expand All @@ -369,26 +384,33 @@ If you don't want `waitFor` to invoke the callback too frequently, provide a del
waitFor(doSomething, 1_000);
```

# `sleep`
# `delay`

Returns a promise that resolves after a timeout. If signal is aborted then the returned promise is rejected with an
error.

```ts
sleep(100, abortController.signal);
// ⮕ Promise<void>
delay(100);
// ⮕ AbortablePromise<void>
```

Delay can be resolved with a value:

```ts
delay(100, 'Pluto');
// ⮕ AbortablePromise<string>
```

# `raceTimeout`
# `timeout`

Rejects with an error if the execution time exceeds the timeout.

```ts
raceTimeout(async signal => doSomething(), 100);
timeout(async signal => doSomething(), 100);
// ⮕ Promise<ReturnType<typeof doSomething>>

raceTimeout(
new Promise(resolve => {
timeout(
new AbortablePromise(resolve => {
// Resolve the promise value
}),
100
Expand Down
Loading
Loading