Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mutate: allow for custom compression #348

Merged
merged 2 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/umoci/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func insert(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), reader, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/umoci/raw-add-layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func rawAddLayer(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), newLayer, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, newLayer, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
69 changes: 69 additions & 0 deletions mutate/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mutate
cyphar marked this conversation as resolved.
Show resolved Hide resolved

import (
"io"
"io/ioutil"
"runtime"

gzip "github.com/klauspost/pgzip"
"github.com/pkg/errors"
)

// Compressor abstracts over a layer compression scheme, so that callers can
// plug in their own implementation.
type Compressor interface {
	// MediaTypeSuffix names the compression scheme as it should appear in
	// the layer's media type (for instance "gzip"). An empty string signals
	// that the data is not compressed.
	MediaTypeSuffix() string

	// Compress prepares a streaming compressor for this scheme over src and
	// returns the stream it produces.
	Compress(src io.Reader) (io.ReadCloser, error)
}

// noopCompressor is the pass-through implementation: it performs no
// compression at all.
type noopCompressor struct{}

// Compress hands src back untouched, wrapped so that it satisfies
// io.ReadCloser. Closing the result is a no-op.
func (noopCompressor) Compress(src io.Reader) (io.ReadCloser, error) {
	var passthrough io.ReadCloser = ioutil.NopCloser(src)
	return passthrough, nil
}

// MediaTypeSuffix reports the empty string, because the data is left
// uncompressed.
func (noopCompressor) MediaTypeSuffix() string {
	const suffix = ""
	return suffix
}

// NoopCompressor is the ready-to-use Compressor that applies no compression:
// it is backed by noopCompressor, which returns its input stream unchanged and
// reports an empty media-type suffix.
var NoopCompressor Compressor = noopCompressor{}

// GzipCompressor is the ready-to-use Compressor that applies gzip
// compression. It is backed by gzipCompressor, whose media-type suffix is
// "gzip".
var GzipCompressor Compressor = gzipCompressor{}

// gzipCompressor is the stateless type implementing Compressor for gzip.
type gzipCompressor struct{}

// Compress returns a stream from which the gzip-compressed form of reader can
// be read.
//
// The compression runs in a background goroutine that copies reader through
// a parallel gzip writer into an in-memory pipe; the returned io.ReadCloser is
// the read end of that pipe. If copying or finalising the gzip stream fails,
// the failure is delivered to the consumer as the error of a later Read.
// Closing the returned stream makes any pending or later write by the
// goroutine fail, which lets the goroutine finish.
//
// An error is returned directly only when the gzip writer rejects the
// concurrency settings.
func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
	pipeReader, pipeWriter := io.Pipe()

	gzw := gzip.NewWriter(pipeWriter)
	// Compute the value once so the setting and the error message agree.
	blocks := 2 * runtime.NumCPU()
	if err := gzw.SetConcurrency(256<<10, blocks); err != nil {
		return nil, errors.Wrapf(err, "set concurrency level to %v blocks", blocks)
	}
	go func() {
		if _, err := io.Copy(gzw, reader); err != nil {
			// Report the copy failure first so that it is the error the
			// consumer sees, then stop: the stream is already broken.
			// #nosec G104
			_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
			// Best-effort teardown of the gzip writer; its error carries
			// no extra information once the pipe has been failed.
			// #nosec G104
			_ = gzw.Close()
			return
		}
		if err := gzw.Close(); err != nil {
			// #nosec G104
			_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
			return
		}
		// Success: closing the write end signals EOF to the consumer.
		if err := pipeWriter.Close(); err != nil {
			// #nosec G104
			_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
		}
	}()

	return pipeReader, nil
}

// MediaTypeSuffix reports "gzip", the suffix identifying gzip-compressed
// layer data.
func (gzipCompressor) MediaTypeSuffix() string {
	const suffix = "gzip"
	return suffix
}
47 changes: 47 additions & 0 deletions mutate/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutate

import (
"bytes"
"io/ioutil"
"testing"

gzip "github.com/klauspost/pgzip"
"github.com/stretchr/testify/assert"
)

// fact is the sample payload fed through the compressors under test and
// compared against what is read back.
const (
fact = "meshuggah rocks!!!"
)

// TestNoopCompressor checks that NoopCompressor reports an empty media-type
// suffix and hands its input back unchanged.
func TestNoopCompressor(t *testing.T) {
	assert := assert.New(t)
	buf := bytes.NewBufferString(fact)

	// Equal takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	assert.Equal("", NoopCompressor.MediaTypeSuffix())

	r, err := NoopCompressor.Compress(buf)
	if !assert.NoError(err) {
		// r is not usable if Compress failed.
		return
	}
	defer r.Close()

	content, err := ioutil.ReadAll(r)
	assert.NoError(err)

	assert.Equal(fact, string(content))
}

// TestGzipCompressor checks that GzipCompressor reports the "gzip" media-type
// suffix and that its output decompresses back to the original input.
func TestGzipCompressor(t *testing.T) {
	assert := assert.New(t)

	buf := bytes.NewBufferString(fact)
	c := GzipCompressor

	// Equal takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	assert.Equal("gzip", c.MediaTypeSuffix())

	compressed, err := c.Compress(buf)
	if !assert.NoError(err) {
		// compressed is not usable if Compress failed.
		return
	}
	defer compressed.Close()

	gzr, err := gzip.NewReader(compressed)
	if !assert.NoError(err) {
		// Stop here: reading from a reader that failed to initialise would
		// panic instead of reporting a clean test failure.
		return
	}
	defer gzr.Close()

	content, err := ioutil.ReadAll(gzr)
	assert.NoError(err)

	assert.Equal(fact, string(content))
}
63 changes: 12 additions & 51 deletions mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ package mutate
import (
"io"
"reflect"
"runtime"
"time"

"github.com/apex/log"
gzip "github.com/klauspost/pgzip"
"github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/umoci/oci/cas"
Expand Down Expand Up @@ -232,38 +230,21 @@ func (m *Mutator) Set(ctx context.Context, config ispec.ImageConfig, meta Meta,
// add adds the given layer to the CAS, and mutates the configuration to
// include the diffID. The returned string is the digest of the *compressed*
// layer (which is compressed by us).
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History) (digest.Digest, int64, error) {
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History, compressor Compressor) (digest.Digest, int64, error) {
if err := m.cache(ctx); err != nil {
return "", -1, errors.Wrap(err, "getting cache failed")
}

diffidDigester := cas.BlobAlgorithm.Digester()
hashReader := io.TeeReader(reader, diffidDigester.Hash())

pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

gzw := gzip.NewWriter(pipeWriter)
defer gzw.Close()
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return "", -1, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
compressed, err := compressor.Compress(hashReader)
if err != nil {
return "", -1, errors.Wrapf(err, "couldn't create compression for blob")
}
go func() {
if _, err := io.Copy(gzw, hashReader); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
}
if err := gzw.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
}
if err := pipeWriter.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
}
}()
defer compressed.Close()

layerDigest, layerSize, err := m.engine.PutBlob(ctx, pipeReader)
layerDigest, layerSize, err := m.engine.PutBlob(ctx, compressed)
if err != nil {
return "", -1, errors.Wrap(err, "put layer blob")
}
Expand Down Expand Up @@ -291,45 +272,25 @@ func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.Hist
// generate the DiffIDs for the image metadata. The provided history entry is
// appended to the image's history and should correspond to what operations
// were made to the configuration.
func (m *Mutator) Add(ctx context.Context, r io.Reader, history *ispec.History) (ispec.Descriptor, error) {
func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, history *ispec.History, compressor Compressor) (ispec.Descriptor, error) {
desc := ispec.Descriptor{}
if err := m.cache(ctx); err != nil {
return desc, errors.Wrap(err, "getting cache failed")
}

digest, size, err := m.add(ctx, r, history)
digest, size, err := m.add(ctx, r, history, compressor)
if err != nil {
return desc, errors.Wrap(err, "add layer")
}

// Append to layers.
desc = ispec.Descriptor{
// TODO: Detect whether the layer is gzip'd or not...
MediaType: ispec.MediaTypeImageLayerGzip,
Digest: digest,
Size: size,
}
m.manifest.Layers = append(m.manifest.Layers, desc)
return desc, nil
}

// AddNonDistributable is the same as Add, except it adds a non-distributable
// layer to the image.
func (m *Mutator) AddNonDistributable(ctx context.Context, r io.Reader, history *ispec.History) (ispec.Descriptor, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I didn't consider that we could drop AddNonDistributable entirely. Nice. 😸

desc := ispec.Descriptor{}
if err := m.cache(ctx); err != nil {
return desc, errors.Wrap(err, "getting cache failed")
}

digest, size, err := m.add(ctx, r, history)
if err != nil {
return desc, errors.Wrap(err, "add non-distributable layer")
compressedMediaType := mediaType
if compressor.MediaTypeSuffix() != "" {
compressedMediaType = compressedMediaType + "+" + compressor.MediaTypeSuffix()
}

// Append to layers.
desc = ispec.Descriptor{
// TODO: Detect whether the layer is gzip'd or not...
MediaType: ispec.MediaTypeImageLayerNonDistributableGzip,
MediaType: compressedMediaType,
Digest: digest,
Size: size,
}
Expand Down
88 changes: 2 additions & 86 deletions mutate/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ func TestMutateAdd(t *testing.T) {
buffer := bytes.NewBufferString("contents")

// Add a new layer.
newLayerDesc, err := mutator.Add(context.Background(), buffer, &ispec.History{
newLayerDesc, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, buffer, &ispec.History{
Comment: "new layer",
})
}, GzipCompressor)
if err != nil {
t.Fatalf("unexpected error adding layer: %+v", err)
}
Expand Down Expand Up @@ -290,90 +290,6 @@ func TestMutateAdd(t *testing.T) {
}
}

func TestMutateAddNonDistributable(t *testing.T) {
dir, err := ioutil.TempDir("", "umoci-TestMutateAddNonDistributable")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

engine, fromDescriptor := setup(t, dir)
defer engine.Close()

mutator, err := New(engine, casext.DescriptorPath{Walk: []ispec.Descriptor{fromDescriptor}})
if err != nil {
t.Fatal(err)
}

// This isn't a valid image, but whatever.
buffer := bytes.NewBufferString("contents")

// Add a new layer.
newLayerDesc, err := mutator.AddNonDistributable(context.Background(), buffer, &ispec.History{
Comment: "new layer",
})
if err != nil {
t.Fatalf("unexpected error adding layer: %+v", err)
}

newDescriptor, err := mutator.Commit(context.Background())
if err != nil {
t.Fatalf("unexpected error committing changes: %+v", err)
}

if newDescriptor.Descriptor().Digest == fromDescriptor.Digest {
t.Fatalf("new and old descriptors are the same!")
}

mutator, err = New(engine, newDescriptor)
if err != nil {
t.Fatal(err)
}

// Cache the data to check it.
if err := mutator.cache(context.Background()); err != nil {
t.Fatalf("unexpected error getting cache: %+v", err)
}

// Check digests are different.
if mutator.manifest.Config.Digest == expectedConfigDigest {
t.Errorf("manifest.Config.Digest is the same!")
}
if mutator.manifest.Layers[0].Digest != expectedLayerDigest {
t.Errorf("manifest.Layers[0].Digest is not the same!")
}
if mutator.manifest.Layers[1].Digest == expectedLayerDigest {
t.Errorf("manifest.Layers[1].Digest is not the same!")
}
if mutator.manifest.Layers[1].Digest != newLayerDesc.Digest {
t.Fatalf("unexpected digest for new layer: %v %v", mutator.manifest.Layers[1].Digest, newLayerDesc.Digest)
}

// Check layer was added.
if len(mutator.manifest.Layers) != 2 {
t.Errorf("manifest.Layers was not updated")
}
if mutator.manifest.Layers[1].MediaType != ispec.MediaTypeImageLayerNonDistributableGzip {
t.Errorf("manifest.Layers[1].MediaType is the wrong value: %s", mutator.manifest.Layers[1].MediaType)
}

// Check config was also modified.
if len(mutator.config.RootFS.DiffIDs) != 2 {
t.Errorf("config.RootFS.DiffIDs was not updated")
}

// Check history.
if len(mutator.config.History) != 2 {
t.Errorf("config.History was not updated")
}
if mutator.config.History[1].EmptyLayer != false {
t.Errorf("config.History[1].EmptyLayer was not set")
}
if mutator.config.History[1].Comment != "new layer" {
t.Errorf("config.History[1].Comment was not set")
}
}

func TestMutateSet(t *testing.T) {
dir, err := ioutil.TempDir("", "umoci-TestMutateSet")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Repack(engineExt casext.Engine, tagName string, bundlePath string, meta Met

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), reader, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}
}
Expand Down