Name: spark-cep
Owner: Samsung
Description: Spark CEP is an extension of Spark Streaming to support SQL-based query processing
Created: 2015-10-21 05:32:40.0
Updated: 2018-01-14 10:31:25.0
Pushed: 2017-04-12 07:18:18.0
Homepage: null
Size: 59
Language: Scala
GitHub Committers
User | Most Recent Commit | # Commits |
---|
Other Committers
User | Most Recent Commit | # Commits |
---|
Spark CEP is a stream processing engine on top of Apache Spark supporting continuous query language. It has following improvements comparing to the existing Spark Streaming query engines.
StreamSQLContext is the main entry point for all streaming sql related functionalities. StreamSQLContext can be created by:
ssc: StreamingContext
sqlContext: SQLContext
streamSqlContext = new StreamSQLContext(ssc, sqlContext)
Or you could use HiveContext to get full Hive semantics support, like:
ssc: StreamingContext
hiveContext: HiveContext
streamSqlContext = new StreamSQLContext(ssc, hiveContext)
class Person(name: String, age: String)
reate an DStream of Person objects and register it as a stream.
people: DStream[Person] = ssc.socketTextStream(serverIP, serverPort)
ap(_.split(","))
ap(p => Person(p(0), p(1).toInt))
schemaPeopleStream = streamSqlContext.createSchemaDStream(people)
maPeopleStream.registerAsTable("people")
teenagers = sql("SELECT name FROM people WHERE age >= 10 && age <= 19")
he results of SQL queries are themselves DStreams and support all the normal operations
agers.map(t => "Name: " + t(0)).print()
start()
awaitTerminationOrTimeout(30 * 1000)
stop()
userStream: DStream[User]
amSqlContext.registerDStreamAsTable(userStream, "user")
itemStream: DStream[Item]
amSqlContext.registerDStreamAsTable(itemStream, "item")
"SELECT * FROM user JOIN item ON user.id = item.id").print()
historyItem: DataFrame
oryItem.registerTempTable("history")
"SELECT * FROM user JOIN item ON user.id = history.id").print()
"
|SELECT t.word, COUNT(t.word)
|FROM (SELECT * FROM test) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
|GROUP BY t.word
".stripMargin)
"
|SELECT * FROM
| user1 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS u
|JOIN
| user2 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS v
|ON u.id = v.id
|WHERE u.id > 1 and u.id < 3 and v.id > 1 and v.id < 3
""".stripMargin)
Note: For time-based windowing join, the sliding size should be same for all the joined streams. This is the limitation of Spark Streaming.
amSqlContext.command(
s"""
|CREATE TEMPORARY TABLE t_kafka (
|word string
|,num int
|)
|USING org.apache.spark.sql.streaming.sources.KafkaSource
|OPTIONS(
|zkQuorum "10.10.10.1:2181",
|brokerList "10.10.10.1:9092,10.10.10.2:9092",
|groupId "test",
|topics "aa:10",
|messageToRow "org.apache.spark.sql.streaming.sources.MessageDelimiter")
""".stripMargin)
Spark CEP is built with sbt, you could use sbt related commands to test/compile/package.
Spark CEP is built on >= Spark-1.5, you could change the Spark version in Build.scala to the version you wanted, currently Spark CEP can be worked with Spark version 1.5+.
To use Spark CEP, put the packaged jar into your environment where Spark could access, you could use spark-submit –jars or other ways.
ARK_HOME}/bin/spark-submit \
--class StreamHQL \
--name "CQLDemo" \
--master yarn-cluster \
--num-executors 4 \
--driver-memory 256m \
--executor-memory 512m \
--executor-cores 1 \
--conf spark.default.parallelism=5 \
lib/spark-cep-assembly-0.1.0-SNAPSHOT.jar \
"{ \
\"kafka.zookeeper.quorum\": \"10.10.10.1:2181\", \
\"redis.shards\": \"shard1\",\
\"redis.sentinels\": \"10.10.10.2:26379\",\
\"redis.database\": \"0\", \
\"redis.expire.sec\": \"600\", \
\"spark.sql.shuffle.partitions\": \"10\" \
}" \
sample_query \
SELECT COUNT(DISTINCT t.duid) FROM stream_test OVER (WINDOW '300' SECONDS, SLIDE '5' SECONDS) AS t
There are few arguments being passed to the Spark CEP job.
First, it requires zookeeper url (kafka.zookeeper.quorum
) for consuming stream from Kafka.
Since it stores the result within a window to redis, it also requires Redis connection information.
You can pass continuous query against a Kafka topic (stream_test
).
If you want to contribute our project, please refer to Governance
Contact: Robert B. Kim, Jun-Seok Heo