Name: flint
Owner: Two Sigma
Description: A Time Series Library for Apache Spark
Created: 2016-10-19 17:44:15.0
Updated: 2018-01-20 17:51:05.0
Pushed: 2018-01-12 17:43:34.0
Size: 772
Language: Scala
The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.
Flint is an open source library for Spark based around the `TimeSeriesRDD`, a time series aware data structure, and a collection of time series utility and analysis functions that use `TimeSeriesRDD`s. Unlike `DataFrame` and `Dataset`, Flint's `TimeSeriesRDD`s can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties.
It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.
| Dependency | Version |
| --- | --- |
| Spark version | 2.0 |
| Scala version | 2.11.7 and above |
| Python version | 3.5 and above |
To build this sbt project, one could simply run:

```
sbt assembly
```
The Python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.
## TimeSeriesRDD

The entry point into all of the functionality for time series analysis in Flint is the `TimeSeriesRDD` class. At a high level, a `TimeSeriesRDD` contains an `OrderedRDD`, which can be used to represent a sequence of ordered key-value pairs. A `TimeSeriesRDD` uses `Long` keys to represent timestamps in nanoseconds since epoch and `InternalRow`s as values, so that an `OrderedRDD` can represent a time series data set.
Applications can create a `TimeSeriesRDD` from an existing `RDD`, from an `OrderedRDD`, from a `DataFrame`, or from a single CSV file. As an example, the following creates a `TimeSeriesRDD` from a gzipped CSV file with a header and a specific datetime format:
```scala
import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)
```
To create a `TimeSeriesRDD` from a `DataFrame`, you have to make sure the `DataFrame` contains a column named "time" of type `LongType`.
```scala
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under the "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)
```
One could also create a `TimeSeriesRDD` from an `RDD[Row]` or an `OrderedRDD[Long, Row]` by providing a schema, e.g.
```scala
import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have been sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(
  isSorted = true,
  timeUnit = MILLISECONDS
)
```
It is also possible to create a `TimeSeriesRDD` from a dataset stored as Parquet format file(s). The `TimeSeriesRDD.fromParquet()` function provides the option to specify which columns and/or which time range you are interested in, e.g.
```scala
import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(
  isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"), // By default, null for all columns
  begin = "20100101",                   // By default, null for no boundary at begin
  end = "20150101"                      // By default, null for no boundary at end
)
```
Similar to `DataFrame`, one could get the `schema` of a `TimeSeriesRDD` and perform operations like `first()`, `cache()`, `collect()`, `repartition()`, `persist()`, etc. Beyond those basic operations supported by `DataFrame` or `RDD`, one could manipulate rows and columns with the following functions.
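As a minimal sketch of those basic operations (the variable and its contents are illustrative, not a runnable dataset):

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time" and "price"
priceTSRdd.schema    // The StructType describing the rows
priceTSRdd.first()   // The first (earliest) row
priceTSRdd.cache()   // Cache the underlying data for reuse
priceTSRdd.collect() // Materialize all rows as an Array[Row]
```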
### `cast`

A function to cast numeric columns to a different numeric type (e.g. `DoubleType` to `IntegerType`).

```scala
val priceTSRdd = ... // A TimeSeriesRDD with schema Schema("time" -> LongType, "id", "price" -> DoubleType)
val result = priceTSRdd.cast("price" -> IntegerType)
```
### `keepRows` (or `deleteRows`)

A function to filter rows based on given criteria.

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.keepRows { row: Row => row.getAs[Double]("price") > 100.0 }
```
### `deleteColumns` (or `keepColumns`)

A function to filter columns by name.

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result1 = priceTSRdd.keepColumns("time") // A TimeSeriesRDD with only the "time" column
val result2 = priceTSRdd.deleteColumns("id") // A TimeSeriesRDD with only "time" and "price" columns
```
### `renameColumns`

A function to rename columns without changing the corresponding data types, e.g.

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.renameColumns("id" -> "ticker", "price" -> "highPrice")
```
### `setTime`

A function to modify the time column, e.g.

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "id", and "price"
val result = priceTSRdd.setTime {
  row: Row =>
    // Set the new time to the closest trading time to the current time.
    nextClosestTradingTime(row.getAs[String]("id"), row.getAs[Long]("time"))
}
```
### `addColumns`

A function to add one or more new columns to each row, whose values are calculated using only values from that row. For example, if we have a `TimeSeriesRDD` with three columns "time", "highPrice", and "lowPrice", and we want to add a new column named "diff" to calculate the difference between "highPrice" and "lowPrice":

```scala
val priceTSRdd = ... // A TimeSeriesRDD with columns "time", "highPrice", and "lowPrice"
val results = priceTSRdd.addColumns(
  "diff" -> DoubleType -> {
    r: Row => r.getAs[Double]("highPrice") - r.getAs[Double]("lowPrice")
  }
) // A TimeSeriesRDD with a new column "diff" = "highPrice" - "lowPrice"
```
### `addColumnsForCycle`

A cycle is defined as a list of rows that share exactly the same timestamp. For a column to be added, this function computes a value per row from all the rows of its cycle, so one could add the same or different values for each row in that cycle.

```scala
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "sellingPrice"
// time   id  sellingPrice
// ------------------------
// 1000L  0   1.0
// 1000L  1   2.0
// 1000L  1   3.0
// 2000L  0   3.0
// 2000L  0   4.0
// 2000L  1   5.0
// 2000L  2   6.0

val results = priceTSRdd.addColumnsForCycle(
  "adjustedSellingPrice" -> DoubleType -> { rows: Seq[Row] =>
    rows.map { row => (row, row.getAs[Double]("sellingPrice") * rows.size) }.toMap
  }
)
// time   id  sellingPrice  adjustedSellingPrice
// ----------------------------------------------
// 1000L  0   1.0           3.0
// 1000L  1   2.0           6.0
// 1000L  1   3.0           9.0
// 2000L  0   3.0           12.0
// 2000L  0   4.0           16.0
// 2000L  1   5.0           20.0
// 2000L  2   6.0           24.0
```
A group function groups rows with nearby (or exactly the same) timestamps.
### `groupByCycle`

A function to group rows within a cycle, i.e. rows with exactly the same timestamp. For example:

```scala
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time   price
// -------------
// 1000L  1.0
// 1000L  2.0
// 2000L  3.0
// 2000L  4.0
// 2000L  5.0

val results = priceTSRdd.groupByCycle()
// time   rows
// ------------------------------------------------
// 1000L  [[1000L, 1.0], [1000L, 2.0]]
// 2000L  [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
```
### `groupByInterval`

A function to group rows whose timestamps fall into the same interval. Intervals can be defined by another `TimeSeriesRDD`: its timestamps are used to define the intervals, i.e. each pair of sequential timestamps defines an interval. For example:

```scala
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time   price
// -------------
// 1000L  1.0
// 1500L  2.0
// 2000L  3.0
// 2500L  4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only the column "time"
// time
// ------
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time   rows
// ----------------------------------
// 1000L  [[1000L, 1.0], [1500L, 2.0]]
// 2000L  [[2000L, 3.0], [2500L, 4.0]]
```
### `addWindows`

For each row, this function adds a new column whose value is the list of rows within that row's window.

```scala
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time   price
// -------------
// 1000L  1.0
// 1500L  2.0
// 2000L  3.0
// 2500L  4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time   price  window_past_1000ns
// ------------------------------------------------------
// 1000L  1.0    [[1000L, 1.0]]
// 1500L  2.0    [[1000L, 1.0], [1500L, 2.0]]
// 2000L  3.0    [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L  4.0    [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
```
A temporal join function is a join function defined by a matching criteria over time. A `tolerance` in the temporal join matching criteria specifies how far the join should look into the past or into the future.
### `leftJoin`

A function that performs a temporal left-join to the right `TimeSeriesRDD`, i.e. a left-join using inexact timestamp matches. For each row in the left, it appends the most recent row from the right at or before the same time. An example of joining two `TimeSeriesRDD`s is as follows:

```scala
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
```
### `futureLeftJoin`

A function that performs a temporal future left-join to the right `TimeSeriesRDD`, i.e. a left-join using inexact timestamp matches. For each row in the left, it appends the closest future row from the right at or after the same time.

```scala
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")
```
Summarize functions apply summarizer(s) to rows within a certain period, such as a cycle, an interval, or a window.
### `summarizeCycles`

A function that computes aggregate statistics of rows within a cycle, i.e. rows that share a timestamp.

```scala
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time   id  volume
// ------------------
// 1000L  1   100
// 1000L  2   200
// 2000L  1   300
// 2000L  2   400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time   volume_sum
// ------------------
// 1000L  300
// 2000L  700
```
Similarly, we could summarize over intervals, windows, or the whole time series data set. See:

- `summarizeIntervals`
- `summarizeWindows`
- `addSummaryColumns`

One could check `timeseries.summarize.summarizer` for different kinds of summarizers, such as `ZScoreSummarizer`, `CorrelationSummarizer`, `NthCentralMomentSummarizer`, etc.
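As a rough sketch of how the windowed variant reads (assuming a `volTSRdd` like the one in the `summarizeCycles` example; the window specification mirrors the `addWindows` example, and the exact argument shape may differ by version):

```scala
// For each row, sum "volume" over all rows whose timestamps fall in the
// past-1000ns window ending at that row's timestamp.
val windowed = volTSRdd.summarizeWindows(
  Window.pastAbsoluteTime("1000ns"),
  Summary.sum("volume")
)
```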
`flint.math.stat.regression` aims to provide a library similar to the Apache Commons Math statistics package and the Python statsmodels package.

```scala
import breeze.linalg.DenseVector
import com.twosigma.flint.math.stat.regression._

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(sc, DenseVector(1.0, 2.0), 3.0)

// Fit the data using the OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimated beta and intercept.
model.estimateRegressionParameters
```
The following table lists corresponding implementations across packages and libraries: `flint.math.stat.regression.LinearRegressionModel`, `org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression`, and `statsmodels.api` in Python.

| flint.stat | apache.commons.math3 | statsmodels |
| --- | --- | --- |
| calculateCenteredTSS | n/a | centered_tss |
| calculateHC0 | n/a | cov_HC0 |
| calculateHC1 | n/a | cov_HC1 |
| calculateEigenvaluesOfGramianMatrix | n/a | eigenvals |
| calculateRegressionParametersPValue | n/a | pvalues |
| calculateRegressionParametersTStat | n/a | tvalues |
| calculateRSquared | n/a | rsquared |
| calculateSumOfSquaredResidue | n/a | ssr |
| calculateStandardErrorsOfHC0 | n/a | HC0_se |
| calculateStandardErrorsOfHC1 | n/a | HC1_se |
| calculateUncenteredTSS | n/a | uncentered_tss |
| estimateBayesianInformationCriterion | n/a | bic |
| estimateAkaikeInformationCriterion | n/a | aic |
| estimateLogLikelihood | n/a | loglike |
| estimateErrorVariance | estimateErrorVariance | mse_resid |
| estimateRegressionParameters | estimateRegressionParameters | params |
| estimateRegressionParametersVariance | estimateRegressionParametersVariance | normalized_cov_params |
| estimateRegressionParametersStandardErrors | estimateRegressionParametersStandardErrors | bse |
| estimateErrorVariance | estimateErrorVariance | scale |
| getN | getN | nobs |
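For instance, the accessors in the flint.stat column are called directly on a fitted model (continuing the regression example above; method names are taken from the table, with their statsmodels counterparts noted in comments):

```scala
// Continuing from the fitted `model` above:
model.calculateRSquared                          // statsmodels equivalent: rsquared
model.estimateRegressionParametersStandardErrors // statsmodels equivalent: bse
model.getN                                       // statsmodels equivalent: nobs
```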
In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the `cla` folder and submit it to tsos@twosigma.com.
Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.
© Two Sigma Open Source, LLC