Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update XGBoost + Dask overview documentation #5961

Merged
merged 5 commits into from
Jul 31, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 63 additions & 60 deletions doc/tutorials/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ Distributed XGBoost with Dask
#############################

`Dask <https://dask.org>`_ is a parallel computing library built on Python. Dask allows
easy management of distributed workers and excels handling large distributed data science
easy management of distributed workers and excels at handling large distributed data science
workflows. The implementation in XGBoost originates from `dask-xgboost
<https://github.com/dask/dask-xgboost>`_ with some extended functionalities and a
different interface. Right now it is still under construction and may change (with proper
warnings) in the future. The tutorial here focus on basic usage of dask with CPU tree
algorithm. For an overview of GPU based training and internal working, see `A New,
warnings) in the future. The tutorial here focuses on basic usage of dask with CPU tree
algorithms. For an overview of GPU based training and internal workings, see `A New,
Official Dask API for XGBoost
<https://medium.com/rapids-ai/a-new-official-dask-api-for-xgboost-e8b10f3d1eb7>`_.

Expand All @@ -22,25 +22,29 @@ Official Dask API for XGBoost
Requirements
************

Dask is trivial to install using either pip or conda. `See here for official install
documentation <https://docs.dask.org/en/latest/install.html>`_. For accelerating XGBoost
with GPU, `dask-cuda <https://github.com/rapidsai/dask-cuda>`_ is recommended for creating
GPU clusters.
Dask can be installed using either pip or conda (see the dask `installation
documentation <https://docs.dask.org/en/latest/install.html>`_ for more information). For
accelerating XGBoost with GPUs, `dask-cuda <https://github.com/rapidsai/dask-cuda>`_ is
recommended for creating GPU clusters.


********
Overview
********

There are 3 different components in dask from a user's perspective, namely a scheduler,
bunch of workers and some clients connecting to the scheduler. For using XGBoost with
dask, one needs to call XGBoost dask interface from the client side. A small example
illustrates the basic usage:
A dask cluster consists of three different components: a centralized scheduler, one or
more workers, and one or more clients which act as the user-facing entry point for submitting
tasks to the cluster. When using XGBoost with dask, one needs to call the XGBoost dask interface
from the client side. Below is a small example which illustrates basic usage of running XGBoost
on a dask cluster:

.. code-block:: python

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
import xgboost as xgb
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1)
client = dask.distributed.Client(cluster)

dtrain = xgb.dask.DaskDMatrix(client, X, y) # X and y are dask dataframes or arrays

Expand All @@ -50,23 +54,24 @@ illustrates the basic usage:
dtrain,
num_boost_round=4, evals=[(dtrain, 'train')])

Here we first create a cluster in single-node mode wtih ``distributed.LocalCluster``, then
connect a ``client`` to this cluster, setting up environment for later computation.
Similar to non-distributed interface, we create a ``DMatrix`` object and pass it to
``train`` along with some other parameters. Except in dask interface, client is an extra
argument for carrying out the computation, when set to ``None`` XGBoost will use the
default client returned from dask.
Here we first create a cluster in single-node mode with ``dask.distributed.LocalCluster``, then
connect a ``dask.distributed.Client`` to this cluster, setting up an environment for later computation.

We then create a ``DMatrix`` object and pass it to ``train``, along with some other parameters,
much like XGBoost's normal, non-dask interface. The primary difference with XGBoost's dask interface is
we pass our dask client as an additional argument for carrying out the computation. Note that if
client is set to ``None``, XGBoost will use the default client returned by dask.

There are two sets of APIs implemented in XGBoost. The first set is functional API
illustrated in above example. Given the data and a set of parameters, `train` function
returns a model and the computation history as Python dictionary
illustrated in above example. Given the data and a set of parameters, the ``train`` function
returns a model and the computation history as a Python dictionary:

.. code-block:: python

{'booster': Booster,
'history': dict}

For prediction, pass the ``output`` returned by ``train`` into ``xgb.dask.predict``
For prediction, pass the ``output`` returned by ``train`` into ``xgb.dask.predict``:

.. code-block:: python

Expand All @@ -80,9 +85,8 @@ Or equivalently, pass ``output['booster']``:

Here ``prediction`` is a dask ``Array`` object containing predictions from model.

Another set of API is a Scikit-Learn wrapper, which mimics the stateful Scikit-Learn
interface with ``DaskXGBClassifier`` and ``DaskXGBRegressor``. See ``xgboost/demo/dask``
for more examples.
Alternatively, XGBoost also implements the Scikit-Learn interface with ``DaskXGBClassifier``
and ``DaskXGBRegressor``. See ``xgboost/demo/dask`` for more examples.

*******
Threads
Expand All @@ -94,7 +98,7 @@ will override the configuration in Dask. For example:

.. code-block:: python

with LocalCluster(n_workers=7, threads_per_worker=4) as cluster:
with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:

There are 4 threads allocated for each dask worker. Then by default XGBoost will use 4
threads in each process for both training and prediction. But if ``nthread`` parameter is
Expand All @@ -117,21 +121,21 @@ Working with asyncio

.. versionadded:: 1.2.0

XGBoost dask interface supports the new ``asyncio`` in Python and can be integrated into
XGBoost's dask interface supports the new ``asyncio`` in Python and can be integrated into
asynchronous workflows. For using dask with asynchronous operations, please refer to
`dask example <https://examples.dask.org/applications/async-await.html>`_ and document in
`distributed <https://distributed.dask.org/en/latest/asynchronous.html>`_. As XGBoost
takes ``Client`` object as an argument for both training and prediction, so when
``asynchronous=True`` is specified when creating ``Client``, the dask interface can adapt
the change accordingly. All functions provided by the functional interface returns a
coroutine when called in async function, and hence require awaiting to get the result,
including ``DaskDMatrix``.
`this dask example <https://examples.dask.org/applications/async-await.html>`_ and document in
`distributed <https://distributed.dask.org/en/latest/asynchronous.html>`_. To use XGBoost's
dask interface asynchronously, the ``client`` which is passed as an argument for training and
prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when the
``client`` is created (example below). All functions (including ``DaskDMatrix``) provided
by the functional interface will then return coroutines which can then be awaited to retrieve
their result.

Functional interface:

.. code-block:: python

async with Client(scheduler_address, asynchronous=True) as client:
async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
X, y = generate_array()
m = await xgb.dask.DaskDMatrix(client, X, y)
output = await xgb.dask.train(client, {}, dtrain=m)
Expand All @@ -144,13 +148,13 @@ Functional interface:
print(await client.compute(with_m))


While for Scikit Learn interface, trivial methods like ``set_params`` and accessing class
While for the Scikit-Learn interface, trivial methods like ``set_params`` and accessing class
attributes like ``evals_result_`` do not require ``await``. Other methods involving
actual computation will return a coroutine and hence require awaiting:

.. code-block:: python

async with Client(scheduler_address, asynchronous=True) as client:
async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
X, y = generate_array()
regressor = await xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
regressor.set_params(tree_method='hist') # trivial method, synchronous operation
Expand All @@ -169,39 +173,38 @@ return 2 workers.
Why is the initialization of ``DaskDMatrix`` so slow and throws weird errors
*****************************************************************************

The dask API in XGBoost requires construction of ``DaskDMatrix``. With ``Scikit-Learn``
interface, ``DaskDMatrix`` is implicitly constructed for each input data during `fit` or
`predict`. You might have observed its construction is taking incredible amount of time,
and sometimes throws error that doesn't seem to be relevant to `DaskDMatrix`. Here is a
brief explanation for why. By default most of dask's computation is `lazy
The dask API in XGBoost requires construction of ``DaskDMatrix``. With the Scikit-Learn
interface, ``DaskDMatrix`` is implicitly constructed for all input data during the ``fit`` or
``predict`` steps. You might have observed that ``DaskDMatrix`` construction can take large amounts of time,
and sometimes throws errors that don't seem to be relevant to ``DaskDMatrix``. Here is a
brief explanation for why. By default most dask computations are `lazily evaluated
<https://docs.dask.org/en/latest/user-interfaces.html#laziness-and-computing>`_, which
means the computation is not carried out until you explicitly ask for result, either by
calling `compute()` or `wait()`. See above link for details in dask, and `this wiki
<https://en.wikipedia.org/wiki/Lazy_evaluation>`_ for general concept of lazy evaluation.
The `DaskDMatrix` constructor forces all lazy computation to materialize, which means it's
means that computation is not carried out until you explicitly ask for a result by, for example,
calling ``compute()``. See the previous link for details in dask, and `this wiki
<https://en.wikipedia.org/wiki/Lazy_evaluation>`_ for information on the general concept of lazy evaluation.
The ``DaskDMatrix`` constructor forces lazy computations to be evaluated, which means it's
where all your earlier computation actually being carried out, including operations like
`dd.read_csv()`. To isolate the computation in `DaskDMatrix` from other lazy
computations, one can explicitly wait for results of input data before calling constructor
of `DaskDMatrix`. Also dask's `web interface
<https://distributed.dask.org/en/latest/web.html>`_ can be used to monitor what operations
are currently being performed.
``dd.read_csv()``. To isolate the computation in ``DaskDMatrix`` from other lazy
computations, one can explicitly wait for results of input data before constructing a ``DaskDMatrix``.
Also dask's `diagnostics dashboard <https://distributed.dask.org/en/latest/web.html>`_ can be used to
monitor what operations are currently being performed.

***********
Limitations
***********

Basic functionalities including training and generating predictions for regression and
classification are implemented. But there are still some other limitations we haven't
addressed yet.
Basic functionality including model training and generating classification and regression predictions
have been implemented. However, there are still some other limitations we haven't
addressed yet:

- Label encoding for Scikit-Learn classifier may not be supported. Meaning that user need
- Label encoding for the ``DaskXGBClassifier`` classifier may not be supported. So users need
to encode their training labels into discrete values first.
- Ranking is not supported right now.
- Ranking is not yet supported.
- Empty worker is not well supported by classifier. If the training hangs for classifier
with a warning about empty DMatrix, please consider balancing your data first. But
regressor works fine with empty DMatrix.
- Callback functions are not tested.
- Only ``GridSearchCV`` from ``scikit-learn`` is supported for dask interface. Meaning
that we can distribute data among workers but have to train one model at a time. If you
want to scale up grid searching with model parallelism by ``dask-ml``, please consider
using normal ``scikit-learn`` interface like `xgboost.XGBRegressor` for now.
- Only ``GridSearchCV`` from Scikit-Learn is supported. Meaning that we can distribute data
among workers but have to train one model at a time. If you want to scale up grid searching with
model parallelism with `Dask-ML <https://ml.dask.org/>`_, please consider using XGBoost's non-dask
Scikit-Learn interface, for example ``xgboost.XGBRegressor``, for now.