Name: spark_log_data
Owner: looker
Description: Flume-to-Spark-Streaming Log Parser
Created: 2016-05-11 16:24:12.0
Updated: 2017-09-20 18:35:08.0
Pushed: 2016-06-03 15:08:47.0
Size: 33
Language: Scala
This is a sample Spark Streaming application written in Scala. It takes a stream of logs from Flume, parses the raw log lines, creates a Spark DataFrame, and writes the data to Parquet in HDFS.
The following walkthrough is meant to get the user up and running with an example in local mode; however, a few minor changes (particularly to the Flume setup) allow this to run over a network and on an entire Spark cluster.
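For orientation, the core of such a job looks roughly like the sketch below. This is a minimal illustration, not the repository's actual source: the object name, regex, fields, and output path are placeholders, and the real parser and schema (see the Hive table later in this walkthrough) differ in detail.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Illustrative subset of the fields; the real schema also carries referer, agent, etc.
case class LogLine(ipAddress: String, createdAt: String, method: String,
                   uri: String, protocol: String, status: String, size: String)

object LogDataSketch {
  // Rough Apache combined-log-format pattern; the repository's parser may differ.
  val LogPattern =
    """(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\S+) (\S+).*""".r

  def main(args: Array[String]): Unit = {
    val Array(host, port, batchSeconds) = args  // e.g. localhost 9988 60

    val ssc = new StreamingContext(new SparkConf().setAppName("logDataSketch"),
                                   Seconds(batchSeconds.toLong))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    // Poll events from the Flume SparkSink configured below.
    val flumeStream = FlumeUtils.createPollingStream(ssc, host, port.toInt)

    flumeStream
      .map(event => new String(event.event.getBody.array(), "UTF-8"))
      .foreachRDD { rdd =>
        val parsed = rdd.flatMap {
          case LogPattern(ip, _, _, ts, method, uri, proto, status, size) =>
            Some(LogLine(ip, ts, method, uri, proto, status, size))
          case _ => None  // drop lines that do not parse
        }
        if (!parsed.isEmpty()) {
          // Append each micro-batch to the Parquet location set in application.conf.
          parsed.toDF().write.mode("append").parquet("hdfs:///path/to/loglines.parquet")
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
```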
Because we're going to create a custom Flume configuration for Spark Streaming, we need to make sure the necessary jars are in the classpath. Flume has a convenient way of doing this using the `plugins.d` directory structure:
```
apache-flume-1.6.0-bin/
  plugins.d/
    spark/
      lib/
      libext/
        commons-lang3-3.3.2.jar
        scala-library-2.10.5.jar
        spark-assembly-1.5.2-hadoop2.6.0-amzn-2.jar
        spark-streaming-flume-assembly_2.10-1.6.1.jar
        spark-streaming-flume-sink_2.10-1.6.1.jar
```
Create `conf/logdata.conf`:

```
# Name the components of the agent
agent.sources = terminal
agent.sinks = logger spark
agent.channels = memory1 memory2

# Describe/configure the source
agent.sources.terminal.type = exec
agent.sources.terminal.command = tail -f /home/hadoop/generator/logs/access.log

# Describe the sinks
agent.sinks.logger.type = logger
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = localhost
agent.sinks.spark.port = 9988

# Use channels that buffer events in memory
agent.channels.memory1.type = memory
agent.channels.memory1.capacity = 10000
agent.channels.memory1.transactionCapacity = 1000
agent.channels.memory2.type = memory
agent.channels.memory2.capacity = 10000
agent.channels.memory2.transactionCapacity = 1000

# Bind the source and sinks to the channels
agent.sources.terminal.channels = memory1 memory2
agent.sinks.logger.channel = memory1
agent.sinks.spark.channel = memory2
```
Start the Flume agent: `./bin/flume-ng agent --conf conf --conf-file conf/logdata.conf --name agent -Dflume.root.logger=INFO,console`
Spark Application
Clone the repo: `git@github.com:looker/spark_log_data.git`
Open `/src/main/resources/application.conf` and set your HDFS output location (see the configuration sketch after these steps).
Compile into an uber jar: `sbt assembly`
Submit the application to Spark: `./bin/spark-submit --master local[2] --class logDataWebinar /spark_log_data/target/scala-2.10/Log\ Data\ Webinar-assembly-1.0.jar localhost 9988 60`
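The three trailing arguments to `spark-submit` match the Flume `SparkSink` hostname and port configured above, plus (presumably) the batch interval in seconds. For the `application.conf` step, a typical way a Scala project loads such a file is with Typesafe Config; the sketch below is illustrative only, and the key name `hdfs.output.path` is an assumption, so check the file in the repo for the actual setting.

```scala
import com.typesafe.config.ConfigFactory

object AppConfig {
  // Loads src/main/resources/application.conf from the classpath, e.g. a file containing:
  //   hdfs.output.path = "hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet"
  private val config = ConfigFactory.load()

  // Illustrative key name; the repository's application.conf may use a different one.
  val outputPath: String = config.getString("hdfs.output.path")
}
```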
Hive
We're going to use the Hive Metastore to interface with our Parquet files by creating an [external table](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-ExternalTables).
Fire up the Hive command-line client: `hive`
Create a database: `create database if not exists logdata;`
Create the table:
```sql
drop table if exists logdata.event;

create external table logdata.event (
    ip_address string
  , identifier string
  , user_id string
  , created_at timestamp
  , method string
  , uri string
  , protocol string
  , status string
  , size string
  , referer string
  , agent string
  , user_meta_info string
)
stored as parquet
location 'hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet';

msck repair table logdata.event; /* recover partitions */
```
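The same Parquet output can also be inspected directly from a Spark shell. A quick sketch using the Spark 1.x API, assuming the column names written by the application match the table definition above (the path is the same placeholder used in the table location):

```scala
// From spark-shell, where sqlContext is provided automatically (Spark 1.5/1.6).
val events = sqlContext.read.parquet("hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet")
events.printSchema()
events.groupBy("status").count().show()  // e.g. request counts by HTTP status
```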
Thrift Server and Beeline
Start the Thrift Server: `sudo -u spark HADOOP_USER_NAME=hadoop HIVE_SERVER2_THRIFT_PORT=10001 /usr/lib/spark/sbin/start-thriftserver.sh`
Use Beeline to interface with the external table: `./bin/beeline --color=yes -u 'jdbc:hive2://localhost:10001/logdata' -n hadoop`
Issue SQL: `select count(*) from logdata.event;`
Stop the Thrift Server: `sudo -u spark /usr/lib/spark/sbin/stop-thriftserver.sh`
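Any JDBC client can hit the same endpoint. Below is a small, illustrative Scala example; it assumes the Hive JDBC driver (e.g. the `hive-jdbc` artifact) is on the classpath and reuses the URL and user from the Beeline command above.

```scala
import java.sql.DriverManager

object ThriftCount {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver and connect to the running Thrift Server.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10001/logdata", "hadoop", "")
    try {
      val rs = conn.createStatement().executeQuery("select count(*) from logdata.event")
      while (rs.next()) {
        println(s"event rows: ${rs.getLong(1)}")
      }
    } finally {
      conn.close()
    }
  }
}
```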