Skip to content

Commit

Permalink
Fix Python epoll1 Fork Support (#32196)
Browse files Browse the repository at this point in the history
* WIP. A seemingly properly failing test

* WIP. Pre-fork handlers now work

* Roughly working.

* Clean up

* Clean up more

* Add to CI

* Format

* Ugh. Remove swap file

* And another

* clean up

* Add copyright

* Format

* Remove another debug line

* Add stub forkable methods

* Remove use of 3.9+ function

* Remove unintentional double copyright

* drfloob review comments

* Only hold lock during Close once

* Create separate job for fork test

* Bump up gdb timeout

* Format
  • Loading branch information
gnossen authored and wanlin31 committed May 18, 2023
1 parent 3462241 commit 2088c69
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,7 @@ grpc_cc_library(
deps = [
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
Expand Down
17 changes: 11 additions & 6 deletions src/core/lib/event_engine/forkable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ namespace experimental {
namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};
grpc_core::NoDestruct<absl::flat_hash_set<Forkable*>> g_forkables
ABSL_GUARDED_BY(g_mu);

// This must be ordered because there are ordering dependencies between
// certain fork handlers.
grpc_core::NoDestruct<std::vector<Forkable*>> g_forkables ABSL_GUARDED_BY(g_mu);
} // namespace

Forkable::Forkable() { ManageForkable(this); }
Expand All @@ -48,8 +50,9 @@ void RegisterForkHandlers() {

void PrepareFork() {
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
forkable->PrepareFork();
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
}
}
void PostforkParent() {
Expand All @@ -68,12 +71,14 @@ void PostforkChild() {

void ManageForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get());
g_forkables->insert(forkable);
g_forkables->push_back(forkable);
}

void StopManagingForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get());
g_forkables->erase(forkable);
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
g_forkables->erase(iter);
}

} // namespace experimental
Expand Down
42 changes: 30 additions & 12 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) {
Epoll1Poller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
Expand Down Expand Up @@ -354,7 +354,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
}

Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
: scheduler_(scheduler), was_kicked_(false) {
: scheduler_(scheduler), was_kicked_(false), closed_(false) {
g_epoll_set_.epfd = EpollCreateAndCloexec();
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
Expand All @@ -375,22 +375,26 @@ void Epoll1Poller::Shutdown() {
delete this;
}

Epoll1Poller::~Epoll1Poller() {
void Epoll1Poller::Close() {
grpc_core::MutexLock lock(&mu_);
if (closed_) return;

if (g_epoll_set_.epfd >= 0) {
close(g_epoll_set_.epfd);
g_epoll_set_.epfd = -1;
}
{
grpc_core::MutexLock lock(&mu_);
while (!free_epoll1_handles_list_.empty()) {
Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>(
free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
delete handle;
}

while (!free_epoll1_handles_list_.empty()) {
Epoll1EventHandle* handle =
reinterpret_cast<Epoll1EventHandle*>(free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
delete handle;
}
closed_ = true;
}

Epoll1Poller::~Epoll1Poller() { Close(); }

EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
bool track_err) {
Epoll1EventHandle* new_handle = nullptr;
Expand Down Expand Up @@ -556,7 +560,7 @@ Poller::WorkResult Epoll1Poller::Work(

void Epoll1Poller::Kick() {
grpc_core::MutexLock lock(&mu_);
if (was_kicked_) {
if (was_kicked_ || closed_) {
return;
}
was_kicked_ = true;
Expand All @@ -571,6 +575,14 @@ Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
return nullptr;
}

// Forkable override: prepares this poller for a fork by kicking it.
// Kick() returns early if the poller has already been kicked or closed.
void Epoll1Poller::PrepareFork() { Kick(); }

// Forkable override; currently a no-op (empty body).
// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkParent() {}

// Forkable override; currently a no-op (empty body).
// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkChild() {}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down Expand Up @@ -617,6 +629,12 @@ void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }
// nullptr.
Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; }

// Stub with an empty body.
// NOTE(review): this appears to be the fallback section used when the epoll1
// poller is unavailable (the neighboring MakeEpoll1Poller returns nullptr) --
// confirm against the surrounding preprocessor guards.
void Epoll1Poller::PrepareFork() {}

// Stub with an empty body; does nothing.
void Epoll1Poller::PostforkParent() {}

// Stub with an empty body; does nothing.
void Epoll1Poller::PostforkChild() {}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
12 changes: 11 additions & 1 deletion src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
Expand All @@ -46,7 +47,7 @@ namespace experimental {
class Epoll1EventHandle;

// Definition of epoll1 based poller.
class Epoll1Poller : public PosixEventPoller {
class Epoll1Poller : public PosixEventPoller, public Forkable {
public:
explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name,
Expand All @@ -67,6 +68,13 @@ class Epoll1Poller : public PosixEventPoller {
}
~Epoll1Poller() override;

// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;

void Close();

private:
// This initial vector size may need to be tuned
using Events = absl::InlinedVector<Epoll1EventHandle*, 5>;
Expand All @@ -79,6 +87,7 @@ class Epoll1Poller : public PosixEventPoller {
// on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle,
Events& pending_events);

// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number
Expand Down Expand Up @@ -117,6 +126,7 @@ class Epoll1Poller : public PosixEventPoller {
bool was_kicked_ ABSL_GUARDED_BY(mu_);
std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_;
};

// Return an instance of an epoll1-based poller tied to the specified event
Expand Down
7 changes: 6 additions & 1 deletion src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<ThreadPool> executor)
: poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
executor_(std::move(executor)) {}
executor_(std::move(executor)),
trigger_shutdown_called_(false) {}

PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller),
Expand All @@ -316,6 +317,8 @@ void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
}

void PosixEnginePollerManager::TriggerShutdown() {
GPR_DEBUG_ASSERT(trigger_shutdown_called_ == false);
trigger_shutdown_called_ = true;
// If the poller is external, don't try to shut it down. Otherwise,
// set poller state to PollerState::kShuttingDown.
if (poller_state_.exchange(PollerState::kShuttingDown) ==
Expand Down Expand Up @@ -347,6 +350,8 @@ PosixEventEngine::PosixEventEngine()
timer_manager_(executor_) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller; otherwise, the
// process will deadlock when forking.
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager);
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class PosixEnginePollerManager
grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr;
std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_;
bool trigger_shutdown_called_;
};
#endif // GRPC_POSIX_SOCKET_TCP

Expand Down
18 changes: 14 additions & 4 deletions src/core/lib/gprpp/fork.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,19 @@ void Fork::DoDecExecCtxCount() {

void Fork::SetResetChildPollingEngineFunc(
Fork::child_postfork_func reset_child_polling_engine) {
reset_child_polling_engine_ = reset_child_polling_engine;
if (reset_child_polling_engine_ == nullptr) {
reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
}
if (reset_child_polling_engine == nullptr) {
reset_child_polling_engine_->clear();
} else {
reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
}
}
Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
return reset_child_polling_engine_;

const std::vector<Fork::child_postfork_func>&
Fork::GetResetChildPollingEngineFunc() {
return *reset_child_polling_engine_;
}

bool Fork::BlockExecCtx() {
Expand Down Expand Up @@ -235,5 +244,6 @@ void Fork::AwaitThreads() {

std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
nullptr;
} // namespace grpc_core
6 changes: 4 additions & 2 deletions src/core/lib/gprpp/fork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>

#include <atomic>
#include <vector>

//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
Expand Down Expand Up @@ -58,7 +59,8 @@ class Fork {
// reset the polling engine's internal state.
static void SetResetChildPollingEngineFunc(
child_postfork_func reset_child_polling_engine);
static child_postfork_func GetResetChildPollingEngineFunc();
static const std::vector<child_postfork_func>&
GetResetChildPollingEngineFunc();

// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
Expand Down Expand Up @@ -87,7 +89,7 @@ class Fork {

static std::atomic<bool> support_enabled_;
static bool override_enabled_;
static child_postfork_func reset_child_polling_engine_;
static std::vector<child_postfork_func>* reset_child_polling_engine_;
};

} // namespace grpc_core
Expand Down
9 changes: 5 additions & 4 deletions src/core/lib/iomgr/fork_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
grpc_core::Fork::child_postfork_func reset_polling_engine =
grpc_core::Fork::GetResetChildPollingEngineFunc();
if (reset_polling_engine != nullptr) {
reset_polling_engine();
for (auto* reset_polling_engine :
grpc_core::Fork::GetResetChildPollingEngineFunc()) {
if (reset_polling_engine != nullptr) {
reset_polling_engine();
}
}
grpc_timer_manager_set_threading(true);
grpc_core::Executor::SetThreadingAll(true);
Expand Down
37 changes: 37 additions & 0 deletions src/python/grpcio_tests/tests/fork/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:cython_library.bzl", "pyx_library")

# Cython extension built from native_debug.pyx. It links against Abseil's
# failure signal handler and is a dependency of the fork_test target below.
pyx_library(
name = "native_debug",
srcs = ["native_debug.pyx"],
deps = ["@com_google_absl//absl/debugging:failure_signal_handler"],
)

# Python 3 fork test target. Every *.py file in this package is included as a
# source, and _fork_interop_test.py is the entry point. `imports = ["../.."]`
# adds the directory two levels above this package to the Python import path.
py_test(
name = "fork_test",
srcs = glob(["*.py"]),
imports = ["../.."],
main = "_fork_interop_test.py",
python_version = "PY3",
deps = [
":native_debug",
"//src/proto/grpc/testing:empty_py_pb2",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:test_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/interop:service",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)

0 comments on commit 2088c69

Please sign in to comment.