twosigma/flint

Name: flint

Owner: Two Sigma

Description: A Time Series Library for Apache Spark

Created: 2016-10-19 17:44:15.0

Updated: 2018-01-20 17:51:05.0

Pushed: 2018-01-12 17:43:34.0

Homepage:

Size: 772

Language: Scala


README

Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.


Requirements

| Dependency     | Version          |
| -------------- | ---------------- |
| Spark version  | 2.0              |
| Scala version  | 2.11.7 and above |
| Python version | 3.5 and above    |

How to build

To build this sbt project, one can simply run

sbt assembly
Python bindings

The Python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started
Starting Point: TimeSeriesRDD

The entry point into all functionalities for time series analysis in Flint is the TimeSeriesRDD class or object. At a high level, a TimeSeriesRDD contains an OrderedRDD, which can be used to represent a sequence of ordered key-value pairs. A TimeSeriesRDD uses Long keys to represent timestamps in nanoseconds since epoch, and InternalRows as the OrderedRDD values, to represent a time series data set.
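
As a minimal illustration of that key representation (plain Java time-unit arithmetic, not Flint API; the timestamp value is invented), a millisecond timestamp ingested with timeUnit = MILLISECONDS corresponds to a nanosecond key as follows:

import java.util.concurrent.TimeUnit

// Hypothetical timestamp in milliseconds since epoch.
val millis = 1461000000000L
// The corresponding nanoseconds-since-epoch value used as the ordering key.
val nanosKey: Long = TimeUnit.MILLISECONDS.toNanos(millis) // 1461000000000000000L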

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single csv file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with a header and a specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named “time” of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under the "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

One could also create a TimeSeriesRDD from an RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have been sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as Parquet file(s). The TimeSeriesRDD.fromParquet() function provides options to specify which columns and/or which time range you are interested in, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Basic Operations

Similar to DataFrame, one could get the schema of a TimeSeriesRDD and perform operations like first(), cache(), collect(), repartition(), persist(), etc. Beyond those basic operations supported by DataFrame or RDD, one could manipulate rows and columns with the following functions.

val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.keepRows { row: Row => row.getAs[Double]("price") > 100.0 }

val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result1 = priceTSRdd.keepColumns("time") // A TimeSeriesRDD with only "time" column
val result2 = priceTSRdd.deleteColumns("id") // A TimeSeriesRDD with only "time" and "price" columns

val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.renameColumns("id" -> "ticker", "price" -> "highPrice")

val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.setTime {
  row: Row =>
    // Set the new time to the closest trading time to the current time.
    nextClosestTradingTime(row.get("id"), row.getAs[Long]("time"))
}

Add Columns
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "highPrice", and "lowPrice"
val results = priceTSRdd.addColumns(
  "diff" -> DoubleType -> {
    r: Row => r.getAs[Double]("highPrice") - r.getAs[Double]("lowPrice")
  }
)
// A TimeSeriesRDD with a new column "diff" = "highPrice" - "lowPrice"

val priceTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "sellingPrice"
// time  id  sellingPrice
// ----------------------
// 1000L 0   1.0
// 1000L 1   2.0
// 1000L 1   3.0
// 2000L 0   3.0
// 2000L 0   4.0
// 2000L 1   5.0
// 2000L 2   6.0

val results = priceTSRdd.addColumnsForCycle(
  "adjustedSellingPrice" -> DoubleType -> { rows: Seq[Row] =>
    rows.map { row => (row, row.getAs[Double]("sellingPrice") * rows.size) }.toMap
  }
)
// time  id  sellingPrice adjustedSellingPrice
// -------------------------------------------
// 1000L 0   1.0          3.0
// 1000L 1   2.0          6.0
// 1000L 1   3.0          9.0
// 2000L 0   3.0          12.0
// 2000L 0   4.0          16.0
// 2000L 1   5.0          20.0
// 2000L 2   6.0          24.0
Group functions

A group function groups rows with nearby (or exactly the same) timestamps.

val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]

val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]

Window Functions

A window function adds a new column whose value for each row is the list of rows falling within a window relative to that row's time.

val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
Temporal Join Functions

A temporal join function is a join defined by a matching criterion over time. A tolerance in the matching criterion specifies how far into the past or the future the join should look for a match.

val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
val futureResult = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")
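
As a rough illustration of the matching semantics (the rows below are invented for this sketch, not taken from Flint's documentation): leftJoin matches each left row with the most recent right row at or before its timestamp within the tolerance, while futureLeftJoin instead looks forward in time.

// Hypothetical input (times in nanoseconds since epoch):
// leftTSRdd                rightTSRdd
// time  quantity           time  price
// --------------           -----------
// 1000L 100                950L  10.0
// 2000L 200                2050L 20.0

val pastJoined = leftTSRdd.leftJoin(rightTSRdd, tolerance = "100ns")
// time  quantity price
// --------------------
// 1000L 100      10.0   // matched the right row at 950L, 50ns in the past
// 2000L 200      null   // no right row at or before 2000L within 100ns

val futureJoined = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "100ns")
// time  quantity price
// --------------------
// 1000L 100      null   // no right row at or after 1000L within 100ns
// 2000L 200      20.0   // matched the right row at 2050L, 50ns in the future
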
Summarize Functions

Summarize functions apply one or more summarizers to rows within a certain scope, such as a cycle, an interval, a window, or the whole data set.

val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ---------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700

Similarly, one could summarize over intervals, windows, or the whole time series data set using summarizeIntervals(), summarizeWindows(), and summarize(), respectively, as sketched below.
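
A minimal sketch of the windowed and whole-data-set variants, reusing the volTSRdd above (the exact call shapes and output column names are assumptions based on the API pattern shown in this README, not verified output):

// Sum "volume" over the past 1000ns relative to each row, appending a windowed sum column.
val windowed = volTSRdd.summarizeWindows(Window.pastAbsoluteTime("1000ns"), Summary.sum("volume"))

// Sum "volume" over the whole data set, producing a single summary row.
val total = volTSRdd.summarize(Summary.sum("volume"))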

One could check the timeseries.summarize.summarizer package for other kinds of summarizers, such as ZScoreSummarizer, CorrelationSummarizer, NthCentralMomentSummarizer, etc.

stat.regression

flint.math.stat.regression aims to provide a library similar to the Apache Commons Math statistics package and the Python statsmodels package.

Quick start
import breeze.linalg.DenseVector
import com.twosigma.flint.math.stat.regression._

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(sc, DenseVector(1.0, 2.0), 3.0)

// Fit the data using OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimated beta and intercept.
model.estimateRegressionParameters
Compare to org.apache.commons.math3 and statsmodels in Python

The following table lists the corresponding implementations across the different packages or libraries.

| flint.stat | apache.commons.math3 | statsmodels |
| ------ | ----- | ----- |
| calculateCenteredTSS | n/a | centered_tss |
| calculateHC0 | n/a | cov_HC0 |
| calculateHC1 | n/a | cov_HC1 |
| calculateEigenvaluesOfGramianMatrix | n/a | eigenvals |
| calculateRegressionParametersPValue | n/a | pvalues |
| calculateRegressionParametersTStat | n/a | tvalues |
| calculateRSquared | n/a | rsquared |
| calculateSumOfSquaredResidue | n/a | ssr |
| calculateStandardErrorsOfHC0 | n/a | HC0_se |
| calculateStandardErrorsOfHC1 | n/a | HC1_se |
| calculateUncenteredTSS | n/a | uncentered_tss |
| estimateBayesianInformationCriterion | n/a | bic |
| estimateAkaikeInformationCriterion | n/a | aic |
| estimateLogLikelihood | n/a | loglike |
| estimateErrorVariance | estimateErrorVariance | mse_resid |
| estimateRegressionParameters | estimateRegressionParameters | params |
| estimateRegressionParametersVariance | estimateRegressionParametersVariance | normalized_cov_params |
| estimateRegressionParametersStandardErrors | estimateRegressionParametersStandardErrors | bse |
| estimateErrorVariance | estimateErrorVariance | scale |
| getN | getN | nobs |
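
For example, assuming the methods in the flint.stat column are exposed on the model returned by OLSMultipleLinearRegression.regression in the quick start above (an assumption based on this table rather than verified source), the statsmodels rsquared and bse values would correspond to:

// Coefficient of determination, analogous to statsmodels' rsquared.
model.calculateRSquared
// Standard errors of the regression parameters, analogous to statsmodels' bse.
model.estimateRegressionParametersStandardErrors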

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to tsos@twosigma.com.

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC

