dask/dask-gke

Name: dask-gke

Owner: dask

Description: kubernetes setup to bootstrap distributed on google container engine

Created: 2017-03-23 18:58:38.0

Updated: 2018-04-20 02:46:27.0

Pushed: 2018-03-05 21:49:14.0

Homepage: null

Size: 689

Language: Python


README

Google Container Engine provisioning of a Dask Distributed cluster

Historical Note: this repository is somewhat old and not actively maintained. For more general documentation on deploying Dask on Kubernetes or cloud clusters, we recommend referring to the Dask setup documentation.

This repo hosts some sample configuration to set up Kubernetes containerized environments for interactive cluster computing in Python with Jupyter notebook, Dask, and other tools from the PyData and SciPy ecosystems.

The Kubernetes API is provided as a managed service by the following public cloud providers:

Alternatively, it is possible to install and manage Kubernetes yourself.

We will briefly describe usage assuming Google Container Engine (GKE).

The dask-gke image

The Dockerfile in this repo can be used to build a Docker image with all the necessary tools to run our cluster, in particular:

This image will be used to run 3 types of services:

Setup with Google Container Engine

You will need to install the following:

Register on the Google Cloud Platform, set up a billing account, and create a project with the Google Compute Engine API enabled.

Ensure that your client SDK is up to date:

gcloud components update

Install the dask-gke CLI via:

python setup.py install
Usage

Default settings for the cluster are stored in defaults.yaml.

The easiest way to customize the cluster to your own purposes is to make a copy of this file, edit it, and supply it on the command line. The settings used for a new cluster are a combination of the built-in settings, any new values in a supplied file, and command-line options.

To launch with default values only (where NAME is the label for the cluster):

dask-gke create NAME

To launch with a provided file:

dask-gke create NAME settings.yaml

To launch with a single override parameter:

dask-gke create -s jupyter.port=443 NAME

By default, the process will block until done, and then print details about the created cluster to the screen, including the addresses of the dask-scheduler, the jupyter notebook, and the Bokeh status monitor. This same information can be retrieved again with the info command. Most users will want to navigate to the notebook first, which can also be achieved by calling

dask-gke notebook NAME

and similarly, the status command opens the cluster status page, or lab brings up the new “jupyterlab” IDE.

From within the cluster, you can connect to the distributed scheduler by doing the following:

from dask.distributed import Client
c = Client('dask-scheduler:8786')
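Once connected, ordinary Dask computations are executed on the cluster's worker pods. Here is a minimal sketch; the dask.array computation is illustrative only and not part of the original examples:

import dask.array as da
from dask.distributed import Client

# connect to the scheduler service from inside the cluster
c = Client('dask-scheduler:8786')

# a small example computation; the chunks are spread across the worker pods
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())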

When you are done, delete the cluster with the following:

dask-gke delete NAME

Note that this asks for confirmation, potentially multiple times; you might wish to prepend yes | (bash syntax) for automatic confirmation.

Extras

dask-gke works by calling kubectl. For those who want finer control or wish to investigate the state of the cluster, kubectl commands can be entered on the command line as for any other Kubernetes cluster. Furthermore, the Kubernetes dashboard is available using

dask-gke dashboard NAME

(Note that, unlike the other commands, which open browser tabs, this command blocks on the command line, since it needs to maintain a proxy connection.)

Resize cluster

The Dask workers live within containers on Google virtual machines. To get more processing power and memory, you must increase both the number of machines and the number of containers.

To add machines to the cluster, you may do the following:

dask-gke resize nodes NAME COUNT

(of course, the more machines, the higher the bill will be)

To add worker containers, you may do the following:

dask-gke resize pods NAME COUNT

or resize both while keeping the workers:nodes ratio constant:

dask-gke resize both NAME COUNT

(here COUNT gives the new number of workers requested).

Note that if you allocate more resources than your cluster can handle, some pods will not start.

To see the state of the worker pods, use kubectl or the Kubernetes dashboard.

Node Autoscaling

Kubernetes can automatically add nodes to or remove nodes from your cluster if you create the cluster with autoscaling enabled. Nodes will be added if worker pods can't be scheduled on the existing cluster, and removed if nodes are going unused.

Note that autoscaling affects the number of machines in the cluster (and consequently the cost of the cluster!), not the number of Dask workers, and must be turned on when the cluster is created.

To enable autoscaling, change the appropriate line in defaults.yaml or run:

dask-gke create NAME -s cluster.autoscaling=True -s cluster.min_nodes=MIN -s cluster.max_nodes=MAX
Logs

We can get the logs of a specific pod with kubectl logs:

kubectl logs -f dask-scheduler-hebul
distributed.scheduler - INFO - Scheduler at:       10.115.249.189:8786
distributed.scheduler - INFO -      http at:       10.115.249.189:9786
distributed.scheduler - INFO -  Bokeh UI at:  http://10.115.249.189:8787/status/
distributed.core - INFO - Connection from 10.112.2.3:50873 to Scheduler
distributed.scheduler - INFO - Register 10.112.2.3:59918
distributed.scheduler - INFO - Starting worker compute stream, 10.112.2.3:59918
distributed.core - INFO - Connection from 10.112.0.6:55149 to Scheduler
distributed.scheduler - INFO - Register 10.112.0.6:55103
distributed.scheduler - INFO - Starting worker compute stream, 10.112.0.6:55103
bokeh.command.subcommands.serve - INFO - Check for unused sessions every 50 milliseconds
bokeh.command.subcommands.serve - INFO - Unused sessions last for 1 milliseconds
bokeh.command.subcommands.serve - INFO - Starting Bokeh server on port 8787 with applications at paths ['/status', '/tasks']
distributed.core - INFO - Connection from 10.112.1.1:59452 to Scheduler
distributed.core - INFO - Connection from 10.112.1.1:59453 to Scheduler
distributed.core - INFO - Connection from 10.112.1.4:48952 to Scheduler
distributed.scheduler - INFO - Register 10.112.1.4:54760
distributed.scheduler - INFO - Starting worker compute stream, 10.112.1.4:54760
Run commands

We can also execute arbitrary commands inside the running containers with kubectl exec, for instance to open an interactive shell session for debugging purposes:

kubectl exec -ti dask-scheduler-hebul bash
root@dscheduler-hebul:/work# ls -l examples/
total 56
-rw-r--r-- 1 basicuser root  1344 May 17 11:29 distributed_joblib_backend.py
-rw-r--r-- 1 basicuser root 33712 May 17 11:29 sklearn_parameter_search.ipynb
-rw-r--r-- 1 basicuser root 14407 May 17 11:29 sklearn_parameter_search_joblib.ipynb

where “dask-scheduler-hebul” is the specific pod name of the scheduler.

It is, of course, also possible to run shell commands directly in the Jupyter notebook, or to use Python's subprocess module with Dask's Client.run to programmatically call commands on the worker containers.
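As an illustration of that pattern, here is a minimal sketch; the hostname command and the helper function are hypothetical examples, not taken from this repo:

from subprocess import check_output
from dask.distributed import Client

c = Client('dask-scheduler:8786')

def worker_hostname():
    # run a shell command inside the worker container and capture its output
    return check_output(['hostname']).decode().strip()

# Client.run executes the function on every worker and returns a dict
# mapping worker address -> result
print(c.run(worker_hostname))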

Alternate docker image

Each type of pod in dask-gke is currently based on the Docker image mdurant/dask-gke:latest. The Dockerfile is included in this repo. Users may wish to alter, in particular, the conda/pip installations in the middle of the workflow.

There are two ways to apply changes made to a dask cluster:

To create a new password for the Jupyter interface, execute the following locally, using a Jupyter of similar version to the Dockerfile (currently 4.2):

In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: '...'

and place the created output string into config/jupyter_notebook_config.py before rebuilding the docker image.
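For reference, the relevant setting in jupyter_notebook_config.py looks roughly like the following; the hash is a placeholder for the string returned by passwd(), not a real value:

# config/jupyter_notebook_config.py
c = get_config()
c.NotebookApp.password = u'sha1:<output of passwd()>'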

History

The original work was completed by @ogrisel.

