Skip to content

Commit

Permalink
Avoid circular reference with defer.inlineCallbacks (#11886)
Browse files Browse the repository at this point in the history
  • Loading branch information
adiroiban committed Oct 4, 2023
2 parents 8cd855a + 3440936 commit 88172f7
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 87 deletions.
107 changes: 71 additions & 36 deletions src/twisted/internet/defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,33 @@ class _CancellationStatus(Generic[_SelfResultT]):
waitingOn: Optional[Deferred[_SelfResultT]] = None


def _gotResultInlineCallbacks(
r: object,
waiting: List[Any],
gen: Union[
Generator[Deferred[object], object, _T],
Coroutine[Deferred[object], object, _T],
],
status: _CancellationStatus[_T],
context: _Context,
) -> None:
"""
Helper for L{_inlineCallbacks} to handle a nested L{Deferred} firing.
@param r: The result of the L{Deferred}
@param waiting: Whether the L{_inlineCallbacks} was waiting, and the result.
@param gen: a generator object returned by calling a function or method
decorated with C{@}L{inlineCallbacks}
@param status: a L{_CancellationStatus} tracking the current status of C{gen}
@param context: the contextvars context to run `gen` in
"""
if waiting[0]:
waiting[0] = False
waiting[1] = r
else:
_inlineCallbacks(r, gen, status, context)


@_extraneous
def _inlineCallbacks(
result: object,
Expand Down Expand Up @@ -2060,14 +2087,7 @@ def _inlineCallbacks(

if isinstance(result, Deferred):
# a deferred was yielded, get the result.
def gotResult(r: object) -> None:
if waiting[0]:
waiting[0] = False
waiting[1] = r
else:
_inlineCallbacks(r, gen, status, context)

result.addBoth(gotResult)
result.addBoth(_gotResultInlineCallbacks, waiting, gen, status, context)
if waiting[0]:
# Haven't called back yet, set flag so that we get reinvoked
# and return from the loop
Expand All @@ -2085,6 +2105,48 @@ def gotResult(r: object) -> None:
waiting[1] = None


def _addCancelCallbackToDeferred(
it: Deferred[_T], status: _CancellationStatus[_T]
) -> None:
"""
Helper for L{_cancellableInlineCallbacks} to add
L{_handleCancelInlineCallbacks} as the first errback.
@param it: The L{Deferred} to add the errback to.
@param status: a L{_CancellationStatus} tracking the current status of C{gen}
"""
it.callbacks, tmp = [], it.callbacks
it.addErrback(_handleCancelInlineCallbacks, status)
it.callbacks.extend(tmp)
it.errback(_InternalInlineCallbacksCancelledError())


def _handleCancelInlineCallbacks(
result: Failure,
status: _CancellationStatus[_T],
) -> Deferred[_T]:
"""
Propagate the cancellation of an C{@}L{inlineCallbacks} to the
L{Deferred} it is waiting on.
@param result: An L{_InternalInlineCallbacksCancelledError} from
C{cancel()}.
@param status: a L{_CancellationStatus} tracking the current status of C{gen}
@return: A new L{Deferred} that the C{@}L{inlineCallbacks} generator
can callback or errback through.
"""
result.trap(_InternalInlineCallbacksCancelledError)
status.deferred = Deferred(lambda d: _addCancelCallbackToDeferred(d, status))

# We would only end up here if the inlineCallback is waiting on
# another Deferred. It needs to be cancelled.
awaited = status.waitingOn
assert awaited is not None
awaited.cancel()

return status.deferred


def _cancellableInlineCallbacks(
gen: Union[
Generator["Deferred[object]", object, _T],
Expand All @@ -2100,36 +2162,9 @@ def _cancellableInlineCallbacks(
@return: L{Deferred} for the C{@}L{inlineCallbacks} that is cancellable.
"""

def cancel(it: Deferred[_T]) -> None:
it.callbacks, tmp = [], it.callbacks
it.addErrback(handleCancel)
it.callbacks.extend(tmp)
it.errback(_InternalInlineCallbacksCancelledError())

deferred: Deferred[_T] = Deferred(cancel)
deferred: Deferred[_T] = Deferred(lambda d: _addCancelCallbackToDeferred(d, status))
status = _CancellationStatus(deferred)

def handleCancel(result: Failure) -> Deferred[_T]:
"""
Propagate the cancellation of an C{@}L{inlineCallbacks} to the
L{Deferred} it is waiting on.
@param result: An L{_InternalInlineCallbacksCancelledError} from
C{cancel()}.
@return: A new L{Deferred} that the C{@}L{inlineCallbacks} generator
can callback or errback through.
"""
result.trap(_InternalInlineCallbacksCancelledError)
status.deferred = Deferred(cancel)

# We would only end up here if the inlineCallback is waiting on
# another Deferred. It needs to be cancelled.
awaited = status.waitingOn
assert awaited is not None
awaited.cancel()

return status.deferred

_inlineCallbacks(None, gen, status, _copy_context())

return deferred
Expand Down
1 change: 1 addition & 0 deletions src/twisted/newsfragments/11885.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When using `CPython`, functions wrapped by `twisted.internet.defer.inlineCallbacks` can have their arguments and return values freed immediately after completion (due to there no longer being circular references).
225 changes: 174 additions & 51 deletions src/twisted/test/test_defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Mapping,
NoReturn,
Optional,
Set,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -1684,57 +1685,6 @@ def raiseError(ignored: object) -> None:
self.assertNotEqual([], localz)
self.assertNotEqual([], globalz)

def test_inlineCallbacksTracebacks(self) -> None:
"""
L{defer.inlineCallbacks} that re-raise tracebacks into their deferred
should not lose their tracebacks.
"""
f = getDivisionFailure()
d: Deferred[None] = Deferred()
try:
f.raiseException()
except BaseException:
d.errback()

def ic(d: object) -> Generator[Any, Any, None]:
yield d

defer.inlineCallbacks(ic)
newFailure = self.failureResultOf(d)
tb = traceback.extract_tb(newFailure.getTracebackObject())

self.assertEqual(len(tb), 3)
self.assertIn("test_defer", tb[2][0])
self.assertEqual("getDivisionFailure", tb[2][2])
self.assertEqual("1 / 0", tb[2][3])

self.assertIn("test_defer", tb[0][0])
self.assertEqual("test_inlineCallbacksTracebacks", tb[0][2])
self.assertEqual("f.raiseException()", tb[0][3])

def test_fromCoroutineRequiresCoroutine(self) -> None:
"""
L{Deferred.fromCoroutine} requires a coroutine object or a generator,
and will reject things that are not that.
"""
thingsThatAreNotCoroutines = [
# Lambda
lambda x: x,
# Int
1,
# Boolean
True,
# Function
self.test_fromCoroutineRequiresCoroutine,
# None
None,
# Module
defer,
]

for thing in thingsThatAreNotCoroutines:
self.assertRaises(defer.NotACoroutineError, Deferred.fromCoroutine, thing)

@pyunit.skipIf(_PYPY, "GC works differently on PyPy.")
def test_canceller_circular_reference_callback(self) -> None:
"""
Expand Down Expand Up @@ -3950,3 +3900,176 @@ async def testFunction() -> bool:
clock.advance(1)

self.assertEqual(self.successResultOf(d), True)


class InlineCallbackTests(unittest.SynchronousTestCase):
def test_inlineCallbacksTracebacks(self) -> None:
"""
L{defer.inlineCallbacks} that re-raise tracebacks into their deferred
should not lose their tracebacks.
"""
f = getDivisionFailure()
d: Deferred[None] = Deferred()
try:
f.raiseException()
except BaseException:
d.errback()

def ic(d: object) -> Generator[Any, Any, None]:
yield d

defer.inlineCallbacks(ic)
newFailure = self.failureResultOf(d)
tb = traceback.extract_tb(newFailure.getTracebackObject())

self.assertEqual(len(tb), 3)
self.assertIn("test_defer", tb[2][0])
self.assertEqual("getDivisionFailure", tb[2][2])
self.assertEqual("1 / 0", tb[2][3])

self.assertIn("test_defer", tb[0][0])
self.assertEqual("test_inlineCallbacksTracebacks", tb[0][2])
self.assertEqual("f.raiseException()", tb[0][3])

def test_fromCoroutineRequiresCoroutine(self) -> None:
"""
L{Deferred.fromCoroutine} requires a coroutine object or a generator,
and will reject things that are not that.
"""
thingsThatAreNotCoroutines = [
# Lambda
lambda x: x,
# Int
1,
# Boolean
True,
# Function
self.test_fromCoroutineRequiresCoroutine,
# None
None,
# Module
defer,
]

for thing in thingsThatAreNotCoroutines:
self.assertRaises(defer.NotACoroutineError, Deferred.fromCoroutine, thing)

def test_inlineCallbacksCancelCaptured(self) -> None:
"""
Cancelling an L{defer.inlineCallbacks} correctly handles the function
catching the L{defer.CancelledError}.
The desired behavior is:
1. If the function is waiting on an inner deferred, that inner
deferred is cancelled, and a L{defer.CancelledError} is raised
within the function.
2. If the function catches that exception, execution continues, and
the deferred returned by the function is not resolved.
3. Cancelling the deferred again cancels any deferred the function
is waiting on, and the exception is raised.
"""
canceller1Calls: List[Deferred[object]] = []
canceller2Calls: List[Deferred[object]] = []
d1: Deferred[object] = Deferred(canceller1Calls.append)
d2: Deferred[object] = Deferred(canceller2Calls.append)

@defer.inlineCallbacks
def testFunc() -> Generator[Deferred[object], object, None]:
try:
yield d1
except Exception:
pass

yield d2

# Call the function, and ensure that none of the deferreds have
# completed or been cancelled yet.
funcD = testFunc()

self.assertNoResult(d1)
self.assertNoResult(d2)
self.assertNoResult(funcD)
self.assertEqual(canceller1Calls, [])
self.assertEqual(canceller1Calls, [])

# Cancel the deferred returned by the function, and check that the first
# inner deferred has been cancelled, but the returned deferred has not
# completed (as the function catches the raised exception).
funcD.cancel()

self.assertEqual(canceller1Calls, [d1])
self.assertEqual(canceller2Calls, [])
self.assertNoResult(funcD)

# Cancel the returned deferred again, this time the returned deferred
# should have a failure result, as the function did not catch the cancel
# exception raised by `d2`.
funcD.cancel()
failure = self.failureResultOf(funcD)
self.assertEqual(failure.type, defer.CancelledError)
self.assertEqual(canceller2Calls, [d2])

@pyunit.skipIf(_PYPY, "GC works differently on PyPy.")
def test_inlineCallbacksNoCircularReference(self) -> None:
"""
When using L{defer.inlineCallbacks}, after the function exits, it will
not keep references to the function itself or the arguments.
This ensures that the machinery gets deallocated immediately rather than
waiting for a GC, on CPython.
The GC on PyPy works differently (del doesn't immediately deallocate the
object), so we skip the test.
"""

# Create an object and weak reference to track if its gotten freed.
obj: Set[Any] = set()
objWeakRef = weakref.ref(obj)

@defer.inlineCallbacks
def func(a: Any) -> Any:
yield a
return a

# Run the function
funcD = func(obj)
self.assertEqual(obj, self.successResultOf(funcD))

funcDWeakRef = weakref.ref(funcD)

# Delete the local references to obj and funcD.
del obj
del funcD

# The object has been freed if the weak reference returns None.
self.assertIsNone(objWeakRef())
self.assertIsNone(funcDWeakRef())

@pyunit.skipIf(_PYPY, "GC works differently on PyPy.")
def test_coroutineNoCircularReference(self) -> None:
"""
Tests that there is no circular dependency when using
L{Deferred.fromCoroutine}, so that the machinery gets cleaned up
immediately rather than waiting for a GC.
"""

# Create an object and weak reference to track if its gotten freed.
obj: Set[Any] = set()
objWeakRef = weakref.ref(obj)

async def func(a: Any) -> Any:
return a

# Run the function
funcD = Deferred.fromCoroutine(func(obj))
self.assertEqual(obj, self.successResultOf(funcD))

funcDWeakRef = weakref.ref(funcD)

# Delete the local references to obj and funcD.
del obj
del funcD

# The object has been freed if the weak reference returns None.
self.assertIsNone(objWeakRef())
self.assertIsNone(funcDWeakRef())
6 changes: 6 additions & 0 deletions src/twisted/test/test_twistd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,12 @@ def test_setUidSameAsCurrentUid(self):
If the specified UID is the same as the current UID of the process,
then a warning is displayed.
"""

# FIXME:https://github.com/twisted/twisted/issues/10332
# Assert that there were no existing warnings.
existing_warnings = self.flushWarnings()
self.assertEqual([], existing_warnings)

currentUid = os.getuid()
self._setUID("morefoo", currentUid, "morebar", 4343)

Expand Down

0 comments on commit 88172f7

Please sign in to comment.