IBM/java-async-util

Name: java-async-util

Owner: International Business Machines

Description: Java utilities for working with CompletionStages

Created: 2018-05-01 20:30:29.0

Updated: 2018-05-07 16:52:47.0

Pushed: 2018-05-07 23:22:13.0

Homepage: null

Size: 1212

Language: Java

GitHub Committers

UserMost Recent Commit# Commits

Other Committers

UserEmailMost Recent Commit# Commits

README

async-util

Introduction

async-util is a library for working with Java 8 CompletionStages. Its primary goal is to provide tools for asynchronous coordination, including iterative production/consumption of CompletionStages and non-blocking asynchronous mutual exclusion support.

The library is broken up into three packages:

To get started, you can browse the javadocs or walk through some example code.

Downloading

To add a dependency on asyncutil

endency>
<groupId>com.ibm.async</groupId>
<artifactId>asyncutil</artifactId>
<version>0.1.0</version>
pendency>

To get support for Flow (JDK9+ only)

endency>
<groupId>com.ibm.async</groupId>
<artifactId>asyncutil-flow</artifactId>
<version>0.1.0</version>
pendency>
Locks

The locks package provides asynchronous analogs of familiar synchronization primitives, all with efficient non-blocking implementations. Imagine we again have some source of asynchronity (say asynchronous network requests), and we'd like to implement an asynchronous method that makes a request and generates a result based on the request's response and some state that requires access under mutual exclusion.

s MyAsyncClass {
 not thread safe
ivate MutableState mutableState;

ivate CompletionStage<Response> asyncNetworkOperation(Request request) {...}


mpletionStage<Result> makeRequest(Request request) {
return asyncNetworkOperation(request)
  .thenApply(response -> {
    // unsafe!
    mutableState.update(response);
    return mutableState.produceResult();
  });


If we wrap the mutableState operations in a synchronized block, we'll end up blocking the thread pool that runs our network operations. This is especially undesirable if this threadpool is possibly serving other interests in our application. We could solve that by creating our own thread pool just to do the locking + state manipulation using thenApplyAsync but that has a number of downsides

Instead we can use AsyncLock to provide exclusive access to the MutableState. We will try to acquire the lock on the thread that completes the network operation stage, and if it is not available we'll receive a CompletionStage that will notify us when it becomes available.


ate AsyncLock lock = AsyncLock.create();

letionStage<Result> makeRequest(Request request) {
turn asyncNetworkOperation(request)
.thenCompose(response ->
  lock.acquireLock().thenApply(token -> {
    try {
      mutableState.update(response);
      return mutableState.produceResult();
    } finally {
      token.release();
    }
  })
});

for cleanliness, we can use StageSupport.tryWith for try-with-resources emulation:

letionStage<Result> makeRequest(Request request) {
turn asyncNetworkOperation(request)
.thenCompose(response ->
  StageSupport.tryWith(lock.acquireLock(), ignored -> {
      mutableState.update(response);
      return mutableState.produceResult();
  })
);

The package provides asynchronous versions of read/write locks, stamped locks, semaphores and named locks. The full locks javadoc contains more information.

Iteration

The classes in this package provide ways to generate and consume results asynchronously. The main mechanism is AsyncIterator interface, which can be considered an asynchronous analog of the Stream API. The full iteration javadocs contains more information on AsyncIterator as well as other asynchronous iteration constructs.

Consider the following example from the Stream documentation

sum = widgets.stream()
ilter(w -> w.getColor() == RED)
apToInt(w -> w.getWeight())
um();

Say widgets was not a concrete collection, but instead generating a widget involved an asynchronous network request (or an expensive CPU computation, etc). If we instead make the source of widgets an AsyncIterator we can asynchronously apply the pipeline every time a widget becomes available, and return a CompletionStage which will be complete when the pipeline has finished. In this example, let's say we are only interested in the first 100 red widgets.

ake an asynchronous network request that yields a widget
letionStage<Widget> getWidget();

letionStage<Integer> sum = AsyncIterator
enerate(() -> getWidget())
ilter(w -> w.getColor() == RED)
ake(100)
henApply(w -> w.getWeight())
ollect(Collectors.summingInt(i -> i));

This will make one getWidget request at a time, running the rest of the pipeline operations each time a widget is generated on whatever thread processes the response required to generate the widget. When the widget stream is finished (in this case, after receiving 100 red widgets), the CompletionStage sum will complete with the result of the reduction operation. AsyncIterators have many other capabilities; if getting the weight required asynchronity we could use thenCompose instead of thenApply, if we needed to collect the weights into a collection we could use collect(Collector), etc.

It's often limiting to only be able to produce results for consumption iteratively. AsyncQueue provides ways to produce these values in parallel without any blocking synchronization:

mplements AsyncIterator
cQueue widgets = AsyncQueues.unbounded();

edicate NUM_THREADS threads to producing widgets
(int i = 0; i < NUM_THREADS; i++) {
ecutor.submit(() -> {
// send returns whether the queue is still accepting values
while (widgets.send(expensiveComputeWidget());
;


reate a pipeline the same way as before
letionStage<Integer> sum = widgets.filter(...)...;

nce we get our sum, we can terminate the queue, stopping widget production
thenRun(() -> widgets.terminate());
Util

The util package contains interfaces and static functions for commonly reimplemented CompletionStage patterns. The best way to discover them all is to browse the javadoc.

StageSupport

StageSupport contains miscellaneous utility functions, including methods to create already completed exceptional stages, common transformations, and methods for working with resources that need to be asynchronously released.

AsyncCloseable

An interface analogous to AutoCloseable for objects that hold resources that must be relinquished asynchronously. By implementing AsyncCloseable with your own objects, you can take advantage of the try* methods on StageSupport to safely relinquish resources after performing asynchronous actions.

Combinators

Static functions for combining collections of stages into a single result stage. The Java standard library provides two such methods with CompletableFuture.allOf / CompletableFuture.anyOf . Unfortunately, these only work with arrays of CompletableFuture, not CompletionStage, so you must first convert using toCompletableFuture if you wish to use these methods. Collections must be converted to arrays well. The methods on Combinators work on CompletionStage and collections directly, furthermore several additional useful combinators are added. For example, to get the results of multiple stages with CompletableFuture.allOf:

letableFuture<Integer>[] arr = ...
letionStage<List<Integer>> listFuture = CompletableFuture.allOf(arr).thenApply(ignore -> {
nal List<Integer> ints = new ArrayList<>();
r (CompletableFuture<Integer> stage : arr) {
return ints.add(stage.join());

turn ints;

Instead, you can use collect on Combinators:

ection<CompletionStage<Integer>> stages = ...
letionStage<List<Integer>> listFuture = Combinators.collect(stages, Collectors.toList());
Contributing

Contributions welcome! See Contributing for details.


This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.