spotify/flo

Name: flo

Owner: Spotify

Description: A lightweight workflow definition library

Created: 2015-11-18 22:06:27.0

Updated: 2018-05-24 12:43:33.0

Pushed: 2018-05-24 12:43:34.0

Homepage: http://spotify.github.io/flo/maven/latest/

Size: 6229

Language: Java


README

flo

flo is a lightweight workflow definition library

Some key features

Dependency

<dependency>
  <groupId>com.spotify</groupId>
  <artifactId>flo-workflow</artifactId>
  <version>${flo.version}</version>
</dependency>

sbt

"com.spotify" %% "flo-scala" % floVersion

JavaDocs here: http://spotify.github.io/flo/maven/latest/apidocs/

Quick Example: Fibonacci

Fibonacci serves as a good example even though it's not at all the kind of thing that flo is meant to be used for. Nevertheless, it demonstrates how a task DAG can be recursively defined with arbitrary logic governing which inputs are chosen.

class Fib {

  static Task<Long> fib(long n) {
    TaskBuilder<Long> builder = Task.named("fib", n).ofType(Long.class);
    if (n < 2) {
      return builder
          .process(() -> n);
    } else {
      return builder
          .in(() -> fib(n - 1))
          .in(() -> fib(n - 2))
          .process((a, b) -> a + b);
    }
  }

  public static void main(String[] args) {
    Task<Long> fib92 = fib(92);
    EvalContext evalContext = MemoizingContext.composeWith(EvalContext.sync());
    EvalContext.Value<Long> value = evalContext.evaluate(fib92);

    value.consume(f92 -> System.out.println("fib(92) = " + f92));
  }
}

Scala equivalent

import com.spotify.flo._

object Fib extends App {

  def fib(n: Long): Task[Long] = defTask[Long](n) dsl (
    if (n < 2)
      $ process n
    else
      $ in      fib(n - 1)
        in      fib(n - 2)
        process (_ + _)
  )

  val fib92 = fib(92)
  val evalContext = MemoizingContext.composeWith(EvalContext.sync)
  val value = evalContext.evaluate(fib92)

  value.consume((f92:Long) => println("fib(92) = " + f92))
}

For more details on a high-level runner implementation, see flo-runner.

Task<T>

Task<T> is one of the more central types in flo. It represents some task which will evaluate a value of type T. It has a parameterized name, zero or more input tasks and a processing function which will be executed when inputs are evaluated. Tasks come with a few key properties governing how they are defined, behave and are interacted with. We'll cover these in the following sections.

Tasks are defined by regular methods

Your workflow tasks are not defined as classes that extend Task<T>; rather, they are defined using the TaskBuilder API, as we've already seen in the Fibonacci example. In many ways this is similar to a very clean class with no mutable state, only final members, and two overridden methods for the inputs and the evaluation function, but with one very important difference: the input tasks are handled in a type-safe manner. Each input task you add further constructs the type of your evaluation function. This is how we get a clean lambda such as (a, b) -> a + b as the evaluation function in the Fibonacci example.
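The type-building idea can be illustrated without flo at all. The following is a minimal sketch, assuming a simplified model where a "task" is just a Supplier<Z> of its result; SketchBuilder0/1/2 and TypedInputsDemo are hypothetical names, not flo's TaskBuilder. Each in(...) call returns a builder with one more type parameter, so process(...) only accepts a function whose parameters match the declared inputs.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch, NOT flo's TaskBuilder. A "task" is modelled as a
// plain Supplier<Z> of its result; each in(...) adds one type parameter.
final class SketchBuilder0<Z> {
  // First input: returns a builder that remembers the input's type A.
  <A> SketchBuilder1<Z, A> in(Supplier<A> a) {
    return new SketchBuilder1<>(a);
  }
}

final class SketchBuilder1<Z, A> {
  private final Supplier<A> a;

  SketchBuilder1(Supplier<A> a) {
    this.a = a;
  }

  // Second input: the builder now knows both input types A and B.
  <B> SketchBuilder2<Z, A, B> in(Supplier<B> b) {
    return new SketchBuilder2<>(a, b);
  }

  // One input, so the evaluation function is a Function<A, Z>.
  Supplier<Z> process(Function<A, Z> fn) {
    return () -> fn.apply(a.get());
  }
}

final class SketchBuilder2<Z, A, B> {
  private final Supplier<A> a;
  private final Supplier<B> b;

  SketchBuilder2(Supplier<A> a, Supplier<B> b) {
    this.a = a;
    this.b = b;
  }

  // Two inputs, so the evaluation function is a BiFunction<A, B, Z>.
  Supplier<Z> process(BiFunction<A, B, Z> fn) {
    return () -> fn.apply(a.get(), b.get());
  }
}

final class TypedInputsDemo {
  // otherResult is inferred as String and yetAResult as Integer:
  // no casts and no List<Object> to pick apart.
  static Supplier<Integer> myTask(String arg) {
    return new SketchBuilder0<Integer>()
        .in(() -> arg.toUpperCase())
        .in(() -> arg.length())
        .process((otherResult, yetAResult) -> otherResult.length() + yetAResult);
  }
}
```

Here TypedInputsDemo.myTask("flo").get() evaluates to 6 ("FLO".length() plus "flo".length()); a lambda that treated yetAResult as a String, for example by calling yetAResult.length(), would be rejected by the compiler.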

Here's a simple example of a flo task depending on two other tasks:

Task<Integer> myTask(String arg) {
  return Task.named("MyTask", arg).ofType(Integer.class)
      .in(() -> otherTask(arg))
      .in(() -> yetATask(arg))
      .process((otherResult, yetAResult) -> /* ... */);
}

This is what the same thing would typically look like in other libraries:

class MyTask extends Task<Integer> {

  private final String arg;

  MyTask(String arg) {
    super("MyTask", arg);
    this.arg = arg;
  }

  @Override
  public List<? extends Task<?>> inputs() {
    return Arrays.asList(new OtherTask(arg), new YetATask(arg));
  }

  @Override
  public Integer process(List<Object> inputs) {
    // lose all type safety and guess your inputs
    // ...
  }
}

Task embedding

There's of course nothing stopping you from defining the task in a regular class. It might even be useful if your evaluation function is part of an existing class. flo does not force anything onto your types; it just needs to know what to run.

class SomeExistingClass {

  private final String arg;

  SomeExistingClass(String arg) {
    this.arg = arg;
  }

  Task<Integer> task() {
    return Task.named("EmbeddedTask", arg).ofType(Integer.class)
        .in(() -> otherTask(arg))
        .in(() -> yetATask(arg))
        .process(this::process);
  }

  int process(String otherResult, int yetAResult) {
    // ...
  }
}

Tasks are lazy

Creating instances of Task<T> is cheap. No matter how complex and deep the task DAG might be, creating the top level Task<T> will not cause the whole DAG to be created. This is because all inputs are declared using a Supplier<T>, utilizing their properties for deferred evaluation:

Library.maybeNeedsValue(() -> expensiveCalculation());

This pattern is on its way to becoming an idiom for achieving laziness in Java 8. A good example is the Supplier-based additions to the Java 8 Logger class, which let the logger decide whether the log line for a given log level should be computed at all.
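The Logger behaviour can be checked with a short JDK-only program. LazyLoggingDemo, the logger name, and the counter are illustrative assumptions; expensiveCalculation mirrors the placeholder from the snippet above. java.util.logging.Logger.fine(Supplier<String>) is the Java 8 overload that defers building the message.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LazyLoggingDemo {
  // Counts how many times the "expensive" message was actually built.
  static final AtomicInteger computations = new AtomicInteger();
  // Hypothetical logger name; held in a static field so it stays referenced.
  private static final Logger LOGGER = Logger.getLogger("flo-laziness-sketch");

  static {
    LOGGER.setLevel(Level.INFO); // FINE is below INFO, so FINE messages are dropped
  }

  static String expensiveCalculation() {
    computations.incrementAndGet();
    return "expensive result";
  }

  // Supplier overload: the lambda is only invoked if FINE is loggable.
  static int logLazily() {
    computations.set(0);
    LOGGER.fine(() -> expensiveCalculation());
    return computations.get();
  }

  // String overload: the argument is computed before fine(...) is even called.
  static int logEagerly() {
    computations.set(0);
    LOGGER.fine(expensiveCalculation());
    return computations.get();
  }
}
```

With the logger set to INFO, logLazily() returns 0 because the supplier is never invoked, while logEagerly() returns 1: the String argument is computed before fine(...) runs, even though the message is then discarded.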

So we can easily create an endlessly recursive task (useless, but illustrative) and still be able to construct instances of it without having to worry about how complex or resource consuming the construction might be.

Task<String> endless() {
  return Task.named("Endless").ofType(String.class)
      .in(() -> endless())
      .process((impossible) -> impossible);
}

This means that we can always refer to tasks directly by using their definition:

TaskId endlessTaskId = endless().id();
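To see why constructing endless() is cheap, here is a JDK-only sketch using a hypothetical LazyNode type (not flo's Task) whose inputs, like flo's, are held as Suppliers. The static created counter is also an illustrative assumption; it records how many nodes were actually built.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a task (NOT flo's Task): inputs are Suppliers,
// so constructing a node never constructs any of its inputs.
final class LazyNode {
  // Illustrative counter of how many nodes have actually been built.
  static int created = 0;

  final String id;
  final List<Supplier<LazyNode>> inputs;

  LazyNode(String id, List<Supplier<LazyNode>> inputs) {
    this.id = id;
    this.inputs = inputs;
    created++;
  }

  // Mirrors endless() from the text: a node whose only input is itself.
  static LazyNode endless() {
    return new LazyNode("Endless",
        Collections.<Supplier<LazyNode>>singletonList(LazyNode::endless));
  }

  // Forces the first input `levels` times, building one node per level.
  static LazyNode expand(LazyNode node, int levels) {
    LazyNode current = node;
    for (int i = 0; i < levels; i++) {
      current = current.inputs.get(0).get();
    }
    return current;
  }
}
```

Creating the root builds exactly one node; only forcing inputs, for instance with expand(root, 1000), builds more, one per level walked.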

Task DAGs as data structures

A Task<T> can be transformed into a data structure wherever a materialized view of the task DAG is needed. In this example we have two simple tasks, where one is used as the input to the other.

Task<String> first(String arg) {
  return Task.named("First", arg).ofType(String.class)
      .process(() -> "hello " + arg);
}

Task<String> second(String arg) {
  return Task.named("Second", arg).ofType(String.class)
      .in(() -> first(arg))
      .process((firstResult) -> "well, " + firstResult);
}

void printTaskInfo() {
  Task<String> task = second("flo");
  TaskInfo taskInfo = TaskInfo.ofTask(task);
  System.out.println("taskInfo = " + taskInfo);
}

taskInfo in this example will be:

taskInfo = TaskInfo {
  id=Second(flo)#375f5234,
  isReference=false,
  inputs=[
    TaskInfo {
      id=First(flo)#65f4e738,
      isReference=false,
      inputs=[]
    }
  ]
}

The id and inputs fields should be pretty self-explanatory. isReference is a boolean that signals whether the task has already been materialized earlier in the tree, given a depth-first, post-order traversal.
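A minimal sketch of how such a materialized view can be produced, assuming a hypothetical DagNode type with lazily supplied inputs and a SketchInfo result type (neither is flo's TaskInfo). Inputs are visited depth first in declaration order, a task id is recorded once its inputs are done (post-order), and any later occurrence of an already recorded id becomes a reference with no inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical node type (NOT flo's Task): an id plus lazily supplied inputs.
final class DagNode {
  final String id;
  final List<Supplier<DagNode>> inputs;

  DagNode(String id, List<Supplier<DagNode>> inputs) {
    this.id = id;
    this.inputs = inputs;
  }

  static DagNode of(String id, DagNode... inputs) {
    List<Supplier<DagNode>> suppliers = new ArrayList<>();
    for (DagNode input : inputs) {
      suppliers.add(() -> input);
    }
    return new DagNode(id, suppliers);
  }
}

// Hypothetical materialized view of one node (NOT flo's TaskInfo).
final class SketchInfo {
  final String id;
  final boolean isReference;
  final List<SketchInfo> inputs;

  SketchInfo(String id, boolean isReference, List<SketchInfo> inputs) {
    this.id = id;
    this.isReference = isReference;
    this.inputs = inputs;
  }

  // Compact rendering: "*" marks a reference, brackets hold the inputs.
  @Override
  public String toString() {
    return id + (isReference ? "*" : "") + (inputs.isEmpty() ? "" : inputs.toString());
  }
}

final class Materializer {
  static SketchInfo materialize(DagNode root) {
    return materialize(root, new HashSet<>());
  }

  private static SketchInfo materialize(DagNode node, Set<String> done) {
    if (done.contains(node.id)) {
      // Already materialized earlier in the traversal: emit a reference only.
      return new SketchInfo(node.id, true, Collections.emptyList());
    }
    List<SketchInfo> inputs = new ArrayList<>();
    for (Supplier<DagNode> input : node.inputs) {
      inputs.add(materialize(input.get(), done)); // depth first, in declaration order
    }
    done.add(node.id); // post-order: recorded only after all of its inputs
    return new SketchInfo(node.id, false, inputs);
  }

  // Diamond: D depends on B and C, which both depend on A.
  static DagNode diamond() {
    DagNode a = DagNode.of("A");
    return DagNode.of("D", DagNode.of("B", a), DagNode.of("C", a));
  }
}
```

For the diamond, the result renders as D[B[A], C[A*]]: A is materialized under B, and its second occurrence under C is only a reference (marked *), so a shared sub-DAG is never expanded twice.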

Recall that the DAG expansion can choose inputs arbitrarily based on the arguments. In workflow libraries where expansion is coupled with evaluation, it's hard to know beforehand what will be evaluated, so evaluation planning and result caching/memoizing become integral parts of such libraries. flo aims to expose useful information together with flexible evaluation APIs, making it a library for easily building workflow management systems rather than trying to be the can-do-it-all workflow management system itself. More on how this is achieved in the EvalContext section.
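As one example of the planning a library built on such information could do, an execution order can be derived from the DAG structure alone, without running any process function. This is a minimal sketch with hypothetical PlanNode and Planner types (JDK only, not flo's API), modelling the fib DAG from the quick example as plain data with lazy inputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical node type (NOT flo's Task): structure only, no process function.
final class PlanNode {
  final String id;
  final List<Supplier<PlanNode>> inputs;

  PlanNode(String id, List<Supplier<PlanNode>> inputs) {
    this.id = id;
    this.inputs = inputs;
  }
}

final class Planner {
  // The fib DAG from the quick example, with inputs chosen by argument.
  static PlanNode fib(long n) {
    List<Supplier<PlanNode>> inputs = new ArrayList<>();
    if (n >= 2) {
      inputs.add(() -> fib(n - 1));
      inputs.add(() -> fib(n - 2));
    }
    return new PlanNode("fib(" + n + ")", inputs);
  }

  // Post-order DFS with de-duplication: each task appears once, after its inputs.
  static List<String> plan(PlanNode root) {
    Set<String> order = new LinkedHashSet<>(); // keeps insertion order
    visit(root, order);
    return new ArrayList<>(order);
  }

  private static void visit(PlanNode node, Set<String> order) {
    if (order.contains(node.id)) {
      return; // already planned: do not expand this sub-DAG again
    }
    for (Supplier<PlanNode> input : node.inputs) {
      visit(input.get(), order);
    }
    order.add(node.id);
  }
}
```

Planning fib(5) yields [fib(1), fib(0), fib(2), fib(3), fib(4), fib(5)], and planning fib(92) lists 93 distinct tasks, all before a single value is computed.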

EvalContext

EvalContext defines an interface to a context in which Task<T> instances are evaluated. The context is responsible for expanding the task DAG and invoking the task process functions. It gives library authors a powerful abstraction to use when implementing the specific details of evaluating a task DAG. All the details of wiring up dependencies between tasks, interacting with user code for DAG expansion, invoking task functions with upstream arguments, and other mundane plumbing are dealt with by flo.
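To make this division of labour concrete, here is a deliberately simplified sketch of a context, assuming an untyped task model in which a process function receives its already evaluated inputs as a List<Object>. SketchTask, SketchContext, SyncSketchContext and SketchFib are hypothetical; flo's own Task<T> keeps its inputs typed and its EvalContext API differs. The context expands inputs by calling their suppliers and then invokes the process function with the upstream values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical untyped task model (NOT flo's Task).
final class SketchTask<T> {
  final List<Supplier<SketchTask<?>>> inputs;
  final Function<List<Object>, T> process;

  SketchTask(List<Supplier<SketchTask<?>>> inputs, Function<List<Object>, T> process) {
    this.inputs = inputs;
    this.process = process;
  }
}

// Hypothetical context interface: how a DAG is expanded and evaluated is
// left entirely to the implementation.
interface SketchContext {
  <T> T evaluate(SketchTask<T> task);
}

// Synchronous, depth-first evaluation on the calling thread.
final class SyncSketchContext implements SketchContext {
  @Override
  public <T> T evaluate(SketchTask<T> task) {
    List<Object> values = new ArrayList<>();
    for (Supplier<SketchTask<?>> input : task.inputs) {
      values.add(evaluate(input.get())); // expand lazily, then evaluate upstream
    }
    return task.process.apply(values);
  }
}

final class SketchFib {
  static SketchTask<Long> fib(long n) {
    List<Supplier<SketchTask<?>>> inputs = new ArrayList<>();
    if (n < 2) {
      return new SketchTask<Long>(inputs, values -> n);
    }
    inputs.add(() -> fib(n - 1));
    inputs.add(() -> fib(n - 2));
    return new SketchTask<Long>(inputs,
        values -> (Long) values.get(0) + (Long) values.get(1));
  }
}
```

new SyncSketchContext().evaluate(SketchFib.fib(20)) yields 6765. Without any caching this evaluates every path through the DAG, so the number of process calls grows exponentially with n, which is the kind of behaviour a memoizing context addresses.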

These are just a few aspects of evaluation that can be implemented in an EvalContext:

Since multi-worker, asynchronous evaluation is a common prerequisite for many evaluation implementations, flo comes with a base implementation, AsyncContext, that can be extended with further behaviour.
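The core of asynchronous, multi-worker evaluation can be sketched with CompletableFuture. This is not flo's AsyncContext: AsyncSketchTask, AsyncSketchContext and AsyncSketchDemo are hypothetical, and the fixed pool of four threads is an arbitrary choice. Each task's process function is scheduled on the executor only once all of its inputs have completed, so independent inputs can run concurrently and no worker thread blocks waiting on another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical untyped task model (NOT flo's Task).
final class AsyncSketchTask<T> {
  final List<Supplier<AsyncSketchTask<?>>> inputs;
  final Function<List<Object>, T> process;

  AsyncSketchTask(List<Supplier<AsyncSketchTask<?>>> inputs, Function<List<Object>, T> process) {
    this.inputs = inputs;
    this.process = process;
  }
}

// Hypothetical async evaluator (NOT flo's AsyncContext).
final class AsyncSketchContext {
  private final ExecutorService executor;

  AsyncSketchContext(ExecutorService executor) {
    this.executor = executor;
  }

  <T> CompletableFuture<T> evaluate(AsyncSketchTask<T> task) {
    // Expand the inputs on the calling thread and start their evaluation.
    List<CompletableFuture<?>> upstream = new ArrayList<>();
    for (Supplier<AsyncSketchTask<?>> input : task.inputs) {
      upstream.add(evaluate(input.get()));
    }
    // Run this task's process function on the pool once every input is done.
    return CompletableFuture
        .allOf(upstream.toArray(new CompletableFuture<?>[0]))
        .thenApplyAsync(ignored -> {
          List<Object> values = new ArrayList<>();
          for (CompletableFuture<?> future : upstream) {
            values.add(future.join()); // already complete, so this never blocks
          }
          return task.process.apply(values);
        }, executor);
  }
}

final class AsyncSketchDemo {
  static AsyncSketchTask<Long> fib(long n) {
    List<Supplier<AsyncSketchTask<?>>> inputs = new ArrayList<>();
    if (n < 2) {
      return new AsyncSketchTask<Long>(inputs, values -> n);
    }
    inputs.add(() -> fib(n - 1));
    inputs.add(() -> fib(n - 2));
    return new AsyncSketchTask<Long>(inputs,
        values -> (Long) values.get(0) + (Long) values.get(1));
  }

  static long evaluateFib(long n) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      return new AsyncSketchContext(pool).evaluate(fib(n)).join();
    } finally {
      pool.shutdown();
    }
  }
}
```

AsyncSketchDemo.evaluateFib(15) returns 610. In this sketch, expansion (calling the input suppliers) still happens on the calling thread; only the process functions run on the pool.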

See also SyncContext, InstrumentedContext and MemoizingContext.
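The idea behind composing memoization with synchronous evaluation, as in MemoizingContext.composeWith(EvalContext.sync()) from the Fibonacci example, can be sketched as an evaluator that caches results by task id. This assumes that two tasks with the same id always produce the same value; MemoTask, MemoizingEvaluator and MemoFib are hypothetical and not flo's implementation. The cache is checked before a task's inputs are even expanded, so each distinct task is expanded and processed once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical untyped task model (NOT flo's Task); the id is the cache key.
final class MemoTask<T> {
  final String id;
  final List<Supplier<MemoTask<?>>> inputs;
  final Function<List<Object>, T> process;

  MemoTask(String id, List<Supplier<MemoTask<?>>> inputs, Function<List<Object>, T> process) {
    this.id = id;
    this.inputs = inputs;
    this.process = process;
  }
}

// Hypothetical memoizing evaluator (NOT flo's MemoizingContext).
final class MemoizingEvaluator {
  private final Map<String, Object> cache = new HashMap<>();
  int processCalls = 0; // how many process functions actually ran

  @SuppressWarnings("unchecked")
  <T> T evaluate(MemoTask<T> task) {
    if (cache.containsKey(task.id)) {
      return (T) cache.get(task.id); // same id: reuse the earlier result
    }
    List<Object> values = new ArrayList<>();
    for (Supplier<MemoTask<?>> input : task.inputs) {
      values.add(evaluate(input.get())); // expand and evaluate upstream first
    }
    T result = task.process.apply(values);
    processCalls++;
    cache.put(task.id, result);
    return result;
  }
}

final class MemoFib {
  static MemoTask<Long> fib(long n) {
    List<Supplier<MemoTask<?>>> inputs = new ArrayList<>();
    if (n >= 2) {
      inputs.add(() -> fib(n - 1));
      inputs.add(() -> fib(n - 2));
    }
    return new MemoTask<Long>("fib(" + n + ")", inputs,
        values -> n < 2 ? n : (Long) values.get(0) + (Long) values.get(1));
  }
}
```

Evaluating MemoFib.fib(92) this way returns 7540113804746346429 with processCalls equal to 93, one per distinct task fib(0) through fib(92), instead of an exponential number of calls.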

