Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move http cache writing to cacheitem. #4919

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 79 additions & 0 deletions .github/workflows/turborepo-compare-cache-item.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Manually dispatched workflow. The first job builds a fresh create-turbo
# project on each OS and uploads the resulting turbo cache plus run summaries;
# the second job replays the build on every OS against every OS's artifact.
name: Turborepo Compare Cache Item

on:
workflow_dispatch:
inputs:
version:
description: Turborepo release to test.
type: string
default: "canary"

jobs:
# Produces one cache artifact per OS in the matrix.
generate_cache_artifact:
strategy:
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
runs-on: ${{ matrix.os }}

steps:
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: 18

- name: create-turbo
run: |
npm install -g pnpm turbo@${{ inputs.version }}
pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm

- name: Run build
run: |
cd my-turborepo
turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv

# Uploads the turbo cache directory and the run summaries under a name keyed
# by OS and requested version; the artifact is retained for one day.
- name: Grab Turborepo artifacts
uses: actions/upload-artifact@v3
with:
name: cache-item-${{ matrix.os }}-${{ inputs.version }}
path: |
my-turborepo/node_modules/.cache/turbo
my-turborepo/.turbo/runs
retention-days: 1

# Runs once per (os, cache_os) pair. fail-fast is disabled so that one failing
# pair does not cancel the remaining combinations.
use_cache_artifact:
needs: generate_cache_artifact
strategy:
fail-fast: false
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
cache_os: [macos-latest, ubuntu-latest, windows-latest]
runs-on: ${{ matrix.os }}

steps:
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: 18

- name: create-turbo
run: |
npm install -g pnpm turbo@${{ inputs.version }}
pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm

# Fetches the artifact produced on matrix.cache_os into the project directory.
- name: Download cache artifacts
uses: actions/download-artifact@v3
with:
name: cache-item-${{ matrix.cache_os }}-${{ inputs.version }}
path: my-turborepo

# Removes the downloaded run summaries, rebuilds with --summarize, and uses
# `jq -e` so the step fails unless the summary reports exactly 2 cached tasks.
- name: Check for cache hit
run: |
cd my-turborepo
rm .turbo/runs/*.json
turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv
cat .turbo/runs/*.json | jq -e '.execution.cached == 2'

# Downloads scripts/server.js from the main branch and runs it against the
# docs app directory.
- name: Check for functional server
run: |
curl https://raw.githubusercontent.com/vercel/turbo/main/scripts/server.js -O
node server.js my-turborepo/apps/docs
90 changes: 19 additions & 71 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,16 @@
package cache

import (
"archive/tar"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
log "log"
"net/http"
"os"
"path/filepath"
"strconv"
"time"

"github.com/DataDog/zstd"

"github.com/vercel/turbo/cli/internal/analytics"
"github.com/vercel/turbo/cli/internal/cacheitem"
"github.com/vercel/turbo/cli/internal/tarpatch"
"github.com/vercel/turbo/cli/internal/turbopath"
)

Expand Down Expand Up @@ -51,19 +43,15 @@ func (l limiter) release() {
<-l
}

// mtime is the time we attach for the modification time of all files.
var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)

// nobody is the usual uid / gid of the 'nobody' user.
const nobody = 65534

func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()

r, w := io.Pipe()
go cache.write(w, hash, files)

cacheErrorChan := make(chan error, 1)
go cache.write(w, anchor, files, cacheErrorChan)

// Read the entire artifact tar into memory so we can easily compute the signature.
// Note: retryablehttp.NewRequest reads the files into memory anyways so there's no
Expand All @@ -79,69 +67,29 @@ func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duratio
return fmt.Errorf("failed to store files in HTTP cache: %w", err)
}
}

cacheCreateError := <-cacheErrorChan
if cacheCreateError != nil {
return cacheCreateError
}

return cache.client.PutArtifact(hash, artifactBody, duration, tag)
}

// write writes a series of files into the given Writer.
func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) {
defer w.Close()
defer func() { _ = w.Close() }()
zw := zstd.NewWriter(w)
defer func() { _ = zw.Close() }()
tw := tar.NewWriter(zw)
defer func() { _ = tw.Close() }()
for _, file := range files {
// log.Printf("caching file %v", file)
if err := cache.storeFile(tw, file); err != nil {
log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err)
// TODO(jaredpalmer): How can we cancel the request at this point?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed by using the channel. Previously failing here didn't actually blow up the artifact.

}
}
}
func (cache *httpCache) write(w io.WriteCloser, anchor turbopath.AbsoluteSystemPath, files []turbopath.AnchoredSystemPath, cacheErrorChan chan error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now 1:1 with cache_fs's implementation.

cacheItem := cacheitem.CreateWriter(w)

func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error {
absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot)
info, err := absoluteFilePath.Lstat()
if err != nil {
return err
}
target := ""
if info.Mode()&os.ModeSymlink != 0 {
target, err = absoluteFilePath.Readlink()
for _, file := range files {
err := cacheItem.AddFile(anchor, file)
if err != nil {
return err
_ = cacheItem.Close()
cacheErrorChan <- err
return
}
}
hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target))
if err != nil {
return err
}
// Ensure posix path for filename written in header.
hdr.Name = repoRelativePath.ToUnixPath().ToString()
// Zero out all timestamps.
hdr.ModTime = mtime
hdr.AccessTime = mtime
hdr.ChangeTime = mtime
// Strip user/group ids.
hdr.Uid = nobody
hdr.Gid = nobody
hdr.Uname = "nobody"
hdr.Gname = "nobody"
if err := tw.WriteHeader(hdr); err != nil {
return err
} else if info.IsDir() || target != "" {
return nil // nothing to write
}
f, err := absoluteFilePath.Open()
if err != nil {
return err
}
defer func() { _ = f.Close() }()
_, err = io.Copy(tw, f)
if errors.Is(err, tar.ErrWriteTooLong) {
log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular())
}
return err

cacheErrorChan <- cacheItem.Close()
}

func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
Expand Down
49 changes: 45 additions & 4 deletions cli/internal/cache/cache_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"bytes"
"errors"
"net/http"
"os"
"testing"

"github.com/DataDog/zstd"

"github.com/vercel/turbo/cli/internal/cacheitem"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/turbopath"
"github.com/vercel/turbo/cli/internal/util"
Expand All @@ -17,9 +18,21 @@ import (

// errorResp is a test double used as the HTTP cache client. Its PutArtifact
// method inspects the uploaded body and then returns the configured error.
type errorResp struct {
// err is the value PutArtifact returns once its checks have run.
err error
// t is the running test; PutArtifact uses it for its temp directory, logging
// and assertions.
t *testing.T
}

// PutArtifact is the upload hook of the errorResp test double. Rather than
// sending body anywhere, it restores the artifact into a fresh temporary
// directory, asserts that exactly the entries "one" and "two" were restored
// (in that order), and finally returns the preconfigured sr.err.
//
// hash, duration and tag are accepted to match the client signature and are
// otherwise ignored.
func (sr *errorResp) PutArtifact(hash string, body []byte, duration int, tag string) error {
	sr.t.Helper()
	outdir := turbopath.AbsoluteSystemPathFromUpstream(sr.t.TempDir())
	// NOTE(review): the second argument presumably marks the stream as
	// compressed, matching what CreateWriter produces; confirm against FromReader.
	cache := cacheitem.FromReader(bytes.NewReader(body), true)
	restored, err := cache.Restore(outdir)

	sr.t.Log(restored)

	// Validate the error and the length before indexing into restored, so that
	// a failed or partial restore is reported as an assertion failure rather
	// than as an index-out-of-range panic.
	assert.NilError(sr.t, err, "Restoration was successful.")
	assert.Equal(sr.t, len(restored), 2)
	assert.Equal(sr.t, restored[0].ToString(), "one")
	assert.Equal(sr.t, restored[1].ToString(), "two")

	return sr.err
}

Expand Down Expand Up @@ -240,6 +253,34 @@ func TestRestoreInvalidTar(t *testing.T) {
assert.Equal(t, string(contents), string(expectedContents), "expected to not overwrite file")
}

// Test_httpCache_Put exercises httpCache.Put against real files in a temporary
// directory, with errorResp standing in for the HTTP client:
//   - when every listed file exists, the client's error is passed through;
//   - when a listed file is missing, Put fails with os.ErrNotExist, whether the
//     missing entry is the last or the first one in the list.
func Test_httpCache_Put(t *testing.T) {
	root := fs.AbsoluteSystemPathFromUpstream(t.TempDir())

	// Fail immediately if the fixtures cannot be created; otherwise a setup
	// problem would surface below as a misleading os.ErrNotExist mismatch.
	assert.NilError(t, root.Join("one").WriteFile(nil, 0644))
	assert.NilError(t, root.Join("two").WriteFile(nil, 0644))

	clientErr := errors.New("PutArtifact")
	client := &errorResp{err: clientErr, t: t}

	cache := newHTTPCache(Opts{}, client, nil, root)

	assert.ErrorIs(
		t,
		cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"one", "two"}),
		clientErr,
		"Succeeds at writing, cache item is successfully passed through.",
	)

	assert.ErrorIs(
		t,
		cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"one", "two", "missing"}),
		os.ErrNotExist,
		"Errors with missing file.",
	)

	assert.ErrorIs(
		t,
		cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"missing", "one", "two"}),
		os.ErrNotExist,
		"Errors with missing file at first load.",
	)
}
10 changes: 8 additions & 2 deletions cli/internal/cacheitem/cacheitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type CacheItem struct {
tw *tar.Writer
zw io.WriteCloser
fileBuffer *bufio.Writer
handle io.Reader
handle interface{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caches are Reader | Writer | Closer but you can't express that in Go. So this goes to interface{} and each place that needs it does a type assertion.

#4634 made this partially generic, this finishes the work.

compressed bool
}

Expand Down Expand Up @@ -72,7 +72,13 @@ func (ci *CacheItem) Close() error {
// GetSha returns the SHA-512 hash for the CacheItem.
func (ci *CacheItem) GetSha() ([]byte, error) {
sha := sha512.New()
if _, err := io.Copy(sha, ci.handle); err != nil {

reader, isReader := ci.handle.(io.Reader)
if !isReader {
panic("can't read from this cache item")
}

if _, err := io.Copy(sha, reader); err != nil {
return nil, err
}

Expand Down
11 changes: 11 additions & 0 deletions cli/internal/cacheitem/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ func Create(path turbopath.AbsoluteSystemPath) (*CacheItem, error) {
return cacheItem, nil
}

// CreateWriter returns a CacheItem that writes to the supplied writer instead
// of a file on disk. The item is flagged as compressed and has init called on
// it before it is handed back, so it is ready for writing.
func CreateWriter(writer io.WriteCloser) *CacheItem {
	ci := new(CacheItem)
	ci.compressed = true
	ci.handle = writer

	ci.init()
	return ci
}

// init prepares the CacheItem for writing.
// Wires all the writers end-to-end:
// tar.Writer -> zstd.Writer -> fileBuffer -> file
Expand Down
9 changes: 7 additions & 2 deletions cli/internal/cacheitem/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A
var tr *tar.Reader
var closeError error

reader, isReader := ci.handle.(io.Reader)
if !isReader {
panic("can't read from this cache item")
}

// We're reading a tar, possibly wrapped in zstd.
if ci.compressed {
zr := zstd.NewReader(ci.handle)
zr := zstd.NewReader(reader)

// The `Close` function for compression effectively just returns the singular
// error field on the decompressor instance. This is extremely unlikely to be
Expand All @@ -52,7 +57,7 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A
defer func() { closeError = zr.Close() }()
tr = tar.NewReader(zr)
} else {
tr = tar.NewReader(ci.handle)
tr = tar.NewReader(reader)
}

// On first attempt to restore it's possible that a link target doesn't exist.
Expand Down
42 changes: 42 additions & 0 deletions scripts/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env node

const { spawn } = require("child_process");
const { platform } = require("process");

const path = process.argv[2];

/**
 * Runs `pnpm run start` in the directory given as the first CLI argument
 * (`path`) and checks that the server comes up cleanly.
 *
 * The server counts as healthy when it produces stdout output and then runs
 * for 5 seconds without writing anything to stderr; it is killed at that
 * point. Any stderr output, or a failure to spawn the process at all, marks
 * the run as errored and stops the server immediately.
 *
 * Exits the Node process with code 1 when errored, 0 otherwise.
 */
async function main() {
  let errored = false;

  await new Promise((resolve) => {
    const command = platform === "win32" ? "pnpm.cmd" : "pnpm";
    const server = spawn(command, ["run", "start"], { cwd: path });
    let stableTimer = null;

    server.stdout.on("data", (data) => {
      console.log("stdout:");
      console.log(`${data}`);

      // Stable for 5s. The countdown starts at the first chunk of output;
      // later chunks must not schedule additional kill timers.
      if (stableTimer === null) {
        stableTimer = setTimeout(() => {
          server.kill();
        }, 5000);
      }
    });

    server.stderr.on("data", (data) => {
      console.log("stderr:");
      console.log(`${data}`);

      errored = true;
      server.kill();
    });

    // Emitted when the process could not be spawned (e.g. pnpm is missing).
    // "exit" may never fire in that case, so settle the promise here.
    server.on("error", (err) => {
      console.log("error:");
      console.log(`${err}`);

      errored = true;
      resolve();
    });

    server.on("exit", () => {
      if (stableTimer !== null) {
        clearTimeout(stableTimer);
      }
      console.log(`exit: ${+errored}`);
      resolve();
    });
  });

  // process.exit expects an integer; passing a boolean relies on deprecated
  // coercion that newer Node.js releases reject.
  process.exit(errored ? 1 : 0);
}

main();