Skip to content

Commit

Permalink
chore(storage): implement NewRangeReader interface, refactor gRPC read (
Browse files Browse the repository at this point in the history
  • Loading branch information
noahdietz authored May 31, 2022
1 parent 9266276 commit bd82561
Show file tree
Hide file tree
Showing 6 changed files with 576 additions and 243 deletions.
12 changes: 11 additions & 1 deletion storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type storageClient interface {
ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error)
RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error)

OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error
NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (*Reader, error)
OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error

// IAM methods.
Expand Down Expand Up @@ -211,6 +211,16 @@ type userProjectOption struct {

// Apply copies the option's project into the userProject field of the
// supplied settings.
func (o *userProjectOption) Apply(s *settings) {
	s.userProject = o.project
}

// newRangeReaderParams bundles the arguments accepted by
// storageClient.NewRangeReader.
type newRangeReaderParams struct {
// bucket is the name of the bucket that holds the object.
bucket string
// conds holds optional preconditions for the read; may be nil.
conds *Conditions
// encryptionKey is the customer-supplied encryption key, if any.
// NOTE(review): the gRPC implementation does not use this field yet.
encryptionKey []byte
// gen is the object generation to read; a negative value means latest.
gen int64
// length is the number of bytes to read; a negative value means read to
// the end of the object.
length int64
// object is the name of the object to read.
object string
// offset is the byte offset within the object at which reading starts.
offset int64
}

type composeObjectRequest struct {
dstBucket string
dstObject string
Expand Down
45 changes: 45 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,51 @@ func TestDeleteDefaultObjectACLEmulated(t *testing.T) {
})
}

// TestOpenReaderEmulated writes an object through the veneer client and then
// verifies that a full-object NewRangeReader on the transport client under
// test returns exactly the bytes that were written.
func TestOpenReaderEmulated(t *testing.T) {
	transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
		// Populate test data.
		_, err := client.CreateBucket(context.Background(), project, &BucketAttrs{
			Name: bucket,
		})
		if err != nil {
			t.Fatalf("client.CreateBucket: %v", err)
		}
		prefix := time.Now().Nanosecond()
		want := &ObjectAttrs{
			Bucket: bucket,
			Name:   fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond()),
		}
		w := veneerClient.Bucket(bucket).Object(want.Name).NewWriter(context.Background())
		if _, err := w.Write(randomBytesToWrite); err != nil {
			t.Fatalf("failed to populate test data: %v", err)
		}
		if err := w.Close(); err != nil {
			t.Fatalf("closing object: %v", err)
		}

		// Request the whole object: offset 0 and a negative length mean
		// "read from the start to the end".
		params := &newRangeReaderParams{
			bucket: bucket,
			object: want.Name,
			gen:    defaultGen,
			offset: 0,
			length: -1,
		}
		r, err := client.NewRangeReader(context.Background(), params)
		if err != nil {
			t.Fatalf("opening reader: %v", err)
		}
		// Release the underlying stream/connection when the subtest ends.
		defer r.Close()

		wantLen := len(randomBytesToWrite)
		got := make([]byte, wantLen)
		// A single Read is allowed to return fewer bytes than requested (for
		// example one streamed chunk at a time), so keep reading until the
		// buffer is full or the reader reports an error.
		n := 0
		for n < wantLen {
			m, err := r.Read(got[n:])
			n += m
			if err != nil {
				// An error (including end-of-stream) is only acceptable once
				// every expected byte has been received.
				if n < wantLen {
					t.Fatalf("r.Read: got %d of %d bytes: %v", n, wantLen, err)
				}
				break
			}
		}
		if n != wantLen {
			t.Fatalf("expected to read %d bytes, but got %d", wantLen, n)
		}
		if diff := cmp.Diff(got, randomBytesToWrite); diff != "" {
			t.Fatalf("Read: got(-),want(+):\n%s", diff)
		}
	})
}

func initEmulatorClients() func() error {
noopCloser := func() error { return nil }
if !isEmulatorEnvironmentSet() {
Expand Down
248 changes: 246 additions & 2 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package storage

import (
"context"
"fmt"
"io"
"os"

"cloud.google.com/go/internal/trace"
gapic "cloud.google.com/go/storage/internal/apiv2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
Expand Down Expand Up @@ -565,9 +568,134 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
return nil, errMethodNotSupported
}

func (c *grpcStorageClient) OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error {
return errMethodNotSupported
// NewRangeReader opens a gRPC ReadObject stream for the object described by
// params and returns a Reader positioned at params.offset.
//
// The first message of the stream is received eagerly: its metadata populates
// the Reader's Attrs and its content is buffered for the first Read call. A
// negative params.length means "read to the end of the object". CRC32C
// verification is only enabled when the entire object is requested.
//
// params is not modified. An error is returned if the conditions are invalid,
// the context is already done, or the stream cannot be opened.
func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
	defer func() { trace.EndSpan(ctx, err) }()

	if params.conds != nil {
		if err := params.conds.validate("grpcStorageClient.NewRangeReader"); err != nil {
			return nil, err
		}
	}

	s := callSettings(c.settings, opts...)

	// A negative length means "read to the end of the object", but the
	// read_limit field it corresponds to uses zero to mean the same thing. Thus
	// we coerce the length to 0 to read to the end of the object. A local copy
	// is used so the caller's params are left untouched.
	length := params.length
	if length < 0 {
		length = 0
	}

	b := bucketResourceName(globalProjectAlias, params.bucket)
	// TODO(noahdietz): Use encryptionKey to set relevant request fields.
	req := &storagepb.ReadObjectRequest{
		Bucket: b,
		Object: params.object,
	}
	// The default is a negative value, which means latest.
	if params.gen >= 0 {
		req.Generation = params.gen
	}

	// Define a function that initiates a Read with offset and length, assuming
	// we have already read seen bytes.
	reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
		// If the context has already expired, return immediately without making
		// the call.
		if err := ctx.Err(); err != nil {
			return nil, nil, err
		}

		// Every byte of a bounded range has already been delivered. Reopening
		// here would send ReadLimit == 0, which the service interprets as
		// "read to the end of the object" and would stream data past the
		// requested range, so report end-of-stream instead.
		if length > 0 && seen >= length {
			return nil, nil, io.EOF
		}

		cc, cancel := context.WithCancel(ctx)

		start := params.offset + seen
		// Only set a ReadLimit if length is greater than zero, because zero
		// means read it all.
		if length > 0 {
			req.ReadLimit = length - seen
		}
		req.ReadOffset = start

		if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
			cancel()
			return nil, nil, err
		}

		var stream storagepb.Storage_ReadObjectClient
		var msg *storagepb.ReadObjectResponse
		var err error

		// Opening the stream and receiving the first message are retried as a
		// unit, because errors opening the stream typically surface on Recv.
		err = run(cc, func() error {
			stream, err = c.raw.ReadObject(cc, req, s.gax...)
			if err != nil {
				return err
			}

			msg, err = stream.Recv()

			return err
		}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
		if err != nil {
			// Close the stream context we just created to ensure we don't leak
			// resources.
			cancel()
			return nil, nil, err
		}

		return &readStreamResponse{stream, msg}, cancel, nil
	}

	res, cancel, err := reopen(0)
	if err != nil {
		return nil, err
	}

	// The first message was Recv'd on stream open, use it to populate the
	// object metadata.
	msg := res.response
	obj := msg.GetMetadata()
	// This is the size of the entire object, even if only a range was requested.
	size := obj.GetSize()

	r = &Reader{
		Attrs: ReaderObjectAttrs{
			Size:            size,
			ContentType:     obj.GetContentType(),
			ContentEncoding: obj.GetContentEncoding(),
			CacheControl:    obj.GetCacheControl(),
			LastModified:    obj.GetUpdateTime().AsTime(),
			Metageneration:  obj.GetMetageneration(),
			Generation:      obj.GetGeneration(),
		},
		reader: &gRPCReader{
			stream: res.stream,
			reopen: reopen,
			cancel: cancel,
			size:   size,
			// Store the content from the first Recv in the
			// client buffer for reading later.
			leftovers: msg.GetChecksummedData().GetContent(),
		},
	}

	// Prefer the server-reported content range for the remaining byte count;
	// fall back to the full object size when no range is reported.
	cr := msg.GetContentRange()
	if cr != nil {
		r.Attrs.StartOffset = cr.GetStart()
		r.remain = cr.GetEnd() - cr.GetStart() + 1
	} else {
		r.remain = size
	}

	// Only support checksums when reading an entire object, not a range.
	if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && length == 0 {
		r.wantCRC = checksums.GetCrc32C()
		r.checkCRC = true
	}

	return r, nil
}

// OpenWriter is a placeholder for the gRPC transport: it performs no work and
// always returns errMethodNotSupported.
func (c *grpcStorageClient) OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error {
return errMethodNotSupported
}
Expand Down Expand Up @@ -653,3 +781,119 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
// setUserProjectMetadata returns a context derived from ctx whose outgoing
// metadata additionally carries the given project under the
// "x-goog-user-project" key.
func setUserProjectMetadata(ctx context.Context, project string) context.Context {
	const userProjectKey = "x-goog-user-project"
	withProject := metadata.AppendToOutgoingContext(ctx, userProjectKey, project)
	return withProject
}

// readStreamResponse pairs a newly opened ReadObject stream with the first
// message received on it.
//
// NOTE: the field order matters; the type is constructed with a positional
// composite literal.
type readStreamResponse struct {
// stream is the open ReadObject stream.
stream storagepb.Storage_ReadObjectClient
// response is the first message received from stream when it was opened.
response *storagepb.ReadObjectResponse
}

// gRPCReader reads object content from a gRPC ReadObject stream, buffering
// any message content that did not fit into the caller's buffer.
type gRPCReader struct {
// seen is the number of bytes copied out to callers so far; size is the
// size of the entire object, even if only a range was requested.
seen, size int64
// stream is the current read stream; nil once Close has been called.
stream storagepb.Storage_ReadObjectClient
// reopen opens a new stream that resumes after seen bytes and returns it
// together with its first message and a cancel function.
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
// leftovers holds received content not yet copied to a caller.
leftovers []byte
// cancel cancels the context of the current stream.
cancel context.CancelFunc
}

// Read reads bytes into the user's buffer from an open gRPC stream.
// Read copies object content into p, serving buffered content first and
// otherwise receiving the next message from the open gRPC stream. It returns
// the number of bytes copied, which may be fewer than len(p).
func (r *gRPCReader) Read(p []byte) (int, error) {
	// A nil stream means the reader was never initialized or Close was
	// called.
	// Note: sharing one reader across goroutines is hazardous: one goroutine
	// may hit an error and close/reopen the stream while another is reading
	// from it.
	if r.stream == nil {
		return 0, fmt.Errorf("reader has been closed")
	}

	// Everything in the object has already been handed out: report EOF.
	if r.size != 0 && r.size == r.seen {
		return 0, io.EOF
	}

	// Serve previously buffered content first and return whatever was
	// available, as permitted by the Reader interface:
	// https://pkg.go.dev/io#Reader.
	if pending := r.leftovers; len(pending) > 0 {
		copied := copy(p, pending)
		r.leftovers = pending[copied:]
		r.seen += int64(copied)
		return copied, nil
	}

	// Nothing buffered, so pull the next message off the stream.
	msg, err := r.recv()
	if err != nil {
		return 0, err
	}

	// TODO: Determine if we need to capture incremental CRC32C for this
	// chunk. The Object CRC32C checksum is captured when directed to read
	// the entire Object. If directed to read a range, we may need to
	// calculate the range's checksum for verification if the checksum is
	// present in the response here.
	// TODO: Figure out if we need to support decompressive transcoding
	// https://cloud.google.com/storage/docs/transcoding.
	content := msg.GetChecksummedData().GetContent()
	copied := copy(p, content)
	if copied < len(content) {
		// p was too small for the whole message; keep the remainder for
		// subsequent Read calls.
		r.leftovers = content[copied:]
	}
	r.seen += int64(copied)

	return copied, nil
}

// Close cancels the read stream's context in order for it to be closed and
// collected.
// Close cancels the context of the read stream, if a cancel function is set,
// and drops the reader's reference to the stream. It always returns nil.
func (r *gRPCReader) Close() error {
	if stop := r.cancel; stop != nil {
		stop()
	}
	r.stream = nil
	return nil
}

// recv attempts to Recv the next message on the stream. In the event
// that a retryable error is encountered, the stream will be closed, reopened,
// and Recv again. This will attempt to Recv until one of the following is true:
//
// * Recv is successful
// * A non-retryable error is encountered
// * The Reader's context is canceled
//
// The last error received is the one that is returned, which could be from
// an attempt to reopen the stream.
//
// This is an experimental API and not intended for public use.
// recv receives the next message from the stream. If Recv fails with an error
// that shouldRetry accepts, the stream is closed and reopened, and the first
// message of the reopened stream is returned instead. A successful Recv or any
// other error is returned as is.
//
// The error returned is the last one encountered, which may come from the
// attempt to reopen the stream.
//
// This is an experimental API and not intended for public use.
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
	msg, err := r.stream.Recv()
	if err == nil || !shouldRetry(err) {
		return msg, err
	}

	// Reopening "closes" the current stream and immediately opens a new one.
	// Opening a stream also receives its first message, so on success the
	// next logical chunk is what gets returned.
	return r.reopenStream()
}

// reopenStream "closes" the existing stream and attempts to reopen a stream and
// sets the Reader's stream and cancelStream properties in the process.
//
// This is an experimental API and not intended for public use.
// reopenStream "closes" the current stream and opens a replacement that
// resumes after the bytes already seen. On success it stores the new stream
// and cancel function on the reader and returns the first message received on
// the new stream.
//
// This is an experimental API and not intended for public use.
func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
	// Tear down the old stream before asking for a new one at the updated
	// offset.
	r.Close()

	reopened, cancel, err := r.reopen(r.seen)
	if err != nil {
		return nil, err
	}

	r.stream, r.cancel = reopened.stream, cancel
	return reopened.response, nil
}
Loading

0 comments on commit bd82561

Please sign in to comment.