IBM/kafka-streaming-click-analysis

Name: kafka-streaming-click-analysis

Owner: International Business Machines

Description: Use Kafka and Apache Spark streaming to perform click stream analytics

Created: 2017-10-05 12:10:09

Updated: 2018-05-03 15:45:39

Pushed: 2018-05-03 15:45:37

Homepage: https://developer.ibm.com/code/patterns/determine-trending-topics-with-clickstream-analysis/

Size: 504

Language: Jupyter Notebook

README

Clickstream Analysis using Apache Spark and Apache Kafka

Data Science Experience is now Watson Studio. Although some images in this code pattern may show the service as Data Science Experience, the steps and processes will still work.

Clickstream analysis is the process of collecting, analyzing, and reporting on which web pages a user visits; it can offer useful insight into the usage characteristics of a website.

Popular use cases for clickstream analysis include A/B testing, personalized recommendations, and targeted advertising.

In this Code Pattern, we will demonstrate how to detect real-time trending topics on the Wikipedia web site. To perform this task, Apache Kafka will be used as a message queue, and the Apache Spark structured streaming engine will be used to perform the analytics. This combination is well known for its usability, high throughput and low-latency characteristics.

When you complete this Code Pattern, you will understand how to:

  - Use Apache Kafka as a message queue for clickstream data
  - Perform streaming analytics with the Apache Spark structured streaming engine
  - Interact with the underlying Spark service from a Jupyter Notebook in IBM's Watson Studio

Flow
  1. User connects with Apache Kafka service and sets up a running instance of a clickstream.
  2. Run a Jupyter Notebook in IBM's Watson Studio that interacts with the underlying Apache Spark service. Alternatively, this can be done locally by running the Spark Shell.
  3. The Spark service reads and processes data from the Kafka service.
  4. Processed Kafka data is relayed back to the user via the Jupyter Notebook (or console sink if running locally).
Included components

  - Apache Kafka: a distributed streaming platform, used here as the message queue
  - Apache Spark: an open-source analytics engine; its structured streaming module performs the processing
  - IBM Watson Studio: a collaborative environment for running Jupyter Notebooks against IBM Cloud services
  - IBM Message Hub: an Apache Kafka-based messaging service on the IBM Cloud, used in the Watson Studio mode

Steps

There are two modes of exercising this Code Pattern:

  - Run locally, using the Spark shell
  - Run using a Jupyter notebook in the IBM Watson Studio

Run locally
  1. Install Spark and Kafka
  2. Setup and run a simulated clickstream
  3. Run the script
1. Install Spark and Kafka

Install Apache Kafka (version 0.10.2.1 is recommended) and Apache Spark 2.2.0 by downloading and extracting their binary distributions onto your system.
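For example, both releases are available from the Apache archive site. A minimal sketch, assuming the archive URLs below are still live and using the Scala 2.10 build of Kafka to match the directory names used later:

 # Download and extract Kafka 0.10.2.1 and Spark 2.2.0
 curl -LO https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz
 tar -xzf kafka_2.10-0.10.2.1.tgz
 curl -LO https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
 tar -xzf spark-2.2.0-bin-hadoop2.7.tgz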

2. Setup and run a simulated clickstream

Note: These steps can be skipped if you already have a clickstream available for processing. If so, create and stream data to the topic named 'clicks' before proceeding to the next step.

Use the following steps to set up a simulated clickstream that uses data from an external publisher:

  1. Download and extract the Wikipedia Clickstream data from here. Since the schema for this data is ever-evolving, you may want to select the data set that was used to test this Code Pattern: 2017_01_en_clickstream.tsv.gz.

  2. Create and run a local Kafka service instance by following the instructions listed here. Be sure to create a topic named clicks (for example, with the command shown below).
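With a 0.10.x distribution, the topic can be created using the bundled kafka-topics.sh utility. A sketch, assuming ZooKeeper is running at its default localhost:2181:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks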

  3. The Kafka distribution comes with a handy command line utility for uploading data to the Kafka service. To process the simulated Wikipedia data, run the following commands:

Note: Replace ip:port with the host and port of the running Kafka service, which defaults to localhost:9092 when running locally.

 cd kafka_2.10-0.10.2.1
 tail -200 data/2017_01_en_clickstream.tsv | bin/kafka-console-producer.sh --broker-list ip:port --topic clicks --producer.config=config/producer.properties

Tip: The Unix head and tail utilities can be used to conveniently select the range of rows to send when simulating the clickstream.
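For instance, to send rows 201 through 400 of the file instead of the last 200 (the row range here is purely illustrative):

 head -400 data/2017_01_en_clickstream.tsv | tail -200 | bin/kafka-console-producer.sh --broker-list ip:port --topic clicks --producer.config=config/producer.properties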

3. Run the script

Go to the Spark install directory and bootstrap the Spark shell specifying the correct versions of Spark and Kafka:

 cd $SPARK_DIR
 bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

At the Spark shell prompt, define the schema of the incoming Wikipedia clickstream and a method to parse it:

Tip: For conveniently copying and pasting commands into the Spark shell, spark-shell supports a :paste mode.

scala> import scala.util.Try

scala> case class Click(prev: String, curr: String, link: String, n: Long)

scala> def parseVal(x: Array[Byte]): Option[Click] = {
         val split: Array[String] = new Predef.String(x).split("\\t")
         if (split.length == 4) {
           Try(Click(split(0), split(1), split(2), split(3).toLong)).toOption
         } else
           None
       }
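As a quick sanity check, you can feed parseVal a tab-separated byte array. The sample row below is made up for illustration (other-search and link are typical values of the prev and type columns in this data set; the count 42 is arbitrary):

scala> parseVal("other-search\tGavin_Rossdale\tlink\t42".getBytes)
res0: Option[Click] = Some(Click(other-search,Gavin_Rossdale,link,42))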

Set up structured streaming to read from Kafka:

Note: Replace ip:port with the host and port of the running Kafka service, which defaults to localhost:9092 when running locally.

scala> val records = spark.readStream.format("kafka")
                       .option("subscribe", "clicks")
                       .option("failOnDataLoss", "false")
                       .option("kafka.bootstrap.servers", "ip:port").load()

Process the records:

scala> val messages = records.select("value").as[Array[Byte]]
                        .flatMap(x => parseVal(x))
                        .groupBy("curr")
                        .agg(Map("n" -> "sum"))
                        .sort($"sum(n)".desc)

Output to the console and start streaming data (using the tail clickstream command described above):

scala> val query = messages.writeStream
                     .outputMode("complete")
                     .option("truncate", "false")
                     .format("console")
                     .start()

scala> -------------------------------------------
Batch: 0

+---------------------------------------------+-------+
|curr                                         |sum(n) |
+---------------------------------------------+-------+
|Gavin_Rossdale                               |1205584|
|Unbreakable_(film)                           |1100870|
|Ben_Affleck                                  |939473 |
|Jacqueline_Kennedy_Onassis                   |926204 |
|Tom_Cruise                                   |743553 |
|Jackie_Chan                                  |625123 |
|George_Washington                            |622800 |
|Bill_Belichick                               |557286 |
|Mary,_Queen_of_Scots                         |547621 |
|The_Man_in_the_High_Castle                   |529446 |
|Clint_Eastwood                               |526275 |
|Beyoncé                                      |513177 |
|United_States_presidential_line_of_succession|490999 |
|Sherlock_Holmes                              |477874 |
|Winona_Ryder                                 |449984 |
|Titanic_(1997_film)                          |400197 |
|Watergate_scandal                            |381000 |
|Jessica_Biel                                 |379224 |
|Patrick_Swayze                               |373626 |
+---------------------------------------------+-------+
only showing top 20 rows

The resultant table shows the Wikipedia pages that had the most hits. This table updates automatically whenever more data arrives from Kafka. Unless a trigger specifies otherwise, structured streaming processes new data as soon as it arrives.
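If you would rather have updates arrive on a fixed cadence, stop the running query and restart it with a processing-time trigger. A sketch, using an arbitrary 10-second interval:

scala> import org.apache.spark.sql.streaming.Trigger

scala> val query = messages.writeStream
                     .outputMode("complete")
                     .option("truncate", "false")
                     .trigger(Trigger.ProcessingTime("10 seconds"))
                     .format("console")
                     .start()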

Here we assume that a higher number of clicks indicates a "hot" or "trending" topic. Please feel free to contribute ideas on how to improve this, or thoughts on any other types of clickstream analytics that can be done.
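When you are finished experimenting, the query can be stopped cleanly from the same shell; awaitTermination and stop are standard StreamingQuery methods:

scala> query.awaitTermination(60000)  // optionally block for up to a minute while batches print

scala> query.stop()  // terminate the streaming query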

Run using a Jupyter notebook in the IBM Watson Studio
  1. Create a new Watson Studio project
  2. Associate a Spark service
  3. Create the notebook
  4. Run the notebook
  5. Upload data
  6. Save and Share

Note: Running this part of the Code Pattern requires a Message Hub service, which charges a nominal fee.

1. Create a new Watson Studio project

2. Associate a Spark service

Associate an Apache Spark service with your project; it should then appear in your Services list.

3. Create the notebook

Create the notebook in your project from the following URL:

 https://raw.githubusercontent.com/IBM/kafka-streaming-click-analysis/master/notebooks/Clickstream_Analytics_using_Apache_Spark_and_Message_Hub.ipynb
4. Run the notebook

Before running the notebook, you will need to set up a Message Hub service.

The notebook is now ready to be run. The first step in the notebook is to insert credentials for the Message Hub connection you just created. To do this, open the notebook in edit mode and select code cell [1]. Then click on the 1001 button located in the top right corner of the notebook and select the Connections tab to see your Message Hub connection. Click the Insert to code button to insert the Message Hub credentials into code cell [1].

Note: Make sure you rename the credentials object to credentials_1.

When a notebook is executed, each code cell in the notebook is executed, in order, from top to bottom.

Each code cell is selectable and is preceded by a tag in the left margin. The tag format is In [x]:. Depending on the state of the notebook, the x can be:

  - blank, indicating the cell has never been executed
  - a number, indicating the relative order in which the cell was run
  - an asterisk (*), indicating the cell is currently running

There are several ways to execute the code cells in your notebook: run them one at a time by selecting a cell and pressing the Play button in the toolbar, or run them all in sequence using the Cell menu's Run All option.

5. Upload data

To upload data to Message Hub (Apache Kafka as a service), use the Kafka command line utility. Following the detailed instructions in the Setup and run a simulated clickstream section above, you need to:

  1. Download the Wikipedia data.
  2. Download the Kafka distribution binary.
  3. Download the config/messagehub.properties config file and update it with the Message Hub credentials found in the credentials section of the notebook. (Note: Ignore any extra double quotes around the password when copying it.)
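For reference, a minimal messagehub.properties for SASL/PLAIN authentication typically looks like the sketch below. The exact contents of the file shipped with this pattern may differ; the user and password placeholders come from your Message Hub credentials:

 security.protocol=SASL_SSL
 sasl.mechanism=PLAIN
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<password>";
 ssl.protocol=TLSv1.2
 ssl.enabled.protocols=TLSv1.2
 ssl.endpoint.identification.algorithm=HTTPS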

After downloading and extracting the Kafka distribution binary and the data, run the command as follows:

Note: Replace ip:port with the kafka_brokers_sasl value found in the credentials section of the notebook, as described in the previous step.

 cd kafka_2.10-0.10.2.1
 tail -200 data/2017_01_en_clickstream.tsv | bin/kafka-console-producer.sh --broker-list ip:port --request-timeout-ms 30000 --topic clicks --producer.config=config/messagehub.properties
6. Save and Share
How to save your work:

Under the File menu, there are several ways to save your notebook, including Save, which saves the current state, and Save Version, which saves a version tagged with a date and time stamp.

How to share your work:

You can share your notebook by selecting the Share button located in the top right section of your notebook panel. The end result of this action will be a URL link that displays a "read-only" version of your notebook. You have several options to specify exactly what you want shared from your notebook, such as only text and output, all content excluding sensitive code cells, or all content including code.

License

Apache 2.0
