Name: reactive-zmq
Owner: 2GIS
Description: The Akka Streams based ZMQ client.
Created: 2016-03-23 10:45:08.0
Updated: 2018-03-15 22:06:27.0
Pushed: 2017-12-11 08:30:12.0
Size: 47
Language: Scala
This is an akka-stream API for ZMQ. Currently it only supports receiving data via unidirectional ZMQ sockets such as ZMQ.PULL.
See the jeromq documentation for the socket types.
Add the following settings to your build.sbt:

    resolvers += Resolver.jcenterRepo
    libraryDependencies += "ru.dgis" %% "reactive-zmq" % "0.4.0"
Create a ZMQ context and a Source:

    import org.zeromq.ZMQ
    import scala.concurrent.duration._  // provides the `1 second` syntax

    val context = ZMQ.context(1)
    val source = ZMQSource(context,
      mode = ZMQ.PULL,
      timeout = 1 second,
      addresses = List("tcp://127.0.0.1:12345")
    )
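To try the Source locally, something has to push messages to that address. The following is a minimal sketch that uses only jeromq and is not part of reactive-zmq: `PushSender` and `sendAll` are hypothetical names, and it assumes the Source connects to the listed address, so the sender binds. If the Source binds instead, swap `bind` for `connect`.

```scala
import org.zeromq.ZMQ

object PushSender {
  // Hypothetical helper: binds a PUSH socket to `address`, sends every
  // message as UTF-8 bytes, and returns how many sends reported success.
  // Note: send on a PUSH socket blocks until a PULL peer is connected.
  def sendAll(address: String, messages: Seq[String]): Int = {
    val context = ZMQ.context(1)
    val socket = context.socket(ZMQ.PUSH)
    try {
      socket.bind(address)
      messages.count(m => socket.send(m.getBytes("UTF-8"), 0))
    } finally {
      // Close the socket before the context, otherwise closing the context can block.
      socket.close()
      context.close()
    }
  }
}
```

For example, `PushSender.sendAll("tcp://127.0.0.1:12345", Seq("hello", "world"))` would feed two messages to a PULL socket connected to that endpoint.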
Now you can use source in your graphs:

    implicit val as = ActorSystem()
    implicit val m = ActorMaterializer()
    source
      .map { x: ByteString => println(x); x }
      .to(Sink.ignore)
      .run()
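Since the Source emits ByteString elements, downstream stages are ordinary Akka Streams stages. The sketch below shows one possible processing stage that decodes each frame as UTF-8 and drops empty messages. It is an illustration only: `DecodeExample`, `decode` and `run` are hypothetical names, and an in-memory `Source` stands in for the ZMQ source so the example runs without a socket.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object DecodeExample {
  // Reusable stage: decode each frame as UTF-8 and drop empty messages.
  val decode: Flow[ByteString, String, NotUsed] =
    Flow[ByteString].map(_.utf8String).filter(_.nonEmpty)

  // Runs the stage over an in-memory stand-in for the ZMQ source
  // and collects the decoded messages.
  def run(frames: List[ByteString]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("decode-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(frames).via(decode).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

With the real source, the same stage would presumably be attached as `source.via(DecodeExample.decode)`.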
Full example is available here
To stop the Source, use the materialized Control object:

    val (control, finish) = source
      .map { x: ByteString => println(x); x }
      .toMat(Sink.ignore)(Keep.both)
      .run()
The Control object exposes a gracefulStop method that closes the underlying ZMQ socket and completes the Source:

    val stopFuture: Future[Unit] = control.gracefulStop()
    implicit val ec = as.dispatcher
    Future.sequence(Seq(stopFuture, finish)).onComplete { _ =>
      as.terminate()
      context.close()
    }
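The shutdown order matters here: resources are released only after both the stop request and the stream itself have finished. As a variation on the snippet above, the sketch below waits for both futures to settle, whether they succeed or fail, before running the cleanup. It uses only the Scala standard library; `ShutdownOrder` and `afterBoth` are hypothetical names, not part of reactive-zmq.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object ShutdownOrder {
  // Runs `cleanup` once both futures have completed, successfully or not.
  // The returned future completes after the cleanup has run.
  def afterBoth(stop: Future[Any], finish: Future[Any])(cleanup: () => Unit)(
      implicit ec: ExecutionContext): Future[Unit] = {
    val settled = for {
      _ <- stop.recover { case _ => () }   // ignore a failed stop
      _ <- finish.recover { case _ => () } // ignore a failed stream
    } yield ()
    settled.map(_ => cleanup())
  }
}
```

With the values from the snippet above, this could be called as `ShutdownOrder.afterBoth(stopFuture, finish) { () => as.terminate(); context.close() }`.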
Add the following settings to your build.sbt to use a SNAPSHOT version:

    resolvers += "OSS JFrog Snapshots" at "https://oss.jfrog.org/artifactory/libs-snapshot/"
    libraryDependencies += "ru.dgis" %% "reactive-zmq" % "0.5.0-SNAPSHOT"