Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

c8d/list: Generate image summary concurrently #47580

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 41 additions & 24 deletions daemon/containerd/image_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package containerd
import (
"context"
"encoding/json"
"runtime"
"sort"
"strings"
"sync"
"time"

"github.com/containerd/containerd/content"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// Subset of ocispec.Image that only contains Labels
Expand Down Expand Up @@ -91,16 +94,6 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
return usage.Size, nil
}

var (
summaries = make([]*imagetypes.Summary, 0, len(imgs))
root []*[]digest.Digest
layers map[digest.Digest]int
)
if opts.SharedSize {
root = make([]*[]digest.Digest, 0, len(imgs))
layers = make(map[digest.Digest]int)
}

uniqueImages := map[digest.Digest]images.Image{}
tagsByDigest := map[digest.Digest][]string{}
intermediateImages := map[digest.Digest]struct{}{}
Expand Down Expand Up @@ -152,24 +145,48 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
tagsByDigest[dgst] = append(tagsByDigest[dgst], reference.FamiliarString(ref))
}

resultsMut := sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(runtime.NumCPU() * 2)

var (
summaries = make([]*imagetypes.Summary, 0, len(imgs))
root []*[]digest.Digest
layers map[digest.Digest]int
)
if opts.SharedSize {
root = make([]*[]digest.Digest, 0, len(imgs))
layers = make(map[digest.Digest]int)
}

for _, img := range uniqueImages {
image, allChainsIDs, err := i.imageSummary(ctx, img, platformMatcher, opts, tagsByDigest)
if err != nil {
return nil, err
}
// No error, but image should be skipped.
if image == nil {
continue
}
img := img
eg.Go(func() error {
image, allChainsIDs, err := i.imageSummary(egCtx, img, platformMatcher, opts, tagsByDigest)
if err != nil {
return err
}
// No error, but image should be skipped.
if image == nil {
return nil
}

summaries = append(summaries, image)
resultsMut.Lock()
summaries = append(summaries, image)

if opts.SharedSize {
root = append(root, &allChainsIDs)
for _, id := range allChainsIDs {
layers[id] = layers[id] + 1
if opts.SharedSize {
root = append(root, &allChainsIDs)
for _, id := range allChainsIDs {
layers[id] = layers[id] + 1
}
}
}
resultsMut.Unlock()
return nil
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

if opts.SharedSize {
Expand Down
177 changes: 146 additions & 31 deletions daemon/containerd/image_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package containerd
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/log/logtest"
imagetypes "github.com/docker/docker/api/types/image"
Expand All @@ -37,6 +41,52 @@ func imagesFromIndex(index ...*ocispec.Index) []images.Image {
return imgs
}

func BenchmarkImageList(b *testing.B) {
populateStore := func(ctx context.Context, is *ImageService, dir string, count int) {
// Use constant seed for reproducibility
src := rand.NewSource(1982731263716)

for i := 0; i < count; i++ {
platform := platforms.DefaultSpec()

// 20% is other architecture than the host
if i%5 == 0 {
platform.Architecture = "other"
}

idx, err := specialimage.RandomSinglePlatform(dir, platform, src)
assert.NilError(b, err)

imgs := imagesFromIndex(idx)
for _, desc := range imgs {
_, err := is.images.Create(ctx, desc)
assert.NilError(b, err)
}
}
}

for _, count := range []int{10, 100, 1000} {
csDir := b.TempDir()

ctx := namespaces.WithNamespace(context.TODO(), "testing-"+strconv.Itoa(count))

cs := &delayedStore{
store: &blobsDirContentStore{blobs: filepath.Join(csDir, "blobs/sha256")},
overhead: 500 * time.Microsecond,
}

is := fakeImageService(b, ctx, cs)
populateStore(ctx, is, csDir, count)

b.Run(strconv.Itoa(count)+"-images", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := is.Images(ctx, imagetypes.ListOptions{All: true})
assert.NilError(b, err)
}
})
}
}

func TestImageList(t *testing.T) {
ctx := namespaces.WithNamespace(context.TODO(), "testing")

Expand All @@ -53,19 +103,17 @@ func TestImageList(t *testing.T) {

cs := &blobsDirContentStore{blobs: filepath.Join(blobsDir, "blobs/sha256")}

snapshotter := &testSnapshotterService{}

for _, tc := range []struct {
name string
images []images.Image
opts imagetypes.ListOptions

check func(*testing.T, []*imagetypes.Summary) // Change the type of the check function
check func(*testing.T, []*imagetypes.Summary)
}{
{
name: "one multi-layer image",
images: imagesFromIndex(multilayer),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 1))

assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
Expand All @@ -75,7 +123,7 @@ func TestImageList(t *testing.T) {
{
name: "one image with two platforms is still one entry",
images: imagesFromIndex(twoplatform),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 1))

assert.Check(t, is.Equal(all[0].ID, twoplatform.Manifests[0].Digest.String()))
Expand All @@ -85,7 +133,7 @@ func TestImageList(t *testing.T) {
{
name: "two images are two entries",
images: imagesFromIndex(multilayer, twoplatform),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 2))

assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
Expand All @@ -106,31 +154,7 @@ func TestImageList(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx := logtest.WithT(ctx, t)
mdb := newTestDB(ctx, t)

snapshotters := map[string]snapshots.Snapshotter{
containerd.DefaultSnapshotter: snapshotter,
}

service := &ImageService{
images: metadata.NewImageStore(mdb),
containers: emptyTestContainerStore(),
content: cs,
eventsService: daemonevents.New(),
snapshotterServices: snapshotters,
snapshotter: containerd.DefaultSnapshotter,
}

// containerd.Image gets the services directly from containerd.Client
// so we need to create a "fake" containerd.Client with the test services.
c8dCli, err := containerd.New("", containerd.WithServices(
containerd.WithImageStore(service.images),
containerd.WithContentStore(cs),
containerd.WithSnapshotters(snapshotters),
))
assert.NilError(t, err)

service.client = c8dCli
service := fakeImageService(t, ctx, cs)

for _, img := range tc.images {
_, err := service.images.Create(ctx, img)
Expand All @@ -156,6 +180,37 @@ func TestImageList(t *testing.T) {

}

func fakeImageService(t testing.TB, ctx context.Context, cs content.Store) *ImageService {
snapshotter := &testSnapshotterService{}

mdb := newTestDB(ctx, t)

snapshotters := map[string]snapshots.Snapshotter{
containerd.DefaultSnapshotter: snapshotter,
}

service := &ImageService{
images: metadata.NewImageStore(mdb),
containers: emptyTestContainerStore(),
content: cs,
eventsService: daemonevents.New(),
snapshotterServices: snapshotters,
snapshotter: containerd.DefaultSnapshotter,
}

// containerd.Image gets the services directly from containerd.Client
// so we need to create a "fake" containerd.Client with the test services.
c8dCli, err := containerd.New("", containerd.WithServices(
containerd.WithImageStore(service.images),
containerd.WithContentStore(cs),
containerd.WithSnapshotters(snapshotters),
))
assert.NilError(t, err)

service.client = c8dCli
return service
}

type blobsDirContentStore struct {
blobs string
}
Expand Down Expand Up @@ -251,3 +306,63 @@ func (s *blobsDirContentStore) Info(ctx context.Context, dgst digest.Digest) (co
func (s *blobsDirContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, fmt.Errorf("read-only")
}

// delayedStore is a content store wrapper that adds a constant delay to all
// operations in order to imitate gRPC overhead.
//
// The delay is constant to make the benchmark results more reproducible
// Since content store may be accessed concurrently random delay would be
// order-dependent.
type delayedStore struct {
store content.Store
overhead time.Duration
}

func (s *delayedStore) delay() {
time.Sleep(s.overhead)
}

func (s *delayedStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
s.delay()
return s.store.ReaderAt(ctx, desc)
}

func (s *delayedStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
s.delay()
return s.store.Writer(ctx, opts...)
}

func (s *delayedStore) Status(ctx context.Context, st string) (content.Status, error) {
s.delay()
return s.store.Status(ctx, st)
}

func (s *delayedStore) Delete(ctx context.Context, dgst digest.Digest) error {
s.delay()
return s.store.Delete(ctx, dgst)
}

func (s *delayedStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
s.delay()
return s.store.ListStatuses(ctx, filters...)
}

func (s *delayedStore) Abort(ctx context.Context, ref string) error {
s.delay()
return s.store.Abort(ctx, ref)
}

func (s *delayedStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
s.delay()
return s.store.Walk(ctx, fn, filters...)
}

func (s *delayedStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
s.delay()
return s.store.Info(ctx, dgst)
}

func (s *delayedStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
s.delay()
return s.store.Update(ctx, info, fieldpaths...)
}
2 changes: 1 addition & 1 deletion daemon/containerd/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func digestFor(i int64) digest.Digest {
return dgstr.Digest()
}

func newTestDB(ctx context.Context, t *testing.T) *metadata.DB {
func newTestDB(ctx context.Context, t testing.TB) *metadata.DB {
t.Helper()

p := filepath.Join(t.TempDir(), "metadata")
Expand Down