Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2910 Add durations to connection pool events. #1590

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type PoolEvent struct {
Address string `json:"address"`
ConnectionID uint64 `json:"connectionId"`
PoolOptions *MonitorPoolOptions `json:"options"`
Duration time.Duration `json:"duration"`
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load-balanced deployment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand All @@ -162,6 +169,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand Down Expand Up @@ -222,6 +236,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand Down Expand Up @@ -444,7 +465,7 @@
{
"level": "debug",
"component": "connection",
"unordered": true,
"unordered": true,
"data": {
"message": "Connection closed",
"driverConnectionId": {
Expand All @@ -471,7 +492,7 @@
{
"level": "debug",
"component": "connection",
"unordered": true,
"unordered": true,
"data": {
"message": "Connection checkout failed",
"serverHost": {
Expand All @@ -486,6 +507,13 @@
"reason": "An error occurred while trying to establish a new connection",
"error": {
"$$exists": true
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand All @@ -74,6 +75,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand All @@ -98,6 +100,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand Down Expand Up @@ -219,4 +222,5 @@ tests:
serverPort: { $$type: [int, long] }
reason: "An error occurred while trying to establish a new connection"
error: { $$exists: true }
durationMS: { $$type: [double, int, long] }
unordered: true
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"description": "connection-pool-logging",
"description": "connection-pool-options",
"schemaVersion": "1.13",
"runOnRequirements": [
{
Expand Down Expand Up @@ -128,6 +128,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description: "connection-pool-logging"
description: "connection-pool-options"

schemaVersion: "1.13"

Expand Down Expand Up @@ -71,6 +71,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

# Drivers that have not implemented DRIVERS-1943 will need to skip this test.
- description: "maxConnecting should be included in connection pool created message when specified"
Expand Down
70 changes: 48 additions & 22 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
})
}

start := time.Now()
// Check the pool state while holding a stateMu read lock. If the pool state is not "ready",
// return an error. Do all of this while holding the stateMu read lock to prevent a state change between
// checking the state and entering the wait queue. Not holding the stateMu read lock here may
Expand All @@ -477,8 +478,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
case poolClosed:
p.stateMu.RUnlock()

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed,
}

Expand All @@ -487,18 +490,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonPoolClosed,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonPoolClosed,
})
}
return nil, ErrPoolClosed
case poolPaused:
err := poolClearedError{err: p.lastClearErr, address: p.address}
p.stateMu.RUnlock()

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
}

Expand All @@ -507,10 +513,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: err,
})
}
return nil, err
Expand Down Expand Up @@ -539,9 +546,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
// or an error, so unlock the stateMu lock here.
p.stateMu.RUnlock()

duration := time.Since(start)
if w.err != nil {
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
}

Expand All @@ -550,18 +559,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: w.err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: w.err,
})
}
return nil, w.err
}

duration = time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, w.conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
Expand All @@ -572,6 +584,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
Type: event.GetSucceeded,
Address: p.address.String(),
ConnectionID: w.conn.driverConnectionID,
Duration: duration,
})
}

Expand All @@ -584,12 +597,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
p.stateMu.RUnlock()

// Wait for either the wantConn to be ready or for the Context to time out.
start := time.Now()
waitQueueStart := time.Now()
select {
case <-w.ready:
if w.err != nil {
duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
logger.KeyError, w.err.Error(),
}
Expand All @@ -599,19 +614,22 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: w.err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: w.err,
})
}

return nil, w.err
}

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, w.conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
Expand All @@ -622,14 +640,17 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
Type: event.GetSucceeded,
Address: p.address.String(),
ConnectionID: w.conn.driverConnectionID,
Duration: duration,
})
}
return w.conn, nil
case <-ctx.Done():
duration := time.Since(start)
waitQueueDuration := time.Since(waitQueueStart)

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedTimout,
}

Expand All @@ -638,10 +659,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonTimedOut,
Error: ctx.Err(),
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonTimedOut,
Error: ctx.Err(),
})
}

Expand All @@ -650,7 +672,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
maxPoolSize: p.maxSize,
totalConnections: p.totalConnectionCount(),
availableConnections: p.availableConnectionCount(),
waitDuration: duration,
waitDuration: waitQueueDuration,
}
if p.loadBalanced {
err.pinnedConnections = &pinnedConnections{
Expand Down Expand Up @@ -1085,6 +1107,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
})
}

start := time.Now()
// Pass the createConnections context to connect to allow pool close to cancel connection
// establishment so shutdown doesn't block indefinitely if connectTimeout=0.
err := conn.connect(ctx)
Expand All @@ -1111,9 +1134,11 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
continue
}

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
Expand All @@ -1124,6 +1149,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
Type: event.ConnectionReady,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
Duration: duration,
})
}

Expand Down