Name: slim
Owner: Hammer Lab
Description: Node server that listens to Spark events, aggregates statistics, and writes them to Mongo
Created: 2015-06-11 15:46:58.0
Updated: 2017-12-08 13:23:19.0
Pushed: 2016-08-26 05:21:57.0
Size: 804
Language: JavaScript
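slim

slim is a Node server that listens to Spark events (as emitted by spark-json-relay), aggregates statistics about them, and writes those statistics to Mongo.

The Spree documentation has lots more information about using these components together, which is the intended use case for slim. Go there to learn more!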
If you're just interested in running slim, e.g. to persist Spark events to Mongo, you can:
Run a mongod process. By default, slim will look for one at mongodb://localhost:3001/meteor (host localhost, port 3001, database meteor), which is where Spree starts it by default.
Download slim (slim.js on NPM) and run it:

    npm install -g slim.js
    slim
Per the instructions on the JsonRelay repo, register JsonRelay as a listener in your spark-shell or spark-submit commands:
In Spark >= 1.5.0, use spark-packages:

    --packages org.hammerlab:spark-json-relay:2.0.0
    --conf spark.extraListeners=org.apache.spark.JsonRelay
In Spark < 1.5.0, first download the JAR:

    wget https://repo1.maven.org/maven2/org/hammerlab/spark-json-relay/2.0.0/spark-json-relay-2.0.0.jar

then add these flags to your spark-{shell,submit} commands:

    --driver-class-path spark-json-relay-2.0.0.jar
    --conf spark.extraListeners=org.apache.spark.JsonRelay
All events emitted by your Spark application will be written to Mongo by slim!
slim accepts the following command-line options:

-h / --mongo-host: default localhost.
-p / --mongo-port: default 3001.
-d / --mongo-db: default meteor.
-u / --mongo-url: full Mongo URL, starting with mongodb://; overrides the above three options if present.
-m / --meteor-url: URL of a Meteor server running Spree, whose Mongo slim should write to.
-P / --port: default 8123; port to listen for JsonRelay clients on. This should match the --conf spark.slim.port arg passed to Spark (which also defaults to 8123, conveniently).
--log: if present, all events received from JsonRelay clients will be written to this file. See test/data/small/input/events.json for an example file generated in this way.
-l: default info; log-level passed to the tracer logging utility, e.g. debug, info, warn, or error.
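The precedence among the Mongo options fits in a few lines. The sketch below is hypothetical. resolveMongoUrl and its option field names are illustrative, not slim's own code. It covers only the defaults and the --mongo-url override described above, and leaves out --meteor-url.

```javascript
// Hypothetical sketch, not slim's implementation: combine the Mongo
// options described above into a single connection URL.
const DEFAULTS = { mongoHost: "localhost", mongoPort: 3001, mongoDb: "meteor" };

function resolveMongoUrl(opts = {}) {
  // -u/--mongo-url, when present, overrides host, port and db entirely.
  if (opts.mongoUrl) return opts.mongoUrl;

  // Otherwise each piece falls back to its documented default.
  const host = opts.mongoHost ?? DEFAULTS.mongoHost;
  const port = opts.mongoPort ?? DEFAULTS.mongoPort;
  const db = opts.mongoDb ?? DEFAULTS.mongoDb;
  return `mongodb://${host}:${port}/${db}`;
}

// Usage:
console.log(resolveMongoUrl());                     // mongodb://localhost:3001/meteor
console.log(resolveMongoUrl({ mongoPort: 27017 })); // mongodb://localhost:27017/meteor
console.log(resolveMongoUrl({ mongoUrl: "mongodb://db.example.com:27017/spree" }));
```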
Probably the easiest way to grok slim's record types and schemas is to peek at the test/data/small/output directory.
slim writes out 13 (!) types of records, each of which corresponds to a noun from the Spark universe (or a “join” of two such nouns):
The nesting above indicates which records are stored and accessed via instances of other record types. For example, when slim sees a TaskEnd event, e.g.:

    {
      "Event": "SparkListenerTaskEnd",
      "appId": "local-1437323845748",
      "Stage ID": 1,
      "Stage Attempt ID": 0,
      "Task Info": {
        "Task ID": 6,
        "Index": 2,
        "Attempt": 0,
        …
      },
      "Task Metrics": { … },
      …
    }

it first looks up the App with appId equal to "local-1437323845748" (this field is added to every event by JsonRelay), then asks that App for a Stage matching the "Stage ID" field, etc.
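The nested lookup described above can be sketched as a chain of get-or-create registries. This is a hypothetical illustration. The class names and the Registry helper are assumptions, not slim's actual models. So is the choice to key tasks by "Index" and task attempts by "Task ID".

```javascript
// Hypothetical sketch of nested record lookup; names and keys are assumptions.
class Registry {
  constructor(make) {
    this.make = make;       // factory that builds a child record from its key
    this.items = new Map(); // key -> child record
  }
  get(key) {
    // Create the child on first reference, then always return the same one.
    if (!this.items.has(key)) this.items.set(key, this.make(key));
    return this.items.get(key);
  }
}

class TaskAttempt {
  constructor(id) { this.id = id; }
}
class Task {
  constructor(index) {
    this.index = index;
    this.attempts = new Registry((id) => new TaskAttempt(id));
  }
}
class StageAttempt {
  constructor(id) {
    this.id = id;
    this.tasks = new Registry((index) => new Task(index));
  }
}
class Stage {
  constructor(id) {
    this.id = id;
    this.attempts = new Registry((id) => new StageAttempt(id));
  }
}
class App {
  constructor(id) {
    this.id = id;
    this.stages = new Registry((id) => new Stage(id));
  }
}

const apps = new Registry((id) => new App(id));

// Walk App -> Stage -> StageAttempt -> Task -> TaskAttempt for a TaskEnd event.
function taskAttemptFor(event) {
  const info = event["Task Info"];
  return apps
    .get(event.appId)                     // appId is added by JsonRelay
    .stages.get(event["Stage ID"])
    .attempts.get(event["Stage Attempt ID"])
    .tasks.get(info["Index"])
    .attempts.get(info["Task ID"]);
}

// Usage, with the fields from the TaskEnd example above:
const attempt = taskAttemptFor({
  Event: "SparkListenerTaskEnd",
  appId: "local-1437323845748",
  "Stage ID": 1,
  "Stage Attempt ID": 0,
  "Task Info": { "Task ID": 6, Index: 2, Attempt: 0 },
});
console.log(attempt.id); // 6
```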
dump-records.js
This script reads in a file, specified by the --in flag, that contains logged slim events (see the --log option above).
It runs those events through a slim server (which writes info about them to Mongo) and then dumps the resulting Mongo contents into per-collection JSON files in an output directory. That directory is either given as a command-line argument (if one is present) or derived by replacing input with output in the path to the input log file.
For example, this command will read test/data/small/input/events.json and produce test/data/small/output/*:

    node test/lib/dump-records.js --in test/data/small
This script was used to generate the test/data/*/output directories (modulo a few manual tweaks to the resulting JSON).
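A small helper can illustrate the default output-directory rule. outputDirFor is hypothetical and is not a function exported by dump-records.js. It assumes POSIX-style paths and replaces the last input directory segment, which matches the test/data/small example above.

```javascript
const path = require("path").posix;

// Hypothetical helper: an explicit output directory wins; otherwise swap the
// last "input" directory in the log file's path for "output".
function outputDirFor(inputLogPath, explicitDir) {
  if (explicitDir) return explicitDir;
  const segments = path.dirname(inputLogPath).split("/");
  const i = segments.lastIndexOf("input");
  if (i < 0) {
    throw new Error(`cannot derive an output dir from ${inputLogPath}`);
  }
  segments[i] = "output";
  return segments.join("/");
}

console.log(outputDirFor("test/data/small/input/events.json"));             // test/data/small/output
console.log(outputDirFor("test/data/small/input/events.json", "/tmp/out")); // /tmp/out
```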
record.js
This file defines a simple Mongo ORM for the classes defined in the models directory.
It mixes in methods that allow instances of the models classes to be manipulated in ways that should be familiar to Mongo users, e.g. .set, .inc, .dec, etc.
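Here is a minimal sketch of that mixin idea. The RecordMixin object, the pending-update bookkeeping and the Executor class are hypothetical stand-ins for record.js and the models directory. The sketch only shows how .set, .inc and .dec can update an object in memory while accumulating the matching Mongo $set/$inc update.

```javascript
// Hypothetical mixin: mutate fields in memory and record the matching Mongo
// update operators so they can be sent later in a single upsert.
const RecordMixin = {
  resetPending() {
    this.pending = { $set: {}, $inc: {} };
    return this;
  },
  set(key, value) {
    this[key] = value;
    this.pending.$set[key] = value;
    return this; // return `this` so calls can be chained
  },
  inc(key, by = 1) {
    this[key] = (this[key] || 0) + by;
    this.pending.$inc[key] = (this.pending.$inc[key] || 0) + by;
    return this;
  },
  dec(key, by = 1) {
    // Mongo has no $dec operator; a decrement is an $inc by a negative amount.
    return this.inc(key, -by);
  },
  takeUpdate() {
    // Return the accumulated update, omitting empty operators, then reset.
    const update = {};
    for (const op of ["$set", "$inc"]) {
      if (Object.keys(this.pending[op]).length > 0) update[op] = this.pending[op];
    }
    this.resetPending();
    return update;
  },
};

// A toy model class with the mixin applied to its prototype.
class Executor {
  constructor(id) {
    this.id = id;
    this.resetPending();
  }
}
Object.assign(Executor.prototype, RecordMixin);

const e = new Executor("1");
e.set("status", "RUNNING").inc("activeTasks").inc("activeTasks").dec("activeTasks");
console.log(e.activeTasks);  // 1
console.log(e.takeUpdate()); // the merged $set/$inc update for one upsert
```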
In an effort to keep slim lean, it is set up to basically only write to Mongo, assuming its in-memory representation of all objects' state matches what is in Mongo. However, on receiving events that refer to an application it doesn't currently have in memory, it pages in any and all Mongo records from all collections that pertain to that application, in case e.g. a previous instance of slim had processed this application's events before going down.
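The page-in-on-first-sight behavior might look roughly like this. It is a hypothetical sketch. Here store.find(collection, query) stands in for a real Mongo query, and the collection names are illustrative. The sketch also deduplicates concurrent loads for the same app, which the text above does not claim slim does.

```javascript
// Hypothetical sketch: load every record for an appId the first time an
// event mentions it. `store.find(collection, query)` stands in for Mongo.
const COLLECTIONS = ["applications", "jobs", "stages", "executors"];

class AppCache {
  constructor(store) {
    this.store = store;
    this.apps = new Map();    // appId -> { collectionName: [docs] }
    this.loading = new Map(); // appId -> in-flight load promise
  }

  async getApp(appId) {
    if (this.apps.has(appId)) return this.apps.get(appId);
    // Share one load between concurrent events for the same unseen app.
    if (!this.loading.has(appId)) {
      this.loading.set(appId, this.load(appId));
    }
    return this.loading.get(appId);
  }

  async load(appId) {
    try {
      const app = {};
      for (const name of COLLECTIONS) {
        app[name] = await this.store.find(name, { appId });
      }
      this.apps.set(appId, app);
      return app;
    } finally {
      this.loading.delete(appId);
    }
  }
}

// Usage with an in-memory stand-in for Mongo:
const fakeStore = {
  async find(collection, query) {
    return collection === "jobs" ? [{ appId: query.appId, jobId: 0 }] : [];
  },
};
const cache = new AppCache(fakeStore);
cache.getApp("local-1437323845748").then((app) => console.log(app.jobs.length)); // 1
```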
Currently each Mongo record limits itself to one “in-flight” Mongo upsert request at a time. Whenever one finishes, that record checks whether changes have queued up since that request was sent, and fires off another upsert if so.
This is necessary because, e.g., $set operations are not commutative: two racing $sets on the same field can land in either order. At this point, much of the work has been moved to $inc operations (Mongo has no $dec operator; a decrement is an $inc by a negative amount), but a few things still risk ending up in the wrong state if allowed to race, e.g. records' status field, which describes whether they are RUNNING, SUCCEEDED, FAILED, etc.
Additionally, this constraint is useful for easing Mongo thrashing during bursts of activity, e.g. stages that finish large numbers of tasks quickly, so relaxing it is not even really a goal.
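The one-in-flight-upsert rule can be sketched as a small per-record queue. SerializedWriter, merge and compact are hypothetical names. The upsert(id, update) argument stands in for an asynchronous Mongo update with the upsert option. Later $set values win and $inc amounts add up, so the follow-up write reflects every queued change in order.

```javascript
// Hypothetical sketch: at most one upsert in flight per record; changes that
// arrive meanwhile are merged and sent as a single follow-up upsert.
function merge(queued, update) {
  const out = {
    $set: { ...queued?.$set, ...update.$set }, // later $set values win
    $inc: { ...queued?.$inc },
  };
  for (const [k, v] of Object.entries(update.$inc || {})) {
    out.$inc[k] = (out.$inc[k] || 0) + v;     // $inc amounts accumulate
  }
  return out;
}

function compact(update) {
  // Drop empty operators before sending the update.
  const out = {};
  for (const op of ["$set", "$inc"]) {
    if (Object.keys(update[op]).length > 0) out[op] = update[op];
  }
  return out;
}

class SerializedWriter {
  constructor(id, upsert) {
    this.id = id;
    this.upsert = upsert; // async (id, update) => void
    this.inFlight = false;
    this.queued = null;   // merged changes not yet sent
  }

  enqueue(update) {
    this.queued = merge(this.queued, update);
    if (!this.inFlight) this.flush();
  }

  async flush() {
    this.inFlight = true;
    while (this.queued) {
      const update = this.queued;
      this.queued = null;
      try {
        await this.upsert(this.id, compact(update));
      } catch (err) {
        // A real implementation would need a retry policy; this just logs.
        console.error(`upsert for ${this.id} failed:`, err);
      }
    }
    this.inFlight = false;
  }
}

// Usage: four changes in a burst become two upserts.
const writer = new SerializedWriter("stage-1", async (id, update) => {
  console.log(id, JSON.stringify(update));
});
writer.enqueue({ $set: { status: "RUNNING" } });
writer.enqueue({ $inc: { tasksSucceeded: 1 } });
writer.enqueue({ $inc: { tasksSucceeded: 1 } });
writer.enqueue({ $set: { status: "SUCCEEDED" } });
// Sends {"$set":{"status":"RUNNING"}}, then
// {"$set":{"status":"SUCCEEDED"},"$inc":{"tasksSucceeded":2}}
```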
Much of the logic in slim has to do with maintaining various aggregate statistics that it collects. To this end, many fields, typically counts/“totals”, are denormalized onto several different record types.
The quintessential example of this is tasks' “metrics” fields; denormalized totals of these are rolled up onto StageAttempts, Executors, Stage-Executor joins, Jobs, and Applications (the latter storing e.g. the sum of all shuffle read bytes across all tasks in all stages).
Two other notable examples are memory/disk/tachyon usage (stored on blocks, denormalized onto Executors, RDDs, Executor-RDD joins, Applications) and durations (total duration of all tasks is rolled up onto all “parent” records of task attempts).
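The roll-up pattern can be sketched as applying each task's metric change to every record that keeps a denormalized total. rollUp and the metric field names are hypothetical. Applying the difference between a task's previous and current metrics is one way to keep totals correct when a task reports more than once, and it pairs naturally with commutative $inc updates. The text above does not say this is exactly how slim computes its totals.

```javascript
// Hypothetical sketch: add a task's metric changes to every "parent" record
// that stores a denormalized total (e.g. stage attempt, executor, job, app).
function rollUp(parents, oldMetrics, newMetrics) {
  // Compute the per-field change so repeated reports aren't double-counted.
  const delta = {};
  for (const [field, value] of Object.entries(newMetrics)) {
    const d = value - (oldMetrics[field] || 0);
    if (d !== 0) delta[field] = d;
  }
  // Add the same delta to each parent's running totals.
  for (const record of parents) {
    record.totals = record.totals || {};
    for (const [field, d] of Object.entries(delta)) {
      record.totals[field] = (record.totals[field] || 0) + d;
    }
  }
  return delta;
}

// Usage: one task finishes, reporting shuffle bytes and duration.
const app = {}, job = {}, stageAttempt = {}, executor = {};
rollUp([stageAttempt, executor, job, app], {}, { shuffleReadBytes: 1024, duration: 40 });
console.log(app.totals); // { shuffleReadBytes: 1024, duration: 40 }
```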
slim's test cases and attendant tooling provide a good way to check assumptions about what it will do with various Spark-event input data; checking them out is recommended.
slim >= 1.2.0 is necessary for Spark >= 1.5.0. It has been tested pretty heavily against Spark 1.5.0 and 1.4.*. It has been tested less heavily, but should Just Work™, on Spark versions from 1.3.0 onward; 1.3.0 is the release that added the spark.extraListeners conf option, which JsonRelay uses to hook into the driver.
Please file issues if you have any trouble using slim or have any questions!
In particular, if you see error messages being logged, or if slim throws and halts, I'd like to know about it. Three really useful pieces of information for debugging, in rough order of decreasing usefulness, are:

1. Run slim with the --log option, passing a file, and slim will log all the events it is receiving to that file.
2. slim's stdout.
3. … slim.