Skip to content

Commit

Permalink
mutate: allow for custom compression
Browse files Browse the repository at this point in the history
at least via the API for now :)

Signed-off-by: Tycho Andersen <tycho@tycho.pizza>
  • Loading branch information
tych0 committed Sep 28, 2020
1 parent f495332 commit a705649
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/umoci/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func insert(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), reader, history); err != nil {
if _, err := mutator.Add(context.Background(), reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/umoci/raw-add-layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func rawAddLayer(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), newLayer, history); err != nil {
if _, err := mutator.Add(context.Background(), newLayer, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
76 changes: 76 additions & 0 deletions mutate/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package mutate

import (
"io"
"runtime"

gzip "github.com/klauspost/pgzip"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

type Compressor interface {
Compress(io.Reader) (io.ReadCloser, error)
MediaType() string
}

type withClose struct {
r io.Reader
}

func (wc withClose) Read(p []byte) (n int, err error) {
return wc.r.Read(p)
}

func (wc withClose) Close() error {
return nil
}

type noopCompressor struct {
mediaType string
}

func NewNoopCompressor(mediaType string) Compressor {
return noopCompressor{mediaType}
}

func (nc noopCompressor) Compress(r io.Reader) (io.ReadCloser, error) {
return withClose{r}, nil
}

func (nc noopCompressor) MediaType() string {
return nc.mediaType
}

var GzipCompressor Compressor = gzipCompressor{}

type gzipCompressor struct{}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
if _, err := io.Copy(gzw, reader); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
}
if err := gzw.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
}
if err := pipeWriter.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
}
}()

return pipeReader, nil
}

func (gz gzipCompressor) MediaType() string {
return ispec.MediaTypeImageLayerGzip
}
49 changes: 49 additions & 0 deletions mutate/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package mutate

import (
"bytes"
"io"
"io/ioutil"
"runtime"

gzip "github.com/klauspost/pgzip"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

const (
fact = "meshuggah rocks!!!"
)

func TestNoopCompressor(t *testing.T) {
assert := assert.New(t)
buf := bytes.NewBufferString(fact)
c := NewNoopCompressor(fact)

r, err := c.Compress(buf)
assert.NoError(err)
assert.Equal(fact, c.MediaType())

content, err := ioutil.ReadAll(r)
assert.NoError(err)

assert.Equal(string(content), fact)
}

func TestGzipCompressor(t *testing.T) {
assert := assert.New(t)

buf := bytes.NewBufferString(fact)
c := NewGzipCompressor(buf)

r, err := c.Compress(buf)
assert.NoError(err)

r, err := gzip.NewReader(r)
assert.NoError(err)

content, err := ioutil.ReadAll(r)
assert.NoError(err)

assert.Equal(string(content), fact)
}
42 changes: 11 additions & 31 deletions mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ package mutate
import (
"io"
"reflect"
"runtime"
"time"

"github.com/apex/log"
gzip "github.com/klauspost/pgzip"
"github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/umoci/oci/cas"
Expand Down Expand Up @@ -232,38 +230,21 @@ func (m *Mutator) Set(ctx context.Context, config ispec.ImageConfig, meta Meta,
// add adds the given layer to the CAS, and mutates the configuration to
// include the diffID. The returned string is the digest of the *compressed*
// layer (which is compressed by us).
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History) (digest.Digest, int64, error) {
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History, compressor Compressor) (digest.Digest, int64, error) {
if err := m.cache(ctx); err != nil {
return "", -1, errors.Wrap(err, "getting cache failed")
}

diffidDigester := cas.BlobAlgorithm.Digester()
hashReader := io.TeeReader(reader, diffidDigester.Hash())

pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

gzw := gzip.NewWriter(pipeWriter)
defer gzw.Close()
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return "", -1, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
compressed, err := compressor.Compress(hashReader)
if err != nil {
return "", -1, errors.Wrapf(err, "couldn't create compression for blob")
}
go func() {
if _, err := io.Copy(gzw, hashReader); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
}
if err := gzw.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
}
if err := pipeWriter.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
}
}()
defer compressed.Close()

layerDigest, layerSize, err := m.engine.PutBlob(ctx, pipeReader)
layerDigest, layerSize, err := m.engine.PutBlob(ctx, compressed)
if err != nil {
return "", -1, errors.Wrap(err, "put layer blob")
}
Expand Down Expand Up @@ -291,21 +272,20 @@ func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.Hist
// generate the DiffIDs for the image metatadata. The provided history entry is
// appended to the image's history and should correspond to what operations
// were made to the configuration.
func (m *Mutator) Add(ctx context.Context, r io.Reader, history *ispec.History) (ispec.Descriptor, error) {
func (m *Mutator) Add(ctx context.Context, r io.Reader, history *ispec.History, compressor Compressor) (ispec.Descriptor, error) {
desc := ispec.Descriptor{}
if err := m.cache(ctx); err != nil {
return desc, errors.Wrap(err, "getting cache failed")
}

digest, size, err := m.add(ctx, r, history)
digest, size, err := m.add(ctx, r, history, compressor)
if err != nil {
return desc, errors.Wrap(err, "add layer")
}

// Append to layers.
desc = ispec.Descriptor{
// TODO: Detect whether the layer is gzip'd or not...
MediaType: ispec.MediaTypeImageLayerGzip,
MediaType: compressor.MediaType(),
Digest: digest,
Size: size,
}
Expand All @@ -315,13 +295,13 @@ func (m *Mutator) Add(ctx context.Context, r io.Reader, history *ispec.History)

// AddNonDistributable is the same as Add, except it adds a non-distributable
// layer to the image.
func (m *Mutator) AddNonDistributable(ctx context.Context, r io.Reader, history *ispec.History) (ispec.Descriptor, error) {
func (m *Mutator) AddNonDistributable(ctx context.Context, r io.Reader, history *ispec.History, compressor Compressor) (ispec.Descriptor, error) {
desc := ispec.Descriptor{}
if err := m.cache(ctx); err != nil {
return desc, errors.Wrap(err, "getting cache failed")
}

digest, size, err := m.add(ctx, r, history)
digest, size, err := m.add(ctx, r, history, compressor)
if err != nil {
return desc, errors.Wrap(err, "add non-distributable layer")
}
Expand Down
2 changes: 1 addition & 1 deletion repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Repack(engineExt casext.Engine, tagName string, bundlePath string, meta Met

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), reader, history); err != nil {
if _, err := mutator.Add(context.Background(), reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}
}
Expand Down

0 comments on commit a705649

Please sign in to comment.