pinterest/pinlater

Name: pinlater

Owner: Pinterest

Description: PinLater is a Thrift service to manage scheduling and execution of asynchronous jobs.

Created: 2015-10-27 22:12:27.0

Updated: 2018-05-11 07:51:16.0

Pushed: 2017-05-12 22:33:26.0

Homepage: https://engineering.pinterest.com/blog/open-sourcing-pinlater-asynchronous-job-execution-system

Size: 142

Language: Java

README

Note: This project is no longer actively maintained by Pinterest.


Pinlater

PinLater is a Thrift service to manage scheduling and execution of asynchronous jobs.

Key features
QuickStart
Build
mvn clean compile
Create and install jars
mvn clean package
mkdir ${PINLATER_INSTALL_DIR}
tar -zxvf target/pinlater-0.1-SNAPSHOT-bin.tar.gz -C ${PINLATER_INSTALL_DIR}
Run Server Locally with MySQL backend
cd ${PINLATER_INSTALL_DIR}
# Make sure you have a MySQL instance running locally
java -server -cp .:./*:./lib/* -Dserver_config=pinlater.local.properties -Dbackend_config=mysql.local.json -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.PinLaterServer
Run Server Locally with Redis backend
# Make sure you have a Redis instance running locally
cd ${PINLATER_INSTALL_DIR}
java -server -cp .:./*:./lib/* -Dserver_config=pinlater.redis.local.properties -Dbackend_config=redis.local.json -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.PinLaterServer
Client Tool

PinLater includes a client tool for correctness and performance testing.

Create Queue
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode create --queue test_queue
Enqueue jobs
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode enqueue --queue test_queue --num_queries -1 --batch_size 1 --concurrency 5
Dequeue/Ack jobs
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode dequeue --queue test_queue --num_queries -1 --batch_size 50 --concurrency 5 --dequeue_success_percent 95
Look up job
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode lookup --queue test_queue --job_descriptor test_queue:s1d1:p1:1

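A PinLater job descriptor is made up of the fields [queue_name], [shard_id], [priority] and [local_id]. In the example above (test_queue:s1d1:p1:1) the fields are separated by colons, with the shard and priority fields prefixed by s and p.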
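As a minimal sketch, assuming the colon-separated layout seen in the lookup example (test_queue:s1d1:p1:1, with an s prefix on the shard field and a p prefix on the priority field), a descriptor could be split into its fields as follows. The class JobDescriptorParts and its member names are hypothetical and are not part of the PinLater client. The service itself may parse descriptors differently.

```java
/** Hypothetical holder for the four fields of a job descriptor string. */
final class JobDescriptorParts {
    final String queueName;
    final String shardId;
    final int priority;
    final long localId;

    private JobDescriptorParts(String queueName, String shardId, int priority, long localId) {
        this.queueName = queueName;
        this.shardId = shardId;
        this.priority = priority;
        this.localId = localId;
    }

    /**
     * Splits a descriptor such as "test_queue:s1d1:p1:1".
     * Assumption: exactly four colon-separated fields, the second starting
     * with 's' and the third with 'p', as in the README example.
     */
    static JobDescriptorParts parse(String descriptor) {
        String[] parts = descriptor.split(":");
        if (parts.length != 4 || !parts[1].startsWith("s") || !parts[2].startsWith("p")) {
            throw new IllegalArgumentException("Unexpected job descriptor: " + descriptor);
        }
        return new JobDescriptorParts(
            parts[0],
            parts[1].substring(1),                     // drop the 's' prefix
            Integer.parseInt(parts[2].substring(1)),   // drop the 'p' prefix
            Long.parseLong(parts[3]));
    }
}
```

With this sketch, JobDescriptorParts.parse("test_queue:s1d1:p1:1") yields queue name test_queue, shard id 1d1, priority 1 and local id 1.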

Get queue names
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode get_queue_names
Get job count
java -cp .:./*:./lib/* -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.client.PinLaterClientTool --host localhost --port 9010 --mode get_job_count --queue test_queue --job_state 0 --priority 1 --count_future_jobs false

The job state can be 0, 1, 2, or 3, corresponding to pending, running, succeeded, and failed, respectively.
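In client code, the numeric states listed above could be wrapped in a small enum so that call sites do not pass bare integers. This is an illustrative sketch only: JobStateCode is a hypothetical name, and the Thrift-generated types in the PinLater client may already provide an equivalent.

```java
/** Hypothetical mapping of the numeric --job_state values to names. */
enum JobStateCode {
    PENDING(0),
    RUNNING(1),
    SUCCEEDED(2),
    FAILED(3);

    final int code;

    JobStateCode(int code) {
        this.code = code;
    }

    /** Returns the state for a numeric code, or throws if the code is unknown. */
    static JobStateCode fromCode(int code) {
        for (JobStateCode state : values()) {
            if (state.code == code) {
                return state;
            }
        }
        throw new IllegalArgumentException("Unknown job state: " + code);
    }
}
```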

Customizing your setup

PinLater comes with sample run scripts, properties files and backend configs which can be customized to your setup. Each run script (run_server_local_mysql.sh, run_server_local_redis.sh) contains environment variables which you may modify according to your setup. It's highly likely that you will need to modify the following variables:

Before running the PinLater service, you'll need to modify the backend JSON configs to point to the desired backend storage. Note that the MySQL and Redis implementations use config files with different JSON schemas. Check out mysql.local.json and redis.local.json for examples. You will also need to modify the log4j properties file to point to the desired directories for the application logs of the controller, server and Thrift server respectively.

There are several important properties that you might also want to modify:

# Timeout for preventing jobs from running forever. Timed-out jobs will be
# retried or marked as failed depending on the number of attempts remaining.
BACKEND_MONITOR_JOB_CLAIMED_TIMEOUT_SECONDS=600
# TTL of failed jobs before they are cleaned up
BACKEND_MONITOR_JOB_FAILED_GC_TTL_HOURS=168
# TTL of succeeded jobs before they are cleaned up
BACKEND_MONITOR_JOB_SUCCEEDED_GC_TTL_HOURS=24

To start and stop the service:

# Start the PinLater service with MySQL
scripts/run_server_local_mysql.sh start

# Stop the PinLater service with MySQL
scripts/run_server_local_mysql.sh stop

# Start the PinLater service with Redis
scripts/run_server_local_redis.sh start

# Stop the PinLater service with Redis
scripts/run_server_local_redis.sh stop
Usage
Build a worker

A PinLater worker is responsible for continuously dequeuing jobs, executing them, and then replying to the PinLater server with a positive or negative ACK, depending on whether the execution succeeded or failed. PinLater allows the worker to optionally specify a retry delay when a job fails. This can be used to implement arbitrary retry policies per job, e.g. constant delay retry, exponential backoff, or a combination thereof.
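The retry policies mentioned above can be sketched as pure functions from the attempt number to a delay. This is only an illustration: RetryDelays and its parameter names are hypothetical, and how the computed delay is sent back with the negative ACK depends on the Thrift interface, which is not shown here.

```java
/** Hypothetical helpers that compute a retry delay for a failed job. */
final class RetryDelays {
    private RetryDelays() {}

    /** Constant delay: the same wait after every failed attempt. */
    static long constantDelayMillis(long baseMillis) {
        return baseMillis;
    }

    /**
     * Exponential backoff: baseMillis doubled for each failure after the
     * first, capped at maxMillis. attempt is 1 for the first failure.
     */
    static long exponentialBackoffMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1 || baseMillis < 0 || maxMillis < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and delays >= 0");
        }
        long delay = Math.min(baseMillis, maxMillis);
        for (int i = 1; i < attempt; i++) {
            if (delay > maxMillis / 2) {
                return maxMillis;  // doubling would exceed the cap
            }
            delay *= 2;
        }
        return delay;
    }
}
```

Checking against the cap before doubling keeps the arithmetic from overflowing a long, even for large attempt counts. A combined policy could, for example, use the constant delay for the first few attempts and switch to the exponential one afterwards.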

We provide an example worker as a reference implementation that handles all of the above. It executes an example PinLater job defined in PinLaterExampleJob.java. For more detail, check out PinLaterExampleWorker.java. To run the example worker:

cd ${PINLATER_INSTALL_DIR}
# Use the client tool to create test_queue and enqueue jobs
java -server -cp .:./*:./lib/* -Dserverset_path=discovery.pinlater.local -Dlog4j.configuration=log4j.local.properties com.pinterest.pinlater.example.PinLaterExampleWorker

We also provide a Java client, which is used by the example worker and the client tool. Clients can also be implemented in any Thrift-supported language using the Thrift interface. A job is language agnostic: from the PinLater service's perspective, its body is just a sequence of bytes. Jobs can therefore be enqueued and dequeued by clients written in different languages.

Dequeue rate limiting

PinLater provides per-queue rate limiting that allows an operator to limit the dequeue rate on any queue in the system, or even stop dequeues completely, which can help alleviate load quickly on a struggling backend system, or prevent a slow job from affecting other jobs.

Rate limiting is configured via a JSON config file, which PinLater automatically reloads whenever it changes. Here is an example queue config file:


{
    "queues": [
        {
            "name": "pinlater_test_queue",
            "queueConfig": {
                "maxJobsPerSecond": 100
            }
        },
        {
            "name": "pinlater_test_slow_queue",
            "queueConfig": {
                "maxJobsPerSecond": 0.1
            }
        },
        {
            "name": "pinlater_test_paused_queue",
            "queueConfig": {
                "maxJobsPerSecond": 0
            }
        }
    ]
}

Rate limiting also depends on ServerSet to determine how many PinLater servers are active and to compute the per-server rate limit. We use a file-based ServerSet implementation as an example, which reads a local file that contains one [HOSTNAME]:[PORT] pair per line. ServerSet can be configured by setting SERVER_SET_ENABLED and SERVER_SET_PATH. If ServerSet is disabled, rate limits will be applied per server.
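To make the per-server computation concrete, here is a minimal sketch. It assumes that the queue-wide maxJobsPerSecond is divided evenly among the active servers when ServerSet is enabled, which is a plausible reading of the paragraph above but is not confirmed by it. PerServerRateLimit and its parameters are hypothetical names.

```java
/** Hypothetical calculation of the rate limit each server enforces. */
final class PerServerRateLimit {
    private PerServerRateLimit() {}

    /**
     * Assumption: with ServerSet enabled, the queue-wide limit is split
     * evenly across active servers. With ServerSet disabled, every server
     * applies the configured limit on its own.
     */
    static double compute(double maxJobsPerSecond, int activeServers, boolean serverSetEnabled) {
        if (maxJobsPerSecond < 0) {
            throw new IllegalArgumentException("maxJobsPerSecond must be >= 0");
        }
        if (!serverSetEnabled) {
            return maxJobsPerSecond;
        }
        if (activeServers < 1) {
            throw new IllegalArgumentException("activeServers must be >= 1");
        }
        return maxJobsPerSecond / activeServers;
    }
}
```

Under this assumption, a queue configured with maxJobsPerSecond of 100 and four active servers would be limited to 25 dequeued jobs per second on each server, while a limit of 0 pauses the queue regardless of the server count.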

Storage backends

PinLater has two implementations: one built on top of Redis and one built on top of MySQL. In general, services should default to the MySQL backend as long as the QPS is in the low to mid range (no more than 1000 QPS per shard). If the QPS is expected to be higher than this, the Redis implementation should be used. The main advantage of MySQL over Redis is that far more space is available to store jobs; Redis-backed services risk data loss if a backlog of pending jobs is not attended to within a few hours. Both the MySQL and Redis implementations use a JSON file for backend configuration. (Note: the MySQL backend supports automatic reload of the backend configuration, which can help implement features like automatic failover. This is not yet implemented in the Redis backend.)

PinLater also supports a dequeue-only mode, in which a shard serves dequeue requests but accepts no new jobs. This can help when dealing with a struggling backend system, or when draining a shard before downsizing the cluster. Check out mysql.local.json and redis.local.json for examples.

Monitoring

PinLater provides a set of APIs through the Thrift interface for monitoring queue status and managing jobs, including getQueueNames, getJobCount, lookupJob and scanJobs. For more details, please check out the Thrift interface.

You can also retrieve detailed metrics by running curl localhost:9999/stats.txt on the server. These metrics are exported using Twitter's Ostrich library and are easy to parse. The port can be modified by setting the ostrich_metrics_port property.

Design

If you are interested in the detailed design, check out our blog post about PinLater.

