vert-x/mod-rxvertx

Name: mod-rxvertx

Owner: vert.x

Description: Vert.x 2.x is **deprecated** - use http://vertx.io/docs/vertx-rx/java/ instead

Created: 2013-05-14 07:07:04.0

Updated: 2018-01-22 07:19:13.0

Pushed: 2016-11-02 10:05:30.0

Homepage: http://vertx.io/docs/vertx-rx/java/

Size: 347

Language: Java

README

Vert.x 2.x is deprecated. Use http://vertx.io/docs/vertx-rx/java/ instead.

mod-rxvertx

A Vert.x module that adds support for Reactive Extensions (Rx) using the RxJava library. This allows Vert.x developers to use the type-safe, composable RxJava API to build Vert.x verticles.

Dependencies
Status

Currently Observable wrappers are provided for

There are also base Observable adapters that map Handler and AsyncResultHandler to Observable that can be used to call other Handler based APIs.

Support coming soon for

Usage

This is a non-runnable module, which means you add it to your module via the “includes” attribute of mod.json.

All standard API methods of the form

    void method(args..., Handler<T> handler)

are typically available in the form

    Observable<T> method(args...)

where the operation is executed immediately, or

    Observable<T> observeMethod(args...)

where the operation is executed on subscribe. The latter form is the more 'pure' Rx method and should be used where possible; it is required to preserve the semantics of operators such as concat.
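
The difference between the two forms can be illustrated without the library. The following is a minimal JDK-only sketch, not the mod-rxvertx or RxJava API: `Source`, `eager`, `lazy` and `concat` are hypothetical stand-ins, and each source is assumed to deliver exactly one value. It shows why a sequential operator only keeps its ordering guarantee when the operation starts on subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// JDK-only sketch: Source, eager, lazy and concat are hypothetical stand-ins,
// not mod-rxvertx or RxJava types.
class EagerVsLazySketch {
    /** Minimal single-value source: subscribing delivers exactly one item. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Like "method(args...)": the operation starts when the method is called. */
    static Source<String> eager(String name, List<String> startLog) {
        startLog.add(name);                        // work starts immediately
        String result = name + "-done";
        return onNext -> onNext.accept(result);    // subscribers only see the stored result
    }

    /** Like "observeMethod(args...)": the operation starts on each subscribe. */
    static Source<String> lazy(String name, List<String> startLog) {
        return onNext -> {
            startLog.add(name);                    // work starts only now
            onNext.accept(name + "-done");
        };
    }

    /** Sequential composition: subscribes to second only after first has delivered. */
    static <T> Source<T> concat(Source<T> first, Source<T> second) {
        return onNext -> first.subscribe(item -> {
            onNext.accept(item);
            second.subscribe(onNext);
        });
    }

    public static void main(String[] args) {
        List<String> lazyLog = new ArrayList<>();
        Source<String> lazyPipeline = concat(lazy("first", lazyLog), lazy("second", lazyLog));
        System.out.println("lazy, started before subscribe: " + lazyLog);

        List<String> eagerLog = new ArrayList<>();
        Source<String> eagerPipeline = concat(eager("first", eagerLog), eager("second", eagerLog));
        System.out.println("eager, started before subscribe: " + eagerLog);

        lazyPipeline.subscribe(System.out::println);
        eagerPipeline.subscribe(System.out::println);
    }
}
```

With the eager form both operations have already started by the time `concat` is built, so the second operation does not wait for the first. With the lazy form nothing starts until the pipeline is subscribed, and the second operation starts only after the first has delivered its value.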

EventBus

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      public void call(RxMessage<String> message) {
        // Send a single reply
        message.reply("pong!");
      }
    });

    Observable<RxMessage<String>> obs = rxEventBus.send("foo", "ping!");

    obs.subscribe(
      new Action1<RxMessage<String>>() {
        public void call(RxMessage<String> message) {
          // Handle response
        }
      },
      new Action1<Throwable>() {
        public void call(Throwable err) {
          // Handle error
        }
      }
    );

Scheduler

The standard RxJava schedulers are not compatible with Vert.x. To preserve the Vert.x threading model, all callbacks to a Verticle must be made in the context of that Verticle instance.

RxVertx provides a custom Scheduler implementation that uses the Verticle context to schedule timers and ensure callbacks run on the correct context.

In the following example the scheduler is used to run a Timer and then buffer the output.

Note: The RxVertx scheduler must always be used to observe results inside the Verticle. Other Schedulers can be used (e.g. for blocking calls) as long as you always use `observeOn` to route the callbacks onto the Verticle event loop. For timers it is more efficient to just use the Vert.x scheduler.

    RxVertx rx = new RxVertx(vertx);
    Observable o = (some observable source)

    Observable
      .timer(10, 10, TimeUnit.MILLISECONDS, rx.contextScheduler())
      .buffer(100, TimeUnit.MILLISECONDS, rx.contextScheduler())
      .take(10)
      .subscribe(...)

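
The rule in the note above (do blocking work elsewhere, but always deliver callbacks on the Verticle's context) can be sketched with plain JDK executors. This is only an analogy under stated assumptions: a single-threaded executor named `event-loop` stands in for the Verticle context, a small pool named `worker` stands in for a blocking-friendly scheduler, and `thenApplyAsync(..., eventLoop)` plays the role that `observeOn` plays in Rx. None of these names come from mod-rxvertx.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy: the executors below are stand-ins, not Vert.x or RxJava schedulers.
class ContextHopSketch {
    /** Runs work on a worker thread, then reports which thread the callback ran on. */
    static String callbackThreadName() {
        // One thread standing in for the Verticle's event loop (assumption).
        ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        // Separate pool standing in for a scheduler that tolerates blocking calls.
        ExecutorService workers =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            return CompletableFuture
                // The blocking call runs off the event loop.
                .supplyAsync(() -> "blocking result", workers)
                // The callback is routed back onto the event loop thread.
                .thenApplyAsync(value -> Thread.currentThread().getName(), eventLoop)
                .join();
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

The callback always runs on the single `event-loop` thread regardless of which worker produced the value, which is the property the context scheduler is described as preserving for a Verticle.
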
Timer

The timer functions are provided via the RxVertx wrapper. The timer is set on subscribe. To cancel a timer that has not yet fired, or a periodic timer, just unsubscribe.

    RxVertx rx = new RxVertx(vertx);
    rx.setTimer(100).subscribe(new Action1<Long>() {
      public void call(Long t) {
        // Timer fired
      }
    });

The new Scheduler means you can use the native RxJava timer methods, so this Timer may be deprecated in the future.
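
The "set on subscribe, cancel by unsubscribing" behaviour can be sketched with a JDK scheduled executor. This is an illustration only: `TimerSketch`, `Subscription` and `subscribeTimer` are hypothetical names, and the executor stands in for the Vert.x timer facility rather than reproducing it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch: TimerSketch and Subscription are hypothetical stand-ins,
// not the RxVertx or RxJava types.
class TimerSketch {
    /** Handle returned by subscribe; unsubscribing cancels the pending timer. */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Arms a one-shot timer at subscribe time and returns a cancellation handle. */
    static Subscription subscribeTimer(ScheduledExecutorService loop, long delayMs, Runnable onFire) {
        ScheduledFuture<?> pending = loop.schedule(onFire, delayMs, TimeUnit.MILLISECONDS);
        return new Subscription() {
            public void unsubscribe() { pending.cancel(false); }
            public boolean isUnsubscribed() { return pending.isCancelled(); }
        };
    }

    /** Subscribes, unsubscribes before the delay elapses, and reports how often the timer fired. */
    static int firesAfterEarlyUnsubscribe() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        try {
            Subscription sub = subscribeTimer(loop, 60_000, fired::incrementAndGet);
            sub.unsubscribe();   // the timer has not fired yet, so this cancels it
            if (!sub.isUnsubscribed()) {
                throw new IllegalStateException("timer was not cancelled");
            }
            return fired.get();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fires after early unsubscribe: " + firesAfterEarlyUnsubscribe());
    }
}
```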

Helper

The support class RxSupport provides several helper methods for some standard tasks.

Streams

There are two primary wrappers:

Observable RxSupport.toObservable(ReadStream)

Converts a ReadStream into an Observable<Buffer>.

RxSupport.stream(Observable,WriteStream)

Streams the output of an Observable to a WriteStream.

Please note that this method does not handle writeQueueFull, so it cannot be used as a pump.
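
To make the limitation concrete, here is a small JDK-only simulation of what a pump does and what plain streaming does not. `BoundedWriter` and `PausableReader` are hypothetical stand-ins whose method names (`writeQueueFull`, `drainHandler`, `pause`, `resume`) are modelled loosely on the Vert.x stream conventions; they are not the Vert.x classes, and the code is not how `RxSupport.stream` is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// JDK-only simulation: BoundedWriter and PausableReader are hypothetical stand-ins.
class PumpSketch {
    /** Writer with a soft queue limit; tracks the deepest queue it ever held. */
    static class BoundedWriter {
        final Deque<String> queue = new ArrayDeque<>();
        final int maxSize;
        Runnable drainHandler;
        int maxObserved = 0;

        BoundedWriter(int maxSize) { this.maxSize = maxSize; }

        void write(String item) {
            queue.add(item);
            maxObserved = Math.max(maxObserved, queue.size());
        }
        boolean writeQueueFull() { return queue.size() >= maxSize; }
        void drainHandler(Runnable handler) { drainHandler = handler; }

        /** Simulates the peer consuming everything, then signals that the queue drained. */
        void flush() {
            queue.clear();
            Runnable handler = drainHandler;
            drainHandler = null;
            if (handler != null) handler.run();
        }
    }

    /** Reader that hands out pending items only while it is not paused. */
    static class PausableReader {
        final Deque<String> pending = new ArrayDeque<>();
        Consumer<String> dataHandler;
        boolean paused;

        void pause() { paused = true; }
        void resume() { paused = false; emit(); }
        void emit() {
            while (!paused && !pending.isEmpty()) dataHandler.accept(pending.poll());
        }
    }

    /** Pump: pause the reader when the writer is full, resume once it drains. */
    static void pump(PausableReader in, BoundedWriter out) {
        in.dataHandler = item -> {
            out.write(item);
            if (out.writeQueueFull()) {
                in.pause();
                out.drainHandler(in::resume);
            }
        };
    }

    /** Plain streaming: every item is written, writeQueueFull is never consulted. */
    static void streamWithoutBackpressure(PausableReader in, BoundedWriter out) {
        in.dataHandler = out::write;
    }

    /** Transfers the given number of items and returns the deepest write queue seen. */
    static int maxQueueDepth(boolean usePump, int items, int maxSize) {
        PausableReader in = new PausableReader();
        BoundedWriter out = new BoundedWriter(maxSize);
        for (int i = 0; i < items; i++) in.pending.add("chunk-" + i);
        if (usePump) pump(in, out); else streamWithoutBackpressure(in, out);
        in.emit();
        while (!in.pending.isEmpty() || !out.queue.isEmpty()) out.flush();
        return out.maxObserved;
    }

    public static void main(String[] args) {
        System.out.println("pump:   " + maxQueueDepth(true, 10, 3));
        System.out.println("stream: " + maxQueueDepth(false, 10, 3));
    }
}
```

In this simulation the pump never lets the write queue grow past its limit, while the plain streaming variant queues every item at once. That unbounded growth is the risk when a fast source is streamed to a slow WriteStream without checking writeQueueFull.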


This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.