Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Python epoll1 Fork Support #32196

Merged
merged 23 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,7 @@ grpc_cc_library(
deps = [
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
Expand Down
17 changes: 11 additions & 6 deletions src/core/lib/event_engine/forkable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ namespace experimental {
namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};
grpc_core::NoDestruct<absl::flat_hash_set<Forkable*>> g_forkables
ABSL_GUARDED_BY(g_mu);

// This must be ordered because there are ordering dependencies between
// certain fork handlers.
grpc_core::NoDestruct<std::vector<Forkable*>> g_forkables ABSL_GUARDED_BY(g_mu);
} // namespace

// Every Forkable registers itself with the global forkable list on
// construction.
Forkable::Forkable() {
  ManageForkable(this);
}
Expand All @@ -48,8 +50,9 @@ void RegisterForkHandlers() {

void PrepareFork() {
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
forkable->PrepareFork();
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
}
}
void PostforkParent() {
Expand All @@ -68,12 +71,14 @@ void PostforkChild() {

void ManageForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get());
g_forkables->insert(forkable);
g_forkables->push_back(forkable);
}

void StopManagingForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get());
g_forkables->erase(forkable);
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
g_forkables->erase(iter);
}

} // namespace experimental
Expand Down
30 changes: 26 additions & 4 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) {
Epoll1Poller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
Expand Down Expand Up @@ -353,7 +353,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
}

Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
: scheduler_(scheduler), was_kicked_(false) {
: scheduler_(scheduler), was_kicked_(false), closed_(false) {
g_epoll_set_.epfd = EpollCreateAndCloexec();
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
Expand All @@ -374,7 +374,12 @@ void Epoll1Poller::Shutdown() {
delete this;
}

Epoll1Poller::~Epoll1Poller() {
void Epoll1Poller::Close() {
{
grpc_core::MutexLock lock(&mu_);
if (closed_) return;
}

if (g_epoll_set_.epfd >= 0) {
close(g_epoll_set_.epfd);
gnossen marked this conversation as resolved.
Show resolved Hide resolved
g_epoll_set_.epfd = -1;
Expand All @@ -387,9 +392,12 @@ Epoll1Poller::~Epoll1Poller() {
free_epoll1_handles_list_.pop_front();
delete handle;
}
closed_ = true;
}
}

// Teardown is delegated entirely to Close().
Epoll1Poller::~Epoll1Poller() {
  Close();
}

EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
bool track_err) {
Epoll1EventHandle* new_handle = nullptr;
Expand Down Expand Up @@ -555,7 +563,7 @@ Poller::WorkResult Epoll1Poller::Work(

void Epoll1Poller::Kick() {
grpc_core::MutexLock lock(&mu_);
if (was_kicked_) {
if (was_kicked_ || closed_) {
return;
}
was_kicked_ = true;
Expand All @@ -570,6 +578,14 @@ Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
return nullptr;
}

// Forkable override: kicks the poller. Kick() returns early if the poller
// has already been kicked or is closed.
void Epoll1Poller::PrepareFork() {
  Kick();
}

// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkParent() {
  // Forkable override; intentionally empty until the TODO is addressed.
}
gnossen marked this conversation as resolved.
Show resolved Hide resolved

// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkChild() {
  // Forkable override; intentionally empty until the TODO is addressed.
}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down Expand Up @@ -616,6 +632,12 @@ void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }
// nullptr.
Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) {
  // No poller is constructed here; the scheduler argument is ignored.
  return nullptr;
}

// Fallback stub (presumably for builds without epoll support): nothing to
// prepare before a fork.
void Epoll1Poller::PrepareFork() {
}

// Fallback stub (presumably for builds without epoll support): no
// parent-side post-fork work.
void Epoll1Poller::PostforkParent() {
}

// Fallback stub (presumably for builds without epoll support): no
// child-side post-fork work.
void Epoll1Poller::PostforkChild() {
}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
12 changes: 11 additions & 1 deletion src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
Expand All @@ -46,7 +47,7 @@ namespace experimental {
class Epoll1EventHandle;

// Definition of epoll1 based poller.
class Epoll1Poller : public PosixEventPoller {
class Epoll1Poller : public PosixEventPoller, public Forkable {
public:
explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name,
Expand All @@ -67,6 +68,13 @@ class Epoll1Poller : public PosixEventPoller {
}
~Epoll1Poller() override;

// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;

void Close();

private:
// This initial vector size may need to be tuned
using Events = absl::InlinedVector<Epoll1EventHandle*, 5>;
Expand All @@ -79,6 +87,7 @@ class Epoll1Poller : public PosixEventPoller {
// on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle,
Events& pending_events);

// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number
Expand Down Expand Up @@ -117,6 +126,7 @@ class Epoll1Poller : public PosixEventPoller {
bool was_kicked_ ABSL_GUARDED_BY(mu_);
std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_;
};

// Return an instance of an epoll1-based poller tied to the specified event
Expand Down
7 changes: 6 additions & 1 deletion src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<ThreadPool> executor)
: poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
executor_(std::move(executor)) {}
executor_(std::move(executor)),
trigger_shutdown_called_(false) {}

PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller),
Expand All @@ -316,6 +317,8 @@ void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
}

void PosixEnginePollerManager::TriggerShutdown() {
GPR_DEBUG_ASSERT(trigger_shutdown_called_ == false);
gnossen marked this conversation as resolved.
Show resolved Hide resolved
trigger_shutdown_called_ = true;
// If the poller is external, don't try to shut it down. Otherwise
// set poller state to PollerState::kShuttingDown.
if (poller_state_.exchange(PollerState::kShuttingDown) ==
Expand Down Expand Up @@ -347,6 +350,8 @@ PosixEventEngine::PosixEventEngine()
timer_manager_(executor_) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller; otherwise, the
// process will deadlock when forking.
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager);
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class PosixEnginePollerManager
grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr;
std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_;
bool trigger_shutdown_called_;
};
#endif // GRPC_POSIX_SOCKET_TCP

Expand Down
18 changes: 14 additions & 4 deletions src/core/lib/gprpp/fork.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,19 @@ void Fork::DoDecExecCtxCount() {

void Fork::SetResetChildPollingEngineFunc(
Fork::child_postfork_func reset_child_polling_engine) {
reset_child_polling_engine_ = reset_child_polling_engine;
if (reset_child_polling_engine_ == nullptr) {
reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
}
if (reset_child_polling_engine == nullptr) {
reset_child_polling_engine_->clear();
} else {
reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
}
}
Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
return reset_child_polling_engine_;

const std::vector<Fork::child_postfork_func>&
Fork::GetResetChildPollingEngineFunc() {
return *reset_child_polling_engine_;
}

bool Fork::BlockExecCtx() {
Expand Down Expand Up @@ -228,5 +237,6 @@ void Fork::AwaitThreads() {

std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
gnossen marked this conversation as resolved.
Show resolved Hide resolved
nullptr;
} // namespace grpc_core
6 changes: 4 additions & 2 deletions src/core/lib/gprpp/fork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>

#include <atomic>
#include <vector>

//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
Expand Down Expand Up @@ -58,7 +59,8 @@ class Fork {
// reset the polling engine's internal state.
static void SetResetChildPollingEngineFunc(
child_postfork_func reset_child_polling_engine);
static child_postfork_func GetResetChildPollingEngineFunc();
static const std::vector<child_postfork_func>&
GetResetChildPollingEngineFunc();

// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
Expand Down Expand Up @@ -87,7 +89,7 @@ class Fork {

static std::atomic<bool> support_enabled_;
static bool override_enabled_;
static child_postfork_func reset_child_polling_engine_;
static std::vector<child_postfork_func>* reset_child_polling_engine_;
};

} // namespace grpc_core
Expand Down
9 changes: 5 additions & 4 deletions src/core/lib/iomgr/fork_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
grpc_core::Fork::child_postfork_func reset_polling_engine =
grpc_core::Fork::GetResetChildPollingEngineFunc();
if (reset_polling_engine != nullptr) {
reset_polling_engine();
for (auto* reset_polling_engine :
grpc_core::Fork::GetResetChildPollingEngineFunc()) {
if (reset_polling_engine != nullptr) {
reset_polling_engine();
}
}
grpc_timer_manager_set_threading(true);
grpc_core::Executor::SetThreadingAll(true);
Expand Down
37 changes: 37 additions & 0 deletions src/python/grpcio_tests/tests/fork/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:cython_library.bzl", "pyx_library")

# Cython extension built from native_debug.pyx; depends on Abseil's
# failure signal handler library.
pyx_library(
    name = "native_debug",
    srcs = ["native_debug.pyx"],
    deps = ["@com_google_absl//absl/debugging:failure_signal_handler"],
)

# Python 3 test target for the fork tests. All *.py files in this package are
# sources; _fork_interop_test.py is the entry point.
py_test(
    name = "fork_test",
    srcs = glob(["*.py"]),
    imports = ["../.."],
    main = "_fork_interop_test.py",
    python_version = "PY3",
    deps = [
        # Cython extension defined in this package.
        ":native_debug",
        "//src/proto/grpc/testing:empty_py_pb2",
        "//src/proto/grpc/testing:py_messages_proto",
        "//src/proto/grpc/testing:test_py_pb2_grpc",
        "//src/python/grpcio/grpc:grpcio",
        "//src/python/grpcio_tests/tests/interop:service",
        "//src/python/grpcio_tests/tests/unit:test_common",
    ],
)