Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-3172 Read response in the background after an op timeout. #1589

Merged
merged 11 commits into from
Apr 12, 2024
Merged
6 changes: 4 additions & 2 deletions internal/csot/csot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type timeoutKey struct{}
// TODO default behavior.
func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context, context.CancelFunc) {
// Only use the passed in Duration as a timeout on the Context if it
// is non-zero.
// is non-zero and if the Context doesn't already have a timeout.
cancelFunc := func() {}
if to != 0 {
if _, deadlineSet := ctx.Deadline(); to != 0 && !deadlineSet {
ctx, cancelFunc = context.WithTimeout(ctx, to)
}

// Add timeoutKey either way to indicate CSOT is enabled.
return context.WithValue(ctx, timeoutKey{}, true), cancelFunc
}

Expand Down
8 changes: 4 additions & 4 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
cs.aggregate.Pipeline(plArr)
}

// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
// a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution
// and potential retry.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
// If cs.client.timeout is set and context is not already a Timeout context,
// honor cs.client.timeout in new Timeout context for change stream
// operation execution and potential retry.
if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout)
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down
30 changes: 28 additions & 2 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,15 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
Timeout(a.client.timeout).
MaxTime(ao.MaxTime)

// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors. To maintain existing
// behavior for users who set "timeoutMS" with no context deadline, only
// omit "maxTimeMS" when a context deadline is set.
//
// See DRIVERS-2722 for more detail.
_, deadlineSet := a.ctx.Deadline()
op.OmitCSOTMaxTimeMS(deadlineSet)

if ao.AllowDiskUse != nil {
op.AllowDiskUse(*ao.AllowDiskUse)
}
Expand Down Expand Up @@ -1191,6 +1200,22 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/find/.
func (coll *Collection) Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *Cursor, err error) {
// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors. To maintain existing
// behavior for users who set "timeoutMS" with no context deadline, only
// omit "maxTimeMS" when a context deadline is set.
//
// See DRIVERS-2722 for more detail.
_, deadlineSet := ctx.Deadline()
return coll.find(ctx, filter, deadlineSet, opts...)
}

func (coll *Collection) find(
ctx context.Context,
filter interface{},
omitCSOTMaxTimeMS bool,
opts ...*options.FindOptions,
) (cur *Cursor, err error) {

if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -1230,7 +1255,8 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
CommandMonitor(coll.client.monitor).ServerSelector(selector).
ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger)
Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger).
OmitCSOTMaxTimeMS(omitCSOTMaxTimeMS)

cursorOpts := coll.client.createBaseCursorOptions()

Expand Down Expand Up @@ -1408,7 +1434,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
// by the server.
findOpts = append(findOpts, options.Find().SetLimit(-1))

cursor, err := coll.Find(ctx, filter, findOpts...)
cursor, err := coll.find(ctx, filter, false, findOpts...)
return &SingleResult{
ctx: ctx,
cur: cursor,
Expand Down
12 changes: 6 additions & 6 deletions mongo/gridfs/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ func (b *Bucket) Delete(fileID interface{}) error {
//
// Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// If Timeout is set on the Client and context is not already a Timeout
// context, honor Timeout in new Timeout context for operation execution to
// be shared by both delete operations.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down Expand Up @@ -384,10 +384,10 @@ func (b *Bucket) Drop() error {
//
// Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DropContext(ctx context.Context) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// If Timeout is set on the Client and context is not already a Timeout
// context, honor Timeout in new Timeout context for operation execution to
// be shared by both drop operations.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down