Skip to content

Commit

Permalink
[FIXED] Set read deadline in object store to a configurable value
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 23, 2024
1 parent f42d2be commit 376b04e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 8 deletions.
6 changes: 5 additions & 1 deletion jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,11 @@ func (obs *obs) Status(ctx context.Context) (ObjectStoreStatus, error) {
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
defer o.Unlock()
readDeadline := time.Now().Add(defaultAPITimeout)
if ctx := o.ctx; ctx != nil {
if deadline, ok := ctx.Deadline(); ok {
readDeadline = deadline
}
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
Expand All @@ -1139,7 +1143,7 @@ func (o *objResult) Read(p []byte) (n int, err error) {
}

r := o.r.(net.Conn)
_ = r.SetReadDeadline(time.Now().Add(2 * time.Second))
_ = r.SetReadDeadline(readDeadline)
n, err = r.Read(p)
if err, ok := err.(net.Error); ok && err.Timeout() {
if ctx := o.ctx; ctx != nil {
Expand Down
28 changes: 28 additions & 0 deletions jetstream/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,3 +1077,31 @@ func TestDecodeObjectDigest(t *testing.T) {
})
}
}

func TestObjectStoreGetObjectContextTimeout(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(context.Background(), jetstream.ObjectStoreConfig{Bucket: "OBJS"})
expectOk(t, err)

blob := make([]byte, 1024)
_, err = rand.Read(blob)
expectOk(t, err)

_, err = obs.PutBytes(context.Background(), "blob", blob)
expectOk(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

r, err := obs.Get(ctx, "blob")
time.Sleep(15 * time.Millisecond)
var res []byte
_, err = r.Read(res)
expectErr(t, err, nats.ErrTimeout)
r.Close()
}
20 changes: 13 additions & 7 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,11 +535,12 @@ func DecodeObjectDigest(data string) ([]byte, error) {
// ObjectResult impl.
type objResult struct {
sync.Mutex
info *ObjectInfo
r io.ReadCloser
err error
ctx context.Context
digest hash.Hash
info *ObjectInfo
r io.ReadCloser
err error
ctx context.Context
digest hash.Hash
readTimeout time.Duration
}

func (info *ObjectInfo) isLink() bool {
Expand Down Expand Up @@ -623,7 +624,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
return lobs.Get(info.ObjectMeta.Opts.Link.Name)
}

result := &objResult{info: info, ctx: ctx}
result := &objResult{info: info, ctx: ctx, readTimeout: obs.js.opts.wait}
if info.Size == 0 {
return result, nil
}
Expand Down Expand Up @@ -1242,7 +1243,11 @@ func (obs *obs) Status() (ObjectStoreStatus, error) {
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
defer o.Unlock()
readDeadline := time.Now().Add(o.readTimeout)
if ctx := o.ctx; ctx != nil {
if deadline, ok := ctx.Deadline(); ok {
readDeadline = deadline
}
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
Expand All @@ -1261,7 +1266,8 @@ func (o *objResult) Read(p []byte) (n int, err error) {
}

r := o.r.(net.Conn)
r.SetReadDeadline(time.Now().Add(2 * time.Second))
fmt.Println("read deadline", o.readTimeout)
r.SetReadDeadline(readDeadline)
n, err = r.Read(p)
if err, ok := err.(net.Error); ok && err.Timeout() {
if ctx := o.ctx; ctx != nil {
Expand Down
28 changes: 28 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,3 +1058,31 @@ func TestDecodeObjectDigest(t *testing.T) {
})
}
}

func TestObjectStoreGetObjectContextTimeout(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"})
expectOk(t, err)

blob := make([]byte, 1024)
_, err = rand.Read(blob)
expectOk(t, err)

_, err = obs.PutBytes("blob", blob)
expectOk(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

r, err := obs.Get("blob", nats.Context(ctx))
time.Sleep(15 * time.Millisecond)
var res []byte
_, err = r.Read(res)
expectErr(t, err, nats.ErrTimeout)
r.Close()
}

0 comments on commit 376b04e

Please sign in to comment.