Merge pull request #127 from rvrangel/fix-125
fix for issue 125 - lz4 data corruption when concurrency is used
pierrec committed Jun 3, 2021
2 parents f414fbb + 63f912f commit bd2592c
Showing 2 changed files with 73 additions and 6 deletions.
21 changes: 15 additions & 6 deletions writer.go
@@ -84,10 +84,8 @@ func (z *Writer) WithConcurrency(n int) *Writer {
 					z.err = err
 				}
 			}
-			if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
-				// It is now safe to release the buffer as no longer in use by any goroutine.
-				putBuffer(cap(res.data), res.data)
-			}
+			// It is now safe to release the buffer as no longer in use by any goroutine.
+			putBuffer(cap(res.data), res.data)
 			if h := z.OnBlockDone; h != nil {
 				h(n)
 			}
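
With compressBlock now handing writerCompressBlock a pooled copy of the data (next hunk), res.data always refers to a pool-owned buffer whether or not the block was stored compressed, so the flag check is gone and every result buffer is recycled. A minimal sketch of the consumer loop's contract, with the writing step elided and the shape otherwise assumed from this hunk:

	// Drain per-block result channels in submission order and recycle
	// each result buffer exactly once after its block has been written.
	for c := range z.c {
		res := <-c
		// ... write res.size, res.data and the optional checksum to z.dst ...
		putBuffer(cap(res.data), res.data) // always pool-owned after this commit
		if h := z.OnBlockDone; h != nil {
			h(len(res.data))
		}
	}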
@@ -231,7 +229,12 @@ func (z *Writer) compressBlock(data []byte) error {
 	if z.c != nil {
 		c := make(chan zResult)
 		z.c <- c // Send now to guarantee order
-		go writerCompressBlock(c, z.Header, data)
+
+		// get a buffer from the pool and copy the data over
+		block := getBuffer(z.Header.BlockMaxSize)[:len(data)]
+		copy(block, data)
+
+		go writerCompressBlock(c, z.Header, block)
 		return nil
 	}
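
For context on why the copy matters: before this change, compressBlock handed the caller's slice (ultimately z.data) straight to the compression goroutine, and the Writer kept filling that same slice with the next block while the goroutine was still reading it. A standalone sketch of the same hazard, illustrative only and not the library's code:

	package main

	import "fmt"

	func main() {
		buf := make([]byte, 4)
		out := make(chan string, 2)
		for i := 0; i < 2; i++ {
			copy(buf, fmt.Sprintf("blk%d", i))
			go func() { out <- string(buf) }() // racy: buf is reused below
		}
		// May print "blk1 blk1": a goroutine can read buf after the loop
		// has overwritten it. The fix mirrors the diff: copy the bytes into
		// a fresh (or pooled) buffer before starting the goroutine.
		fmt.Println(<-out, <-out)
	}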

@@ -299,7 +302,9 @@ func (z *Writer) Flush() error {
 		return nil
 	}
 
-	data := z.data[:z.idx]
+	data := getBuffer(z.Header.BlockMaxSize)[:len(z.data[:z.idx])]
+	copy(data, z.data[:z.idx])
+
 	z.idx = 0
 	if z.c == nil {
 		return z.compressBlock(data)
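
getBuffer and putBuffer are the library's internal buffer pool and their implementation is not part of this diff. A rough sketch of the contract the calls above rely on, assuming a single size class over sync.Pool (hypothetical, simplified):

	package lz4

	import "sync"

	var bufPool = sync.Pool{
		New: func() interface{} { return make([]byte, 64*1024) },
	}

	// getBuffer returns a slice of length size, reusing pooled memory
	// when its capacity is sufficient.
	func getBuffer(size int) []byte {
		b := bufPool.Get().([]byte)
		if cap(b) < size {
			b = make([]byte, size)
		}
		return b[:size]
	}

	// putBuffer recycles a buffer once no goroutine references it anymore.
	func putBuffer(size int, b []byte) {
		if cap(b) >= size {
			bufPool.Put(b[:cap(b)])
		}
	}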
@@ -402,9 +407,13 @@ func writerCompressBlock(c chan zResult, header Header, data []byte) {
 	if zn > 0 && zn < len(data) {
 		res.size = uint32(zn)
 		res.data = zdata[:zn]
+		// release the uncompressed block since it is not used anymore
+		putBuffer(header.BlockMaxSize, data)
 	} else {
 		res.size = uint32(len(data)) | compressedBlockFlag
 		res.data = data
+		// release the compressed block since it was not used
+		putBuffer(header.BlockMaxSize, zdata)
 	}
 	if header.BlockChecksum {
 		res.checksum = xxh32.ChecksumZero(res.data)
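Taken together, the two new putBuffer calls give writerCompressBlock a clear ownership rule: it holds two pooled buffers, the uncompressed input and the zdata scratch, and exactly one of them escapes through res.data; the other is released on the spot, while the escapee is released by the results loop in the first hunk. A sketch of that selection as a hypothetical helper, not in the library:

	// selectResult returns the buffer that will be published as the
	// block's payload and the one that can be recycled immediately.
	func selectResult(in, scratch []byte, zn int) (published, recycle []byte) {
		if zn > 0 && zn < len(in) {
			return scratch[:zn], in // compression helped: publish the scratch
		}
		return in, scratch // block is stored raw: publish the input
	}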
58 changes: 58 additions & 0 deletions writer_test.go
@@ -1,6 +1,7 @@
 package lz4_test
 
 import (
+	"bufio"
 	"bytes"
 	"fmt"
 	"io"
@@ -183,3 +184,60 @@ func TestIssue71(t *testing.T) {
 		})
 	}
 }
+
+func TestIssue125(t *testing.T) {
+	goldenFiles := []string{
+		"testdata/e.txt",
+		"testdata/gettysburg.txt",
+		"testdata/Mark.Twain-Tom.Sawyer.txt",
+		"testdata/Mark.Twain-Tom.Sawyer_long.txt",
+		"testdata/pg1661.txt",
+		"testdata/pi.txt",
+		"testdata/random.data",
+		"testdata/repeat.txt",
+		"testdata/vmlinux_LZ4_19377",
+	}
+
+	for _, fname := range goldenFiles {
+		for _, concurrency := range []int{0, 4, -1} {
+			label := fmt.Sprintf("%s(%d)", fname, concurrency)
+			t.Run(label, func(t *testing.T) {
+				t.Parallel()
+
+				// The issue does not reproduce when the whole file is read into
+				// a []byte and wrapped in bytes.NewReader(), since that reader
+				// copies on each call to Read(); use *os.File directly with Copy().
+				f, err := os.Open(fname)
+				if err != nil {
+					t.Fatal(err)
+				}
+				defer f.Close()
+
+				var out bytes.Buffer
+				writer := lz4.NewWriter(&out).WithConcurrency(concurrency)
+				writer.Header = lz4.Header{
+					// Use a block size small enough to make the issue easier to
+					// trigger, since most of the example files are small.
+					BlockMaxSize: 1024 * 64,
+				}
+
+				_, err = io.Copy(writer, f)
+				if err != nil {
+					t.Fatal(err)
+				}
+				writer.Close()
+
+				reader := lz4.NewReader(&out)
+
+				var d bytes.Buffer
+				decompressed := bufio.NewWriter(&d)
+
+				_, err = io.Copy(decompressed, reader)
+				if err != nil {
+					t.Fail()
+					t.Log(err)
+				}
+			})
+		}
+	}
+}
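
For reference, a minimal round trip over the API the test exercises. The file path and concurrency level are arbitrary, error handling is elided for brevity, and the import path assumes the v3 module layout:

	package main

	import (
		"bytes"
		"io"
		"os"

		"github.com/pierrec/lz4"
	)

	func main() {
		f, _ := os.Open("testdata/pg1661.txt")
		defer f.Close()

		var compressed bytes.Buffer
		w := lz4.NewWriter(&compressed).WithConcurrency(4)
		w.Header = lz4.Header{BlockMaxSize: 64 * 1024}
		_, _ = io.Copy(w, f)
		w.Close()

		// With the fix, the decompressed stream matches the input byte for
		// byte even when blocks were compressed concurrently.
		_, _ = io.Copy(os.Stdout, lz4.NewReader(&compressed))
	}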
