GoogleCloudPlatform/nyc-taxirides-stream-feeder

Name: nyc-taxirides-stream-feeder

Owner: Google Cloud Platform

Description: This tool generates an emulated data stream based on the NYC Taxi & Limousine Commission's open dataset, expanded with additional routing information from the Google Maps Directions API and interpolated timestamps to simulate a real-time scenario.

Created: 2017-11-03 14:03:00.0

Updated: 2018-05-17 00:20:04.0

Pushed: 2018-04-03 19:56:12.0

Homepage: https://cloud.google.com/blog/big-data/2017/01/learn-real-time-processing-with-a-new-public-data-stream-and-google-cloud-dataflow-codelab

Size: 1214

Language: Go

README

PubSub Taxi Rides feeder program written in Go

The emulated data stream generated by this tool is based on the NYC Taxi & Limousine Commission's open dataset, expanded with additional routing information from the Google Maps Directions API and interpolated timestamps to simulate a real-time scenario.

Codelab

You can find a Cloud Dataflow codelab using the same message format this feeder app is generating at https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon.

Public Pub/Sub Topic

There is a public Cloud Pub/Sub topic driven by this app.

Sample Data & Prerequisites

We generated one week's worth of taxi ride data (from 2015), which you can find in the pubsub-public-billion-taxi-rides bucket with prefix 'json/yellow_tripdata_2015-01-ext…'

To see all available files and check the format, run gsutil ls gs://pubsub-public-billion-taxi-rides/json
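For orientation, a single ride message looks roughly like the sample below. The field set here is an assumption based on the companion Dataflow codelab's message format, with made-up values; check the actual files with gsutil before relying on it:

```json
{
  "ride_id": "example-ride-id",
  "point_idx": 160,
  "latitude": 40.7362,
  "longitude": -73.9886,
  "timestamp": "2015-01-04T20:00:00.0-05:00",
  "meter_reading": 7.5,
  "meter_increment": 0.047,
  "ride_status": "enroute",
  "passenger_count": 2
}
```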

You need a Google Cloud Platform Project with Cloud Pub/Sub APIs enabled.

Create a Pub/Sub topic to stream the taxi rides to. In the docs we use realtime-feed as the topic for the real-time stream.
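As a sketch (the project ID below is a placeholder, and the gcloud call assumes an installed, authenticated Cloud SDK), here is the topic creation alongside the full resource name Pub/Sub uses for it:

```shell
# Placeholder project ID for illustration.
PROJECT_ID="my-project"
TOPIC="realtime-feed"

# Full resource name used by the Pub/Sub API.
TOPIC_PATH="projects/${PROJECT_ID}/topics/${TOPIC}"
echo "${TOPIC_PATH}"   # prints projects/my-project/topics/realtime-feed

# Create the topic (requires an authenticated gcloud setup):
# gcloud pubsub topics create "${TOPIC}" --project "${PROJECT_ID}"
```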

Running the program locally

If you run the feeder locally, it uses your gcloud auth context. Make sure the right user and project are set. You might need to run gcloud auth login after switching to the user you want to use.

When using the Google Cloud SDK gcloud tool with a normal user account, Pub/Sub operations are limited to a rate suitable for manual operations. You can activate a service account as described in the gcloud docs.

The program uses default credentials from the gcloud environment if no service-account.json is present.

To use a service account for high-QPS publishing, create a key for the Compute Engine default service account in JSON format in the Google Cloud Console and save it to service-account.json.

First build the feeder binary with

go build feeder.go publisher.go scheduler.go taxirides.go debug.go storage.go pubsub.go

It's possible to use environment variables, a config file (-config properties.conf), or command-line parameters. You can then use the following command to run the feeder (detailed help with ./feeder --help).

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true \
./feeder

Remove DEBUG=true if you don't want debug output on stdout.

To run the binary with a config file do

./feeder -config properties.conf

The config file has the following format.

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

bucket pubsub-public-billion-taxi-rides
project <YOUR_PROJECT_ID>
fileprefix json/yellow_tripdata_2015-01-ext0000
pubsubtopic realtime-feed
debug true

Alternatively, you can build and run a Docker container. Build the container with make container. This uses a multi-stage Docker build to produce a small Alpine-based container that runs the feeder.
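The make container target lives in the repository; as a rough illustration only, a multi-stage build of this shape could look like the following (the base images and paths here are assumptions, not the repo's actual Dockerfile):

```dockerfile
# Illustrative sketch — the repo's own Dockerfile/Makefile may differ.
FROM golang:alpine AS build
WORKDIR /go/src/feeder
COPY . .
RUN go build -o /feeder .

# Final stage: small Alpine image containing only the binary.
FROM alpine
RUN apk add --no-cache ca-certificates
COPY --from=build /feeder /feeder
ENTRYPOINT ["/feeder"]
```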

To run the container locally create a properties.env as follows:

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > properties.env

and then run the container. You'll need a service account if you're not running it on GCE.

docker run --rm -v $PWD/service-account.json:/service-account.json --env-file=properties.env feeder:latest

Example config reading from JSON formatted input

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true

Running the program on GCE

Install the gcloud auth helper for Docker with gcloud auth configure-docker. Build the container and push it to Google Container Registry with make push-gcr.

Create a GCE instance selecting CoreOS stable. Make sure to give the instance API access to Google Cloud Pub/Sub and Google Cloud Storage.

Run the following commands as root (sudo su -) after you SSH into your new GCE instance. Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > /root/properties.env

To fetch and run the container from the Container Registry, run the following commands. Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID, <MEMORY_BOUNDARY> with a value within the limits of your GCE instance (e.g. 10g), and <TOKEN_OUTPUT_OF_PREVIOUS_CMD> with the token printed by the gcloud auth command before you run docker login.

sudo su -
gcloud auth application-default print-access-token
docker login -u oauth2accesstoken -p "<TOKEN_OUTPUT_OF_PREVIOUS_CMD>" https://gcr.io
docker pull gcr.io/<YOUR_PROJECT_ID>/feeder
docker run --restart unless-stopped -m=<MEMORY_BOUNDARY> -d \
--env-file=properties.env gcr.io/<YOUR_PROJECT_ID>/feeder:latest

To stop your container, find the running container ID with docker ps and run docker stop <container-id>.

Advanced Configuration, Speedup, Scale-Out, Memory Limit

There are a couple of configuration parameters to simulate a high rate of Cloud Pub/Sub message ingestion. The config parameters SPEEDUP, SKIPRIDES, SKIPOFFSET, STARTREFTIME and LOOP help with running a long-running, high-rate message stream.

SPEEDUP speeds up the real-time feed generated from the input dataset by a factor of X.

SKIPRIDES is used to publish only every n-th ride from the input dataset, skipping the rest. This lowers the message rate of a single feeder instance so that the load can be distributed across multiple feeder instances.

SKIPOFFSET shifts the modulo of SKIPRIDES by n. It should be between 0 and SKIPRIDES - 1.

STARTREFTIME is used to synchronize all feeder instances for realistic, scalable ride streaming. The format is 2015-01-04 20:00:00, interpreted in the timezone of the dataset, by default America/New_York. The dataset timezone can be set with DATASETLOCATION.

LOOP is used for long-running ride streaming. It resets the reference time by the same duration on all instances and re-reads the input dataset in a loop.

MAXSCHEDULERS restricts the maximum number of parallel ride schedulers, and thus bounds the memory used for in-memory pending schedulers. Lower values slow down file parsing.

SPEEDUP=<SPEEDUP_FACTOR>
SKIPRIDES=<MODULO_N_TO_SKIP_RIDES_AND_LOWER_QPS>
SKIPOFFSET=<SKIPRIDES_MODULO_OFFSET_FOR_SCALEOUT>
STARTREFTIME=<START_DATASET_REFTIME>
LOOP=[true|false]
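To make the interplay concrete, here is a small sketch in shell arithmetic (the feeder itself is written in Go and its internal scheduling may differ; all numbers below are made up for illustration) of how SPEEDUP compresses delays and how SKIPRIDES/SKIPOFFSET shard rides across instances:

```shell
# SPEEDUP: compress the gap between two dataset timestamps by a factor.
SPEEDUP=60
REAL_GAP_SEC=120                            # gap in the original dataset
EMIT_GAP_SEC=$(( REAL_GAP_SEC / SPEEDUP ))
echo "emit gap: ${EMIT_GAP_SEC}s"           # prints: emit gap: 2s

# SKIPRIDES/SKIPOFFSET: an instance publishes rides where
# ride_idx % SKIPRIDES == SKIPOFFSET, so N instances with offsets
# 0..N-1 cover the whole dataset without overlap.
SKIPRIDES=3
SKIPOFFSET=1
for ride_idx in 0 1 2 3 4 5; do
  if [ $(( ride_idx % SKIPRIDES )) -eq "${SKIPOFFSET}" ]; then
    echo "this instance publishes ride ${ride_idx}"   # rides 1 and 4
  fi
done
```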

There are more options available if you use your own dataset. Please refer to ./feeder --help for details.

Filtering invalid data

The source dataset contains several rides with very long durations. To avoid running out of memory in continuously running feeders, we filter those out with an upper limit of 6 hours. You can find these rides by querying the publicly available NYC Taxi Rides dataset in BigQuery.

#standardSQL
SELECT TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) AS trip_time, *
FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
WHERE TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 360
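The same 6-hour cutoff can be illustrated outside BigQuery; the snippet below uses made-up epoch timestamps (not actual dataset values) to show the check:

```shell
# 6-hour upper limit on ride duration, in seconds.
MAX_TRIP_SEC=$(( 6 * 60 * 60 ))   # 21600

# Made-up pickup/dropoff epoch timestamps for illustration.
pickup=1420290000
dropoff=1420311700                # 21700 s later, i.e. just over 6 h

if [ $(( dropoff - pickup )) -gt "${MAX_TRIP_SEC}" ]; then
  echo "skip ride"                # prints: skip ride
fi
```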
Vendor Packaging

We use govendor (go get -u github.com/kardianos/govendor) as the vendor package manager.

Contributing

Contributions to this repository are always welcome and highly encouraged.

See CONTRIBUTING for more information on how to get started.

License

Apache 2.0 - See LICENSE for more information.

Use: The NYC Taxi & Limousine Commission's dataset is publicly available for anyone to use under the terms provided by the dataset source (https://data.cityofnewyork.us/) and is provided "AS IS" without any warranty, express or implied, from Google. Google disclaims all liability for any damages, direct or indirect, resulting from the use of the dataset.

This is not an official Google product.
