Interoperability with Django ORM #1137

Open
chrisconlan opened this issue Dec 13, 2020 · 7 comments

Comments

@chrisconlan

There is no sense in posting a comprehensive example, since setting up a Django project requires a good number of files, so I will give this simplified example instead.

The way I have to write parallelized Django code (non-Pythonically) ...

import joblib
import os
import sys

sys.path.append(DJANGO_PROJECT_DIR)  # placeholder: path to the Django project root
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_django_project.settings")

import django
django.setup()
from django.conf import settings

from custom_models.models import MyModel

def query_dependent_function(x: str):
    # The Django setup and model import must be repeated inside the
    # worker process, otherwise AppRegistryNotReady is raised.
    import django
    django.setup()
    from custom_models.models import MyModel
    return MyModel.objects.get(attr=x)

def main():
    parallel = joblib.Parallel(n_jobs=2)
    parallel_function = joblib.delayed(query_dependent_function)
    results = parallel(parallel_function(x) for x in ['attr_one', 'attr_two'])
    return results

if __name__ == '__main__':
    results = main()

The way I should be able to write parallelized Django code (Pythonically) ...

import joblib
import os
import sys

sys.path.append(DJANGO_PROJECT_DIR)  # placeholder: path to the Django project root
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_django_project.settings")

import django
django.setup()
from django.conf import settings

from custom_models.models import MyModel

def query_dependent_function(x: str):
    # Relies on the module-level django.setup() above; no per-worker setup
    return MyModel.objects.get(attr=x)

def main():
    parallel = joblib.Parallel(n_jobs=2)
    parallel_function = joblib.delayed(query_dependent_function)
    results = parallel(parallel_function(x) for x in ['attr_one', 'attr_two'])
    return results

if __name__ == '__main__':
    results = main()

When I write it in the Pythonic way, I get the following traceback, which is opaque and offers no guidance for debugging.

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "site-packages/joblib/externals/loky/process_executor.py", line 404, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "lib/python3.7/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "base_dir/my_project/my_project/custom_models/models.py", line 2, in <module>
    from custom_models.models import MyModel
  File "base_dir/my_project/my_project/custom_models/models.py", line 10, in <module>
    class MyModel(models.Model):
  File "site-packages/django/db/models/base.py", line 107, in __new__
    app_config = apps.get_containing_app_config(module)
  File "site-packages/django/apps/registry.py", line 252, in get_containing_app_config
    self.check_apps_ready()
  File "site-packages/django/apps/registry.py", line 135, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
"""

The above exception was the direct cause of the following exception:

BrokenProcessPool                         Traceback (most recent call last)
base_dir/my_project/my_project/joblib_counterexample.py in <module>
     22 
     23 if __name__ == '__main__':
---> 24         results = main()
     25 

base_dir/my_project/my_project/joblib_counterexample.py in main()
     18         parallel = joblib.Parallel(n_jobs=2)
     19         parallel_function = joblib.delayed(query_dependent_function)
---> 20         results = parallel(parallel_function(x) for x in ['AAPL', 'AMZN'])
     21         return results
     22 

site-packages/joblib/parallel.py in __call__(self, iterable)
   1059 
   1060             with self._backend.retrieval_context():
-> 1061                 self.retrieve()
   1062             # Make sure that we get a last message telling us we are done
   1063             elapsed_time = time.time() - self._start_time

site-packages/joblib/parallel.py in retrieve(self)
    938             try:
    939                 if getattr(self._backend, 'supports_timeout', False):
--> 940                     self._output.extend(job.get(timeout=self.timeout))
    941                 else:
    942                     self._output.extend(job.get())

site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

As you can imagine, once a project gets complex, this forces me to scatter redundant imports and setup calls all over the place until the error disappears. I know that Django does something fancy with sys.path in order to register app modules for the ORM, and this issue is not unique to joblib; it happens with multiprocessing as well.

I would love it if we could do something to improve interoperability here, because the tracebacks are very frustrating.

Thanks

@ogrisel
Contributor

ogrisel commented Dec 15, 2020

We would indeed need to introduce a joblib-level public API that lets users register a callable to initialize the workers, depending on the backend type (in particular, to call django.setup() for Django).
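For reference, the worker-initializer hook being described here already exists in the standard library's ProcessPoolExecutor (Python 3.7+). A minimal sketch of the pattern under discussion, reusing the placeholder names from the example above; this is not a joblib API:

# Worker-initializer pattern via the standard library, since joblib
# has no public hook for this yet (which is the point of this issue).
import os
from concurrent.futures import ProcessPoolExecutor

def init_django_worker():
    # Runs once per worker process, before any task executes there
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_django_project.settings")
    import django
    django.setup()

def query_dependent_function(x: str):
    from custom_models.models import MyModel
    return MyModel.objects.get(attr=x)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2, initializer=init_django_worker) as ex:
        results = list(ex.map(query_dependent_function, ['attr_one', 'attr_two']))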

@ogrisel
Contributor

ogrisel commented Dec 15, 2020

This is related to #1071, for which we have no clean solution in joblib either.

The stopgap workaround we used in scikit-learn is a custom alternative to the joblib.delayed wrapping callable that checks whether the worker state is correctly initialized and initializes it if not.
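For illustration, a minimal sketch of that stopgap pattern adapted to the Django case; the helper names are made up here and this is not scikit-learn's actual implementation:

import functools

import joblib

def _with_django_setup(func):
    # Wrap func so each worker initializes Django before its first call.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        import django
        from django.apps import apps
        if not apps.ready:  # worker state not initialized yet
            django.setup()
        return func(*args, **kwargs)
    return wrapper

def django_delayed(func):
    # Drop-in replacement for joblib.delayed that repairs worker state.
    return joblib.delayed(_with_django_setup(func))

With that, the original main() would call django_delayed(query_dependent_function) instead of joblib.delayed(query_dependent_function); cloudpickle ships the wrapper closure to the workers by value. This assumes the workers inherit DJANGO_SETTINGS_MODULE from the parent's environment.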

@chrisconlan
Author

Thanks for your response. That would be helpful.

In response to your first comment... there is also something more specific going on here. I'm not sure why, but I need to re-import all of the Django models within the parallelized function to avoid the error.

I will check out the linked issue and see if I can roll my own solution.

@ogrisel
Contributor

ogrisel commented Dec 15, 2020

In response to your first comment... there is also something more specific going on here. I'm not sure why, but I need to re-import all of the Django models within the parallelized function to avoid the error.

Hmm, that's fishy. If Django provides a way to introspect all the (active) model classes, this could be used to ship the list of model classes to be initialized in a custom delayed implementation.
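Django does expose the registered model classes through its app registry, so a custom delayed implementation could enumerate them:

from django.apps import apps

# Every model class registered with the app registry, across all installed apps
all_models = apps.get_models()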

@chrisconlan
Author

chrisconlan commented Dec 15, 2020

That's what I am thinking. I'll let you know how it goes.

It might have to do with the Python interpreter itself. The way imports have to be done in this forked context seems to break Python's assumptions about the accessibility of imported modules.

@ogrisel
Contributor

ogrisel commented Dec 15, 2020

It might have to do with the Python interpreter itself. The way imports have to be done in this forked context seems to break Python's assumptions about the accessibility of imported modules.

joblib uses loky and cloudpickle under the hood. They work fine for all the common objects in regular and dynamic modules (e.g. interactively defined modules), as long as the object is picklable by value.
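For instance, cloudpickle can round-trip a function that is not importable from any module, which plain pickle cannot do; a quick sanity check:

import pickle

import cloudpickle

# Serialize an interactively defined function by value, then rebuild it,
# the way a worker process would when it receives the task.
f = lambda x: x + 1
g = pickle.loads(cloudpickle.dumps(f))
assert g(1) == 2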

Feel free to open an issue on https://github.com/cloudpipe/cloudpickle with a minimal reproduction case with a custom django model class if you can suggest one.

@chrisconlan
Author

After a lot of tweaking and experimenting, I came up with nothing. I have since moved to Luigi for this part of my ETL pipeline. It has a higher initial startup time per task, but it allows me to parallelize this part of my workflow without sacrificing code quality.
