hammerlab/dask-distributed-on-kubernetes

Name: dask-distributed-on-kubernetes

Owner: Hammer Lab

Description: Deploy dask-distributed on google container engine using kubernetes

Created: 2016-07-12 17:59:05.0

Updated: 2017-12-03 18:32:15.0

Pushed: 2016-09-21 22:21:14.0

Homepage:

Size: 1672

Language: Jupyter Notebook

README

Running on Kubernetes on Google Container Engine

This small repo gives an example Kubernetes configuration for running dask.distributed on Google Container Engine.

Start a cluster if needed

If you don't already have a cluster running, use a command like the following to start one (here it is called “daskd-cluster”):

gcloud container clusters create daskd-cluster \
--zone us-east1-b \
--num-nodes=2 \
--enable-autoscaling --min-nodes=1 --max-nodes=100 \
--machine-type=n1-highmem-16

You should see your cluster: https://console.cloud.google.com/kubernetes/list

Then run this to set it as the default for your session:

gcloud config set container/cluster daskd-cluster
gcloud container clusters get-credentials daskd-cluster

Deploy dask.distributed

You will want to edit spec.yaml to use the Docker image appropriate for your task. You may also want to customize the CPU and memory requests to match what your task requires.
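As a rough illustration, the part of spec.yaml you would customize might look like the following. This is an illustrative fragment only (the image name and resource values are placeholders, not the repo's actual file):

```yaml
# Illustrative worker container fragment; image and values are
# placeholders -- check spec.yaml in the repo for the real ones.
containers:
  - name: daskd-worker
    image: your-docker-image        # replace with the image for your task
    resources:
      requests:
        cpu: "2"                    # raise or lower for your workload
        memory: 6Gi
```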

This will launch a dask.distributed scheduler and one worker:

kubectl create -f spec.yaml

You can check how many workers are running with:

kubectl get pods

Now, scale up the deployment. Here we request 100 workers:

kubectl scale deployment daskd-worker --replicas=100

You can now run kubectl get pods again to check when the workers are started.

You can check a pod's stdout/stderr with (replace the name with a pod name from kubectl get pods):

kubectl logs daskd-scheduler-3680716393-j19xr

Run your analysis

First, get the IP of the scheduler (you want the external ip of daskd-scheduler):

kubectl get service
NAME              CLUSTER-IP    EXTERNAL-IP       PORT(S)    AGE
daskd-scheduler   10.3.249.60   104.196.185.187   8786/TCP   4m
kubernetes        10.3.240.1    <none>            443/TCP    17h

For scripting, here's a one-liner for getting the IP:

DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
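If you prefer doing this from Python, the same EXTERNAL-IP field can be extracted with a small helper. This is a sketch that assumes the default `kubectl get service` column layout shown above (service name first, EXTERNAL-IP third):

```python
import subprocess

def parse_external_ip(output, service):
    # Scan `kubectl get service` output for the named service and
    # return the third whitespace-separated column (EXTERNAL-IP).
    for line in output.splitlines():
        fields = line.split()
        if fields and fields[0] == service:
            return fields[2]
    raise ValueError("service %r not found" % service)

def scheduler_ip(service="daskd-scheduler"):
    # Shell out to kubectl and parse its table output.
    out = subprocess.check_output(["kubectl", "get", "service"], text=True)
    return parse_external_ip(out, service)
```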

When you instantiate your dask Executor, just pass in the IP and port:

from math import sqrt
from dask.distributed import Executor
from dask import delayed

client = Executor("104.196.185.187:8786")
tasks = [delayed(sqrt)(i) for i in range(100)]
results = client.compute(tasks, sync=True)
print(results)

Tearing it down

When you're done, shut down the service and cluster:

kubectl delete -f spec.yaml
gcloud container clusters delete daskd-cluster

Running a benchmark

We also include a simple benchmark script that will test performance of the cluster with varying numbers of workers (it issues kubectl calls itself to change the number of workers). See the script for details. Here's an example invocation:

DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
python benchmark.py \
--tasks 5000 \
--task-time .05 \
--dask-scheduler $DASK_IP:8786 \
--jobs-range 200 800 200 \
--replicas 1 \
--out results2.csv
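The loop the description implies (resize the worker deployment via kubectl, then time a batch of fixed-duration tasks at each size) can be sketched roughly as follows. This is a hypothetical reconstruction, not the repo's actual benchmark.py; the function names and the use of `kubectl scale` are assumptions based on the text above:

```python
import subprocess
import time

def jobs_range(start, stop, step):
    # Expand a --jobs-range START STOP STEP triple into worker counts,
    # inclusive of STOP (assumed; e.g. 200 800 200 -> [200, 400, 600, 800]).
    return list(range(start, stop + 1, step))

def scale_workers(replicas, deployment="daskd-worker"):
    # The benchmark issues kubectl calls itself to change worker count.
    subprocess.check_call(
        ["kubectl", "scale", "deployment", deployment,
         "--replicas=%d" % replicas])

def run_benchmark(scheduler_addr, num_tasks, task_time, counts):
    # For each worker count: scale the deployment, run num_tasks sleep
    # tasks of task_time seconds on the cluster, and record wall time.
    timings = []
    for n in counts:
        scale_workers(n)
        start = time.time()
        # ... submit the tasks to scheduler_addr via dask.distributed
        # and block until they complete ...
        timings.append((n, time.time() - start))
    return timings
```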

This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.