Skip to content

Commit

Permalink
Merge pull request #47580 from vvoland/c8d-list-slow
Browse files Browse the repository at this point in the history
c8d/list: Generate image summary concurrently
  • Loading branch information
vvoland committed Mar 19, 2024
2 parents 23e1af4 + 731a640 commit 4531a37
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 56 deletions.
65 changes: 41 additions & 24 deletions daemon/containerd/image_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package containerd
import (
"context"
"encoding/json"
"runtime"
"sort"
"strings"
"sync"
"time"

"github.com/containerd/containerd/content"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// Subset of ocispec.Image that only contains Labels
Expand Down Expand Up @@ -91,16 +94,6 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
return usage.Size, nil
}

var (
summaries = make([]*imagetypes.Summary, 0, len(imgs))
root []*[]digest.Digest
layers map[digest.Digest]int
)
if opts.SharedSize {
root = make([]*[]digest.Digest, 0, len(imgs))
layers = make(map[digest.Digest]int)
}

uniqueImages := map[digest.Digest]images.Image{}
tagsByDigest := map[digest.Digest][]string{}
intermediateImages := map[digest.Digest]struct{}{}
Expand Down Expand Up @@ -152,24 +145,48 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
tagsByDigest[dgst] = append(tagsByDigest[dgst], reference.FamiliarString(ref))
}

resultsMut := sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(runtime.NumCPU() * 2)

var (
summaries = make([]*imagetypes.Summary, 0, len(imgs))
root []*[]digest.Digest
layers map[digest.Digest]int
)
if opts.SharedSize {
root = make([]*[]digest.Digest, 0, len(imgs))
layers = make(map[digest.Digest]int)
}

for _, img := range uniqueImages {
image, allChainsIDs, err := i.imageSummary(ctx, img, platformMatcher, opts, tagsByDigest)
if err != nil {
return nil, err
}
// No error, but image should be skipped.
if image == nil {
continue
}
img := img
eg.Go(func() error {
image, allChainsIDs, err := i.imageSummary(egCtx, img, platformMatcher, opts, tagsByDigest)
if err != nil {
return err
}
// No error, but image should be skipped.
if image == nil {
return nil
}

summaries = append(summaries, image)
resultsMut.Lock()
summaries = append(summaries, image)

if opts.SharedSize {
root = append(root, &allChainsIDs)
for _, id := range allChainsIDs {
layers[id] = layers[id] + 1
if opts.SharedSize {
root = append(root, &allChainsIDs)
for _, id := range allChainsIDs {
layers[id] = layers[id] + 1
}
}
}
resultsMut.Unlock()
return nil
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

if opts.SharedSize {
Expand Down
177 changes: 146 additions & 31 deletions daemon/containerd/image_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package containerd
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/log/logtest"
imagetypes "github.com/docker/docker/api/types/image"
Expand All @@ -37,6 +41,52 @@ func imagesFromIndex(index ...*ocispec.Index) []images.Image {
return imgs
}

func BenchmarkImageList(b *testing.B) {
populateStore := func(ctx context.Context, is *ImageService, dir string, count int) {
// Use constant seed for reproducibility
src := rand.NewSource(1982731263716)

for i := 0; i < count; i++ {
platform := platforms.DefaultSpec()

// 20% is other architecture than the host
if i%5 == 0 {
platform.Architecture = "other"
}

idx, err := specialimage.RandomSinglePlatform(dir, platform, src)
assert.NilError(b, err)

imgs := imagesFromIndex(idx)
for _, desc := range imgs {
_, err := is.images.Create(ctx, desc)
assert.NilError(b, err)
}
}
}

for _, count := range []int{10, 100, 1000} {
csDir := b.TempDir()

ctx := namespaces.WithNamespace(context.TODO(), "testing-"+strconv.Itoa(count))

cs := &delayedStore{
store: &blobsDirContentStore{blobs: filepath.Join(csDir, "blobs/sha256")},
overhead: 500 * time.Microsecond,
}

is := fakeImageService(b, ctx, cs)
populateStore(ctx, is, csDir, count)

b.Run(strconv.Itoa(count)+"-images", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := is.Images(ctx, imagetypes.ListOptions{All: true})
assert.NilError(b, err)
}
})
}
}

func TestImageList(t *testing.T) {
ctx := namespaces.WithNamespace(context.TODO(), "testing")

Expand All @@ -53,19 +103,17 @@ func TestImageList(t *testing.T) {

cs := &blobsDirContentStore{blobs: filepath.Join(blobsDir, "blobs/sha256")}

snapshotter := &testSnapshotterService{}

for _, tc := range []struct {
name string
images []images.Image
opts imagetypes.ListOptions

check func(*testing.T, []*imagetypes.Summary) // Change the type of the check function
check func(*testing.T, []*imagetypes.Summary)
}{
{
name: "one multi-layer image",
images: imagesFromIndex(multilayer),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 1))

assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
Expand All @@ -75,7 +123,7 @@ func TestImageList(t *testing.T) {
{
name: "one image with two platforms is still one entry",
images: imagesFromIndex(twoplatform),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 1))

assert.Check(t, is.Equal(all[0].ID, twoplatform.Manifests[0].Digest.String()))
Expand All @@ -85,7 +133,7 @@ func TestImageList(t *testing.T) {
{
name: "two images are two entries",
images: imagesFromIndex(multilayer, twoplatform),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 2))

assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
Expand All @@ -106,31 +154,7 @@ func TestImageList(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx := logtest.WithT(ctx, t)
mdb := newTestDB(ctx, t)

snapshotters := map[string]snapshots.Snapshotter{
containerd.DefaultSnapshotter: snapshotter,
}

service := &ImageService{
images: metadata.NewImageStore(mdb),
containers: emptyTestContainerStore(),
content: cs,
eventsService: daemonevents.New(),
snapshotterServices: snapshotters,
snapshotter: containerd.DefaultSnapshotter,
}

// containerd.Image gets the services directly from containerd.Client
// so we need to create a "fake" containerd.Client with the test services.
c8dCli, err := containerd.New("", containerd.WithServices(
containerd.WithImageStore(service.images),
containerd.WithContentStore(cs),
containerd.WithSnapshotters(snapshotters),
))
assert.NilError(t, err)

service.client = c8dCli
service := fakeImageService(t, ctx, cs)

for _, img := range tc.images {
_, err := service.images.Create(ctx, img)
Expand All @@ -156,6 +180,37 @@ func TestImageList(t *testing.T) {

}

func fakeImageService(t testing.TB, ctx context.Context, cs content.Store) *ImageService {
snapshotter := &testSnapshotterService{}

mdb := newTestDB(ctx, t)

snapshotters := map[string]snapshots.Snapshotter{
containerd.DefaultSnapshotter: snapshotter,
}

service := &ImageService{
images: metadata.NewImageStore(mdb),
containers: emptyTestContainerStore(),
content: cs,
eventsService: daemonevents.New(),
snapshotterServices: snapshotters,
snapshotter: containerd.DefaultSnapshotter,
}

// containerd.Image gets the services directly from containerd.Client
// so we need to create a "fake" containerd.Client with the test services.
c8dCli, err := containerd.New("", containerd.WithServices(
containerd.WithImageStore(service.images),
containerd.WithContentStore(cs),
containerd.WithSnapshotters(snapshotters),
))
assert.NilError(t, err)

service.client = c8dCli
return service
}

type blobsDirContentStore struct {
blobs string
}
Expand Down Expand Up @@ -251,3 +306,63 @@ func (s *blobsDirContentStore) Info(ctx context.Context, dgst digest.Digest) (co
func (s *blobsDirContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, fmt.Errorf("read-only")
}

// delayedStore is a content store wrapper that adds a constant delay to all
// operations in order to imitate gRPC overhead.
//
// The delay is constant to make the benchmark results more reproducible
// Since content store may be accessed concurrently random delay would be
// order-dependent.
type delayedStore struct {
store content.Store
overhead time.Duration
}

func (s *delayedStore) delay() {
time.Sleep(s.overhead)
}

func (s *delayedStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
s.delay()
return s.store.ReaderAt(ctx, desc)
}

func (s *delayedStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
s.delay()
return s.store.Writer(ctx, opts...)
}

func (s *delayedStore) Status(ctx context.Context, st string) (content.Status, error) {
s.delay()
return s.store.Status(ctx, st)
}

func (s *delayedStore) Delete(ctx context.Context, dgst digest.Digest) error {
s.delay()
return s.store.Delete(ctx, dgst)
}

func (s *delayedStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
s.delay()
return s.store.ListStatuses(ctx, filters...)
}

func (s *delayedStore) Abort(ctx context.Context, ref string) error {
s.delay()
return s.store.Abort(ctx, ref)
}

func (s *delayedStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
s.delay()
return s.store.Walk(ctx, fn, filters...)
}

func (s *delayedStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
s.delay()
return s.store.Info(ctx, dgst)
}

func (s *delayedStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
s.delay()
return s.store.Update(ctx, info, fieldpaths...)
}
2 changes: 1 addition & 1 deletion daemon/containerd/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func digestFor(i int64) digest.Digest {
return dgstr.Digest()
}

func newTestDB(ctx context.Context, t *testing.T) *metadata.DB {
func newTestDB(ctx context.Context, t testing.TB) *metadata.DB {
t.Helper()

p := filepath.Join(t.TempDir(), "metadata")
Expand Down

0 comments on commit 4531a37

Please sign in to comment.