Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: refactor streaming decoder to fully use buffer #550

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions fuzz/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func fuzzMain(t *testing.T, data []byte) {
if !json.Valid(data) {
return
}
fuzzStream(t, data)
for i, typ := range []func() interface{}{
func() interface{} { return new(interface{}) },
func() interface{} { return new(map[string]interface{}) },
Expand Down
38 changes: 38 additions & 0 deletions fuzz/other_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
`unicode/utf8`

`github.com/bytedance/sonic/encoder`
`github.com/bytedance/sonic/decoder`
`github.com/stretchr/testify/require`
// `github.com/davecgh/go-spew/spew`
)
Expand All @@ -44,4 +45,41 @@ func fuzzHtmlEscape(t *testing.T, data []byte){
json.HTMLEscape(&jdst, data)
sdst = encoder.HTMLEscape(sdst, data)
require.Equalf(t, string(jdst.Bytes()), string(sdst), "different htmlescape results")
}

// data is random, check whether is panic
func fuzzStream(t *testing.T, data []byte) {
r := bytes.NewBuffer(data)
dc := decoder.NewStreamDecoder(r)
dc.ValidateString()
r1 := bytes.NewBuffer(data)
dc1 := decoder.NewStreamDecoder(r1)

w := bytes.NewBuffer(nil)
ec := encoder.NewStreamEncoder(w)
ec.SetCompactMarshaler(true)
ec.SetValidateString(true)
ec.SetEscapeHTML(true)
ec.SortKeys()
w1 := bytes.NewBuffer(nil)
ec1 := encoder.NewStreamEncoder(w1)

for dc1.More() {
if !dc.More() {
t.Fatal()
}
var obj interface{}
err := dc.Decode(&obj)
var obj1 interface{}
err1 := dc1.Decode(&obj1)
require.Equal(t, err1 == nil, err == nil)
// require.Equal(t, obj, obj1)
if err1 != nil {
return
}

ee := ec.Encode(obj)
ee1 := ec1.Encode(obj1)
require.Equal(t, ee == nil, ee1 == nil)
}
}
205 changes: 117 additions & 88 deletions internal/decoder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

`github.com/bytedance/sonic/internal/native`
`github.com/bytedance/sonic/internal/native/types`
`github.com/bytedance/sonic/internal/rt`
`github.com/bytedance/sonic/option`
)

var (
minLeftBufferShift uint = 1
minLeftBufferShift uint = 1
)

// StreamDecoder is the decoder context object for streaming input.
Expand Down Expand Up @@ -58,95 +59,71 @@ func NewStreamDecoder(r io.Reader) *StreamDecoder {
// Either io error from underlying io.Reader (except io.EOF)
// or syntax error from data will be recorded and stop subsequently decoding.
func (self *StreamDecoder) Decode(val interface{}) (err error) {
if self.err != nil {
return self.err
}

var buf = self.buf[self.scanp:]
var p = 0
var recycle bool
if cap(buf) == 0 {
buf = bufPool.Get().([]byte)
recycle = true
}

var first = true
var repeat = true

read_more:
for {
l := len(buf)
realloc(&buf)
n, err := self.r.Read(buf[l:cap(buf)])
buf = buf[:l+n]
if err != nil {
repeat = false
if err == io.EOF {
if len(buf) == 0 {
return err
}
break
}
self.err = err
return err
}
if n > 0 || first {
break
}
}
first = false

l := len(buf)
if l > 0 {
self.Decoder.Reset(string(buf))

var x int
if ret := native.SkipOneFast(&self.s, &x); ret < 0 {
if repeat {
goto read_more
// read more data into buf
if self.More() {
// println(string(self.buf))
var s = self.scanp
try_skip:
var e = len(self.buf)
// println("s:", s, "e:", e, "scanned:",self.scanned, "scanp:",self.scanp, self.buf)
var src = rt.Mem2Str(self.buf[s:e])
// if len(src) > 5 {
// println(src[:5], src[len(src)-5:])
// } else {
// println(src)
// }
// try skip
var x = 0;
if y := native.SkipOneFast(&src, &x); y < 0 {
if self.readMore() {
// println("more")
goto try_skip
} else {
err = SyntaxError{x, self.s, types.ParsingError(-ret), ""}
self.err = err
// println("no more")
err = SyntaxError{e, self.s, types.ParsingError(-s), ""}
self.setErr(err)
return
}
} else {
s = y + s
e = x + s
}


// println("decode: ", s, e)
// must copy string here for safety
self.Decoder.Reset(string(self.buf[s:e]))
err = self.Decoder.Decode(val)
if err != nil {
self.err = err
self.setErr(err)
return
}

p = self.Decoder.Pos()
self.scanned += int64(p)
self.scanp = 0
}

if l > p {
// remain undecoded bytes, so copy them into self.buf
self.buf = append(self.buf[:0], buf[p:]...)
} else {
self.buf = nil
recycle = true
}
self.scanp = e
_, empty := self.scan()
if empty {
// println("recycle")
// no remain valid bytes, thus we just recycle buffer
mem := self.buf
self.buf = nil
bufPool.Put(mem[:0])
} else {
// println("keep")
// remain undecoded bytes, move them onto head
n := copy(self.buf, self.buf[self.scanp:])
self.buf = self.buf[:n]
}

if recycle {
buf = buf[:0]
bufPool.Put(buf)
}
return err
}
self.scanned += int64(self.scanp)
self.scanp = 0
}

func (self StreamDecoder) repeatable(err error) bool {
if ee, ok := err.(SyntaxError); ok &&
(ee.Code == types.ERR_EOF || (ee.Code == types.ERR_INVALID_CHAR && self.i >= len(self.s)-1)) {
return true
}
return false
return self.err
}

// InputOffset returns the input stream byte offset of the current decoder position.
// The offset gives the location of the end of the most recently returned token and the beginning of the next token.
func (self *StreamDecoder) InputOffset() int64 {
// println("input offset",self.scanned, self.scanp)
return self.scanned + int64(self.scanp)
}

Expand All @@ -166,28 +143,72 @@ func (self *StreamDecoder) More() bool {
return err == nil && c != ']' && c != '}'
}

// More reports whether there is another element in the
// current array or object being parsed.
func (self *StreamDecoder) readMore() bool {
if self.err != nil {
return false
}

var err error
var n int
for {
// Grow buffer if not large enough.
l := len(self.buf)
realloc(&self.buf)

n, err = self.r.Read(self.buf[l:cap(self.buf)])
self.buf = self.buf[: l+n]

self.scanp = l
_, empty := self.scan()
if !empty {
return true
}

// buffer has been scanned, now report any error
if err != nil {
self.setErr(err)
return false
}
}
}

func (self *StreamDecoder) setErr(err error) {
self.err = err
mem := self.buf[:0]
self.buf = nil
bufPool.Put(mem)
}

func (self *StreamDecoder) peek() (byte, error) {
var err error
for {
for i := self.scanp; i < len(self.buf); i++ {
c := self.buf[i]
if isSpace(c) {
continue
}
self.scanp = i
return c, nil
c, empty := self.scan()
if !empty {
return byte(c), nil
}
// buffer has been scanned, now report any error
if err != nil {
if err != io.EOF {
self.err = err
}
self.setErr(err)
return 0, err
}
err = self.refill()
}
}

func (self *StreamDecoder) scan() (byte, bool) {
for i := self.scanp; i < len(self.buf); i++ {
c := self.buf[i]
if isSpace(c) {
continue
}
self.scanp = i
return c, false
}
return 0, true
}

func isSpace(c byte) bool {
return types.SPACE_MASK & (1 << c) != 0
}
Expand All @@ -212,17 +233,25 @@ func (self *StreamDecoder) refill() error {
return err
}

func realloc(buf *[]byte) {
func realloc(buf *[]byte) bool {
l := uint(len(*buf))
c := uint(cap(*buf))
if c == 0 {
// println("use pool!")
*buf = bufPool.Get().([]byte)
return true
}
if c - l <= c >> minLeftBufferShift {
// println("realloc!")
e := l+(l>>minLeftBufferShift)
if e < option.DefaultDecoderBufferSize {
e = option.DefaultDecoderBufferSize
if e <= c {
e = c*2
}
tmp := make([]byte, l, e)
copy(tmp, *buf)
*buf = tmp
return true
}
return false
}