Name: java-async-util
Owner: International Business Machines
Description: Java utilities for working with CompletionStages
Created: 2018-05-01 20:30:29.0
Updated: 2018-05-07 16:52:47.0
Pushed: 2018-05-07 23:22:13.0
Homepage: null
Size: 1212
Language: Java
GitHub Committers
User | Most Recent Commit | # Commits |
---|
Other Committers
User | Most Recent Commit | # Commits |
---|
async-util is a library for working with Java 8 CompletionStages. Its primary goal is to provide tools for asynchronous coordination, including iterative production/consumption of CompletionStages and non-blocking asynchronous mutual exclusion support.
The library is broken up into three packages:
To get started, you can browse the javadocs or walk through some example code.
To add a dependency on asyncutil
endency>
<groupId>com.ibm.async</groupId>
<artifactId>asyncutil</artifactId>
<version>0.1.0</version>
pendency>
To get support for Flow (JDK9+ only)
endency>
<groupId>com.ibm.async</groupId>
<artifactId>asyncutil-flow</artifactId>
<version>0.1.0</version>
pendency>
The locks package provides asynchronous analogs of familiar synchronization primitives, all with efficient non-blocking implementations. Imagine we again have some source of asynchronity (say asynchronous network requests), and we'd like to implement an asynchronous method that makes a request and generates a result based on the request's response and some state that requires access under mutual exclusion.
s MyAsyncClass {
not thread safe
ivate MutableState mutableState;
ivate CompletionStage<Response> asyncNetworkOperation(Request request) {...}
mpletionStage<Result> makeRequest(Request request) {
return asyncNetworkOperation(request)
.thenApply(response -> {
// unsafe!
mutableState.update(response);
return mutableState.produceResult();
});
If we wrap the mutableState
operations in a synchronized
block, we'll end up blocking the thread pool that runs our network operations. This is especially undesirable if this threadpool is possibly serving other interests in our application. We could solve that by creating our own thread pool just to do the locking + state manipulation using thenApplyAsync
but that has a number of downsides
Instead we can use AsyncLock
to provide exclusive access to the MutableState
. We will try to acquire the lock on the thread that completes the network operation stage, and if it is not available we'll receive a CompletionStage that will notify us when it becomes available.
ate AsyncLock lock = AsyncLock.create();
letionStage<Result> makeRequest(Request request) {
turn asyncNetworkOperation(request)
.thenCompose(response ->
lock.acquireLock().thenApply(token -> {
try {
mutableState.update(response);
return mutableState.produceResult();
} finally {
token.release();
}
})
});
for cleanliness, we can use StageSupport.tryWith
for try-with-resources emulation:
letionStage<Result> makeRequest(Request request) {
turn asyncNetworkOperation(request)
.thenCompose(response ->
StageSupport.tryWith(lock.acquireLock(), ignored -> {
mutableState.update(response);
return mutableState.produceResult();
})
);
The package provides asynchronous versions of read/write locks, stamped locks, semaphores and named locks. The full locks javadoc contains more information.
The classes in this package provide ways to generate and consume results asynchronously. The main mechanism is AsyncIterator
interface, which can be considered an asynchronous analog of the Stream API. The full iteration javadocs contains more information on AsyncIterator
as well as other asynchronous iteration constructs.
Consider the following example from the Stream
documentation
sum = widgets.stream()
ilter(w -> w.getColor() == RED)
apToInt(w -> w.getWeight())
um();
Say widgets was not a concrete collection, but instead generating a widget involved an asynchronous network request (or an expensive CPU computation, etc). If we instead make the source of widgets an AsyncIterator
we can asynchronously apply the pipeline every time a widget becomes available, and return a CompletionStage which will be complete when the pipeline has finished. In this example, let's say we are only interested in the first 100 red widgets.
ake an asynchronous network request that yields a widget
letionStage<Widget> getWidget();
letionStage<Integer> sum = AsyncIterator
enerate(() -> getWidget())
ilter(w -> w.getColor() == RED)
ake(100)
henApply(w -> w.getWeight())
ollect(Collectors.summingInt(i -> i));
This will make one getWidget
request at a time, running the rest of the pipeline operations each time a widget is generated on whatever thread processes the response required to generate the widget. When the widget stream is finished (in this case, after receiving 100 red widgets), the CompletionStage sum
will complete with the result of the reduction operation. AsyncIterators
have many other capabilities; if getting the weight required asynchronity we could use thenCompose
instead of thenApply
, if we needed to collect the weights into a collection we could use collect(Collector)
, etc.
It's often limiting to only be able to produce results for consumption iteratively. AsyncQueue
provides ways to produce these values in parallel without any blocking synchronization:
mplements AsyncIterator
cQueue widgets = AsyncQueues.unbounded();
edicate NUM_THREADS threads to producing widgets
(int i = 0; i < NUM_THREADS; i++) {
ecutor.submit(() -> {
// send returns whether the queue is still accepting values
while (widgets.send(expensiveComputeWidget());
;
reate a pipeline the same way as before
letionStage<Integer> sum = widgets.filter(...)...;
nce we get our sum, we can terminate the queue, stopping widget production
thenRun(() -> widgets.terminate());
The util package contains interfaces and static functions for commonly reimplemented CompletionStage
patterns. The best way to discover them all is to browse the javadoc.
StageSupport
StageSupport contains miscellaneous utility functions, including methods to create already completed exceptional stages, common transformations, and methods for working with resources that need to be asynchronously released.
AsyncCloseable
An interface analogous to AutoCloseable for objects that hold resources that must be relinquished asynchronously. By implementing AsyncCloseable
with your own objects, you can take advantage of the try*
methods on StageSupport
to safely relinquish resources after performing asynchronous actions.
Combinators
Static functions for combining collections of stages into a single result stage. The Java standard library provides two such methods with CompletableFuture.allOf / CompletableFuture.anyOf . Unfortunately, these only work with arrays of CompletableFuture
, not CompletionStage
, so you must first convert using toCompletableFuture
if you wish to use these methods. Collections must be converted to arrays well. The methods on Combinators
work on CompletionStage
and collections directly, furthermore several additional useful combinators are added. For example, to get the results of multiple stages with CompletableFuture.allOf
:
letableFuture<Integer>[] arr = ...
letionStage<List<Integer>> listFuture = CompletableFuture.allOf(arr).thenApply(ignore -> {
nal List<Integer> ints = new ArrayList<>();
r (CompletableFuture<Integer> stage : arr) {
return ints.add(stage.join());
turn ints;
Instead, you can use collect
on Combinators
:
ection<CompletionStage<Integer>> stages = ...
letionStage<List<Integer>> listFuture = Combinators.collect(stages, Collectors.toList());
Contributions welcome! See Contributing for details.