
joblib+dask: Cooperative scattering between concurrent dispatching coroutines #1133

Merged
merged 5 commits into joblib:master on Dec 18, 2020

Conversation

@pierreglaser (Contributor) commented on Nov 17, 2020

The coroutines in charge of scattering and submitting tasks are not aware of other in-flight async scattering operations until those complete, which can lead to duplicated work (if the same argument is dispatched in different joblib batches), at least during the pre_dispatch phase, if not beyond.

This PR fixes this by enabling "cooperative" scattering through asyncio.Task objects.
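
To illustrate the idea (a minimal sketch only, not the actual joblib/_dask.py code; ScatterCache is a hypothetical helper name): the first coroutine that needs a given argument creates an asyncio.Task wrapping the client.scatter call and caches it, so any concurrent dispatching coroutine that needs the same argument awaits that in-flight task instead of scattering the data a second time.

import asyncio


class ScatterCache:
    """Hypothetical helper sketching cooperative scattering.

    The first coroutine requesting a given object creates an asyncio.Task
    for the client.scatter call and caches it; concurrent coroutines await
    that same task instead of issuing a duplicate scatter.
    """

    def __init__(self, client):
        self.client = client
        self._scatter_tasks = {}  # id(obj) -> asyncio.Task

    async def to_future(self, obj):
        key = id(obj)
        task = self._scatter_tasks.get(key)
        if task is None:
            # Only the first caller actually scatters; the Task is shared,
            # and awaiting an already-finished Task just returns its result.
            task = asyncio.ensure_future(
                self.client.scatter([obj], asynchronous=True, hash=False)
            )
            self._scatter_tasks[key] = task
        [future] = await task  # every caller gets the same distributed.Future
        return future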

Simple benchmark on my machine:

import logging
import time

import numpy as np

from distributed import LocalCluster, Client
from joblib import Parallel, delayed, parallel_backend


def my_sum(array):
    return np.sum(array)


if __name__ == "__main__":

    cluster = LocalCluster(
        n_workers=2, threads_per_worker=1, silence_logs=logging.WARNING
    )
    client = Client(cluster)

    # warm up the workers
    client.scatter([1], broadcast=True)
    time.sleep(1)
    print("OK")

    t0 = time.time()

    my_array = np.ones((200000000,), dtype=np.int8)
    my_arrays = 10 * [my_array]

    with parallel_backend("dask"):
        t0 = time.time()
        results = Parallel(pre_dispatch="all")(
            delayed(my_sum)(a) for a in my_arrays
        )
    print('time: {:.3f}'.format(time.time() - t0))
  • master: time: 4.789
  • this PR: time: 2.378

Still WIP, needs more benchmarks.

@codecov (bot) commented on Nov 18, 2020

Codecov Report

Merging #1133 (ddcfeb0) into master (311d8be) will decrease coverage by 0.06%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master    #1133      +/-   ##
==========================================
- Coverage   94.52%   94.46%   -0.07%     
==========================================
  Files          47       47              
  Lines        6960     6937      -23     
==========================================
- Hits         6579     6553      -26     
- Misses        381      384       +3     
Impacted Files Coverage Δ
joblib/_dask.py 94.08% <100.00%> (-0.44%) ⬇️
joblib/test/test_dask.py 98.88% <100.00%> (+<0.01%) ⬆️
joblib/test/test_store_backends.py 91.42% <0.00%> (-5.72%) ⬇️
joblib/test/test_hashing.py 98.95% <0.00%> (-0.13%) ⬇️
joblib/hashing.py 91.15% <0.00%> (-0.08%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@pierreglaser (Contributor, Author) commented:

(Cc @ogrisel just in case)

@ogrisel (Contributor) commented on Dec 4, 2020

Thanks for the PR. I tried it on the following benchmark script:

https://gist.github.com/ogrisel/955234a13d9d6bb9bf35e2b2fe65e296

This script works on master but fails on this branch with the following traceback:

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x10ad51dc0>, batch_of__fit_and_score_1_calls, [], {'tasks': [(<sklearn.utils.fixes._FuncWrapper object at 0x119825a00>, [HistGradientBoostingRegressor(max_iter=30, max_leaf_nodes=7,
                              n_iter_no_change=100), [array([[-0.67885287, -1.05965934, -0.60467059, ...,  2.47452688,
        -0.09351459,  0.51911364],
       [ 1.16726154, -0.04240112,  0.06380132, ...,  0.00915058,
         1.82090361,  0.22041717],
       [-1.25323969,  0.65264794, -0.01676085, ...,  1.88854758,
         0.92526669, -0.3215536 ],
       ...,
       [-0.35525905, -0.46243697,  0.26059262, ..., -2.42458887,
         1.23001437,  0.23771077],
       [-0.56377843,  0.08320411, -0.14621276, ...,  0.52777378,
        -1.61083837, -0.76994802],
       [-0.13997404, -1.24875029,  0.08204633, ...,  0.41603879,
         0.79690959, -1.00197347]])], [array([  70.01719008, -318.68404847,  120.90432641, ..., -229.19487595,
        283.31245382,   88.63293123])]], {'train': [ar
kwargs:    {}
Exception: TypeError("unhashable type: 'numpy.ndarray'")

Traceback (most recent call last):
  File "benchmarks/bench_dask_randomized_search.py", line 41, in <module>
    search.fit(X, y)
  File "/Users/ogrisel/code/scikit-learn/sklearn/utils/validation.py", line 63, in inner_f
    return f(*args, **kwargs)
  File "/Users/ogrisel/code/scikit-learn/sklearn/model_selection/_search.py", line 841, in fit
    self._run_search(evaluate_candidates)
  File "/Users/ogrisel/code/scikit-learn/sklearn/model_selection/_search.py", line 1619, in _run_search
    evaluate_candidates(ParameterSampler(
  File "/Users/ogrisel/code/scikit-learn/sklearn/model_selection/_search.py", line 795, in evaluate_candidates
    out = parallel(delayed(_fit_and_score)(clone(base_estimator),
  File "/Users/ogrisel/code/joblib/joblib/parallel.py", line 1061, in __call__
    self.retrieve()
  File "/Users/ogrisel/code/joblib/joblib/parallel.py", line 940, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/Users/ogrisel/code/distributed/distributed/worker.py", line 3415, in apply_function
    result = function(*args, **kwargs)
  File "/Users/ogrisel/code/distributed/distributed/worker.py", line 3308, in execute_task
    return func(*map(execute_task, args))
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.8/site-packages/dask/utils.py", line 30, in apply
    return func(*args, **kwargs)
  File "/Users/ogrisel/code/joblib/joblib/_dask.py", line 124, in __call__
    results.append(func(*args, **kwargs))
  File "/Users/ogrisel/code/scikit-learn/sklearn/utils/fixes.py", line 222, in __call__
    return self.function(*args, **kwargs)
  File "/Users/ogrisel/code/scikit-learn/sklearn/model_selection/_validation.py", line 585, in _fit_and_score
    X_train, y_train = _safe_split(estimator, X, y, train)
  File "/Users/ogrisel/code/scikit-learn/sklearn/utils/metaestimators.py", line 210, in _safe_split
    X_subset = _safe_indexing(X, indices)
  File "/Users/ogrisel/code/scikit-learn/sklearn/utils/__init__.py", line 321, in _safe_indexing
    indices_dtype = _determine_key_type(indices)
  File "/Users/ogrisel/code/scikit-learn/sklearn/utils/__init__.py", line 256, in _determine_key_type
    unique_key = set(key)
TypeError: unhashable type: 'numpy.ndarray'

It looks like a problem in scikit-learn but the fact that it only occurs with this PR makes me think that this might be more complicated.

@ogrisel (Contributor) commented on Dec 4, 2020

I modified my version of scikit-learn to see the value of key in set(key), and I got a list of arrays of ints:

Exception: TypeError('Invalid indexing key: [array([3334, 3335, 3336, ..., 9997, 9998, 9999])]')
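
For context, the failure itself is easy to reproduce outside of dask (a minimal illustration, not the scikit-learn code path): calling set() on a list that wraps a numpy index array raises exactly this error, because ndarrays are unhashable. The surprising part is that the indices arrive wrapped in an extra list at all.

import numpy as np

# A train-index array wrapped in an extra list, as in the message above.
key = [np.array([3334, 3335, 3336, 9997, 9998, 9999])]

set(key)  # raises TypeError: unhashable type: 'numpy.ndarray'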

@ogrisel (Contributor) left a review comment:

LGTM. I benchmarked it with the script I linked above on a machine with 44 dask workers, one per physical core.

I looked at the dask dashboard while it was running and it looked good: there are no idle workers waiting too long to fetch the data. The total runtime stays approximately the same w.r.t. master: in this case the ratio of compute vs data transfer is such that we do not observe the speed-up of your synthetic edge-case code snippet at the beginning of the PR.

Note that if I start many workers (e.g. 88 dask workers), I still get "too many open files" errors. This PR does not seem to improve that situation w.r.t. master.

I suspect that many of those open files are dynamic library (".so") files that Python processes have to open when importing scikit-learn and its dependencies.

Please add an entry to the changelog (stating that the dask backend now reduces the number of unnecessary scatter calls to optimize cluster bandwidth usage).

@ogrisel (Contributor) commented on Dec 17, 2020

When running the linked stress test script on joblib master with 4 worker processes each with 2 threads on my laptop, I can trigger the following crash:

% python bench_dask_randomized_search.py
OMP_NUM_THREADS on dask workers: 1
Fitting 5 folds for each of 100 candidates, totalling 500 fits
[CV 5/5; 2/100] START max_depth=31, max_features=0.5, min_samples_leaf=50, n_estimators=500
[CV 5/5; 3/100] START max_depth=77, max_features=0.2, min_samples_leaf=30, n_estimators=500
[CV 5/5; 1/100] START max_depth=31, max_features=0.8, min_samples_leaf=3, n_estimators=50
[CV 1/5; 1/100] START max_depth=31, max_features=0.8, min_samples_leaf=3, n_estimators=50
[CV 4/5; 1/100] START max_depth=31, max_features=0.8, min_samples_leaf=3, n_estimators=50
[CV 1/5; 2/100] START max_depth=31, max_features=0.5, min_samples_leaf=50, n_estimators=500
[CV 3/5; 2/100] START max_depth=31, max_features=0.5, min_samples_leaf=50, n_estimators=500
[CV 2/5; 2/100] START max_depth=31, max_features=0.5, min_samples_leaf=50, n_estimators=500
[CV 1/5; 3/100] START max_depth=77, max_features=0.2, min_samples_leaf=30, n_estimators=500
[CV 2/5; 1/100] START max_depth=31, max_features=0.8, min_samples_leaf=3, n_estimators=50
[CV 4/5; 2/100] START max_depth=31, max_features=0.5, min_samples_leaf=50, n_estimators=500
[CV 1/5; 4/100] START max_depth=77, max_features=0.1, min_samples_leaf=20, n_estimators=30
[CV 4/5; 3/100] START max_depth=77, max_features=0.2, min_samples_leaf=30, n_estimators=500
[CV 3/5; 3/100] START max_depth=77, max_features=0.2, min_samples_leaf=30, n_estimators=500
[CV 3/5; 1/100] START max_depth=31, max_features=0.8, min_samples_leaf=3, n_estimators=50
[CV 2/5; 3/100] START max_depth=77, max_features=0.2, min_samples_leaf=30, n_estimators=500
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x103e2fc40>>, <Task finished name='Task-304' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at /Users/ogrisel/code/joblib/joblib/_dask.py:316> exception=CommClosedError('in <closed TCP>: OSError: [Errno 55] No buffer space available')>)
Traceback (most recent call last):
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.9/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.9/site-packages/tornado/iostream.py", line 1148, in write_to_fd
    return self.socket.send(data)  # type: ignore
OSError: [Errno 55] No buffer space available

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/ogrisel/miniforge3/envs/dev/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/ogrisel/code/joblib/joblib/_dask.py", line 317, in f
    batch, tasks = await self._to_func_args(func)
  File "/Users/ogrisel/code/joblib/joblib/_dask.py", line 304, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "/Users/ogrisel/code/joblib/joblib/_dask.py", line 289, in maybe_to_futures
    [f] = await self.client.scatter(
  File "/Users/ogrisel/code/distributed/distributed/client.py", line 2076, in _scatter
    _, who_has, nbytes = await scatter_to_workers(
  File "/Users/ogrisel/code/distributed/distributed/utils_comm.py", line 144, in scatter_to_workers
    out = await All(
  File "/Users/ogrisel/code/distributed/distributed/utils.py", line 229, in All
    result = await tasks.next()
  File "/Users/ogrisel/code/distributed/distributed/core.py", line 878, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/ogrisel/code/distributed/distributed/core.py", line 659, in send_recv
    await comm.write(msg, serializers=serializers, on_error="raise")
  File "/Users/ogrisel/code/distributed/distributed/comm/tcp.py", line 265, in write
    convert_stream_closed_error(self, e)
  File "/Users/ogrisel/code/distributed/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TCP>: OSError: [Errno 55] No buffer space available

The script has a dataset in a numpy array of about 80 MB, so it is not that big, but I do not observe the problem for smaller arrays. Note that my laptop still had several GB of free RAM when the above crash happened, so the buffer in question is probably in the network stack of the OS (macOS in this case).

When using the cooperative scattering implemented in this PR, everything runs smoothly. So scattering the same large array many times over (with hash=False) in nested joblib.Parallel calls is indeed a problem for dask/distributed, and the coroutine-based mitigation in this PR seems to fix it as expected.
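
As a minimal sketch of what the over-scattering looks like from the client side (assuming a local distributed.Client; this is not the joblib code itself): with hash=False every scatter call gets a fresh key, so the same bytes are serialized and pushed through the scheduler's comms once per call.

import numpy as np
from distributed import Client

if __name__ == "__main__":
    client = Client(n_workers=4, threads_per_worker=2)
    big = np.ones((80_000_000,), dtype=np.int8)  # ~80 MB, like the stress test

    # With hash=False each call creates a new key, so the same data is
    # serialized and transferred again every time -- the duplication that
    # the cooperative scattering in this PR avoids.
    [f1] = client.scatter([big], hash=False)
    [f2] = client.scatter([big], hash=False)
    assert f1.key != f2.key  # two distinct futures, two copies on the cluster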

ping @mrocklin as you might be interested in this.

@ogrisel (Contributor) commented on Dec 17, 2020

Based on the above experience, I think it's a good idea to merge this PR. @pierreglaser please document it in the changelog for 1.1.0.

@mrocklin (Contributor) commented:

Thanks for pinging me. This was an interesting read. No comments from me otherwise.

I'm glad that you all figured this out instead of me ;)

@ogrisel ogrisel merged commit a3f63bf into joblib:master Dec 18, 2020
@ogrisel (Contributor) commented on Dec 18, 2020

Thanks for the fix @pierreglaser !
