Name: dask-distributed-on-kubernetes
Owner: Hammer Lab
Description: Deploy dask-distributed on google container engine using kubernetes
Created: 2016-07-12 17:59:05.0
Updated: 2017-12-03 18:32:15.0
Pushed: 2016-09-21 22:21:14.0
Size: 1672
Language: Jupyter Notebook
This small repo gives an example Kubernetes configuration for running dask.distributed on Google Container Engine.
If you don't already have a cluster running, use a command like the following to start one (here it is called “daskd-cluster”):
gcloud container clusters create daskd-cluster \
    --zone us-east1-b \
    --num-nodes=2 \
    --enable-autoscaling --min-nodes=1 --max-nodes=100 \
    --machine-type=n1-highmem-16
You should see your cluster: https://console.cloud.google.com/kubernetes/list
Then run this to set it as the default for your session:
gcloud config set container/cluster daskd-cluster
gcloud container clusters get-credentials daskd-cluster
You will want to edit spec.yaml to use the docker image appropriate for your task. You may also want to customize the CPU and memory thresholds requested based on what's required for your task.
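As a sketch of what that customization looks like (the image name, resource numbers, and exact field layout here are illustrative, not copied from the repo's actual spec.yaml), the worker container's resource section in a Kubernetes spec typically looks like:

```yaml
# Illustrative fragment only -- adapt the repo's actual spec.yaml.
containers:
  - name: daskd-worker
    image: gcr.io/your-project/your-image:latest  # hypothetical image for your task
    resources:
      requests:
        cpu: "2"      # CPU the scheduler reserves per worker pod
        memory: 8Gi   # memory reserved per worker pod
```

Kubernetes uses the `requests` values when placing pods on nodes, so they should reflect what one worker actually needs on your n1-highmem-16 machines.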
This will launch a dask.distributed scheduler and one worker:
kubectl create -f spec.yaml
You can check how many workers are running with:
kubectl get pods
Now, scale up the deployment. Here we request 100 workers:
kubectl scale deployment daskd-worker --replicas=100
You can now run kubectl get pods again to check whether the workers have started.
You can check a pod's stdout/stderr with (replace the name with a pod name from kubectl get pods):
kubectl logs daskd-scheduler-3680716393-j19xr
To connect to the cluster, first get the IP of the scheduler (you want the EXTERNAL-IP of daskd-scheduler):
kubectl get service
NAME              CLUSTER-IP    EXTERNAL-IP       PORT(S)    AGE
daskd-scheduler   10.3.249.60   104.196.185.187   8786/TCP   4m
kubernetes        10.3.240.1    <none>            443/TCP    17h
For scripting, here's a one-liner for getting the IP:
DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
When you instantiate your dask Executor, just pass in the IP and port:
from math import sqrt
from dask.distributed import Executor
from dask import delayed

client = Executor("104.196.185.187:8786")
tasks = [delayed(sqrt)(i) for i in range(100)]
results = client.compute(tasks, sync=True)
print(results)
When you're done, shut down the service and cluster:
kubectl delete -f spec.yaml
gcloud container clusters delete daskd-cluster
We also include a simple benchmark script that tests the performance of the cluster with varying numbers of workers (it issues kubectl calls itself to change the number of workers). See the script for details. Here's an example invocation:
DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
python benchmark.py \
--tasks 5000 \
--task-time .05 \
--dask-scheduler $DASK_IP:8786 \
--jobs-range 200 800 200 \
--replicas 1 \
--out results2.csv