looker/spark_log_data

Name: spark_log_data

Owner: looker

Description: Flume-to-Spark-Streaming Log Parser

Created: 2016-05-11 16:24:12.0

Updated: 2017-09-20 18:35:08.0

Pushed: 2016-06-03 15:08:47.0

Homepage:

Size: 33

Language: Scala

README

Spark Log Parser

Overview

This is a sample Spark Streaming application, written in Scala, that takes a stream of logs from Flume, parses the raw log lines, creates a Spark DataFrame, and writes the data to Parquet in HDFS.
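
As a rough sketch of the parsing step (the case class, regex, and date format below are illustrative assumptions based on the Hive schema defined later in this walkthrough, not the repo's exact code), each raw access-log line can be mapped to a typed record:

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Locale

// Illustrative record mirroring the Hive table created later in this walkthrough.
case class LogEvent(
  ipAddress: String, identifier: String, userId: String, createdAt: Timestamp,
  method: String, uri: String, protocol: String, status: String,
  size: String, referer: String, agent: String, userMetaInfo: String)

object LogParser {
  // Assumed combined-log-format-style pattern; the repo's actual regex may differ.
  private val pattern =
    """^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\S+) (\S+) "([^"]*)" "([^"]*)" ?(.*)$""".r

  def parse(line: String): Option[LogEvent] = line match {
    case pattern(ip, ident, user, ts, method, uri, proto, status, size, ref, agent, meta) =>
      val fmt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
      Some(LogEvent(ip, ident, user, new Timestamp(fmt.parse(ts).getTime),
        method, uri, proto, status, size, ref, agent, meta))
    case _ => None // skip malformed lines instead of failing the batch
  }
}
```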

Walkthrough

The following walkthrough is meant to get the user up and running with an example in local mode; however, a few minor changes, particularly to the Flume setup, allow this to run over a network and on an entire Spark cluster.

Preliminaries
Flume Setup

Because we're going to create a custom Flume configuration for Spark Streaming, we need to make sure the necessary jars are in the classpath. Flume has a convenient way of doing this using the plugins.d directory structure.
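
The exact jars depend on your Spark and Scala builds, so the layout below is only an assumed example; for the pull-based SparkSink, the Spark Streaming + Flume integration guide calls for the spark-streaming-flume-sink jar along with scala-library and commons-lang3:

```
plugins.d/
  spark-sink/
    lib/
      spark-streaming-flume-sink_2.10-1.6.1.jar
    libext/
      scala-library-2.10.5.jar
      commons-lang3-3.3.2.jar
```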

The following configuration (saved as `conf/logdata.conf`, which the Flume start command below references) tails the raw access log with an exec source and fans events out to a logger sink and a Spark sink over two in-memory channels:

```
# name the agent's sources, sinks, and channels
agent.sources = terminal
agent.sinks = logger spark
agent.channels = memory1 memory2

# describe source
agent.sources.terminal.type = exec
agent.sources.terminal.command = tail -f /home/hadoop/generator/logs/access.log

# describe logger sink (in production, pipe raw logs to HDFS)
agent.sinks.logger.type = logger

# describe spark sink
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = localhost
agent.sinks.spark.port = 9988

# channel buffers events in memory (used with logger sink)
agent.channels.memory1.type = memory
agent.channels.memory1.capacity = 10000
agent.channels.memory1.transactionCapacity = 1000

# channel buffers events in memory (used with spark sink)
agent.channels.memory2.type = memory
agent.channels.memory2.capacity = 10000
agent.channels.memory2.transactionCapacity = 1000

# tie source and sinks to their respective channels
agent.sources.terminal.channels = memory1 memory2
agent.sinks.logger.channel = memory1
agent.sinks.spark.channel = memory2
```

Start the Flume agent: `./bin/flume-ng agent --conf conf --conf-file conf/logdata.conf --name agent -Dflume.root.logger=INFO,console`

Spark Application
1. Clone the repo: `git@github.com:looker/spark_log_data.git`
2. Open `/src/main/resources/application.conf` and set your HDFS output location.
3. Compile into an uber jar: `sbt assembly`
4. Submit the application to Spark: `./bin/spark-submit --master local[2] --class logDataWebinar /spark_log_data/target/scala-2.10/Log\ Data\ Webinar-assembly-1.0.jar localhost 9988 60`
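
For orientation, here is a rough, illustrative sketch of what a `logDataWebinar` entry point like this can look like. It is not the repo's exact code: it assumes the `localhost 9988` arguments correspond to the Spark sink's hostname and port configured above, treats the third argument as the batch interval in seconds, reuses the hypothetical `LogParser` from the Overview, and hardcodes the HDFS path that the real application reads from `application.conf`:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object logDataWebinar {
  def main(args: Array[String]): Unit = {
    val Array(host, port, batchSeconds) = args

    val sc = new SparkContext(new SparkConf().setAppName("Log Data Webinar"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sc, Seconds(batchSeconds.toLong))

    // Pull events from the Flume SparkSink configured above (pull-based receiver).
    val flumeStream = FlumeUtils.createPollingStream(ssc, host, port.toInt)

    flumeStream
      .map(event => new String(event.event.getBody.array(), "UTF-8")) // raw log line
      .flatMap(line => LogParser.parse(line).toSeq)                   // hypothetical parser from the Overview
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          // Append each micro-batch to Parquet (the real app takes this path from application.conf).
          rdd.toDF().write.mode(SaveMode.Append)
            .parquet("hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet")
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The pull-based `FlumeUtils.createPollingStream` receiver is the counterpart to the `SparkSink` configured in Flume, which buffers events until Spark pulls them.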

Hive
We're going to use the Hive Metastore to interface with our Parquet files by creating an [external table](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-ExternalTables).
1. Fire up the Hive command-line client: `hive`
2. Create the database: `create database if not exists logdata;`
3. Create the table:

```sql
drop table if exists logdata.event;

create external table logdata.event (
    ip_address string
  , identifier string
  , user_id string
  , created_at timestamp
  , method string
  , uri string
  , protocol string
  , status string
  , size string
  , referer string
  , agent string
  , user_meta_info string)
stored as parquet
location 'hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet';

msck repair table logdata.event; /* recover partitions */
```

Thrift Server and Beeline
1. Start the Thrift Server: `sudo -u spark HADOOP_USER_NAME=hadoop HIVE_SERVER2_THRIFT_PORT=10001 /usr/lib/spark/sbin/start-thriftserver.sh`
2. Use Beeline to interface with the external tables: `./bin/beeline --color=yes -u 'jdbc:hive2://localhost:10001/logdata' -n hadoop`
3. Issue SQL: `select count(*) from logdata.event;`
4. Stop the Thrift Server: `sudo -u spark /usr/lib/spark/sbin/stop-thriftserver.sh`
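
Beeline is just one JDBC client; anything that speaks HiveServer2 JDBC can query the same endpoint. A minimal sketch in Scala, assuming the Hive JDBC driver (`org.apache.hive:hive-jdbc`) and its dependencies are on the classpath:

```scala
import java.sql.DriverManager

object QueryEvents {
  def main(args: Array[String]): Unit = {
    // Same connection details as the Beeline example above.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10001/logdata", "hadoop", "")
    try {
      val rs = conn.createStatement().executeQuery("select count(*) from logdata.event")
      while (rs.next()) println(s"event count: ${rs.getLong(1)}")
    } finally {
      conn.close()
    }
  }
}
```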
