Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8932][Cleanup] Change default PubsubIO client from json to grpc. #10479

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

/** Writer to Pubsub which batches messages from bounded collections. */
class PubsubBoundedWriter<T> extends DoFn<T, Void> {
/**
* Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into
* four bytes.
*/
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3;

private static final int MAX_PUBLISH_BATCH_SIZE = 100;

private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
private transient int currentOutputBytes;

private final int maxPublishBatchByteSize;
private final int maxPublishBatchSize;
private final PubsubIO.Write<T> write;

private PubsubBoundedWriter(PubsubIO.Write<T> write) {
Preconditions.checkNotNull(write.getTopicProvider());
this.maxPublishBatchSize =
MoreObjects.firstNonNull(write.getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE);
this.maxPublishBatchByteSize =
MoreObjects.firstNonNull(write.getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
this.write = write;
}

static <T> PubsubBoundedWriter<T> of(PubsubIO.Write<T> write) {
return new PubsubBoundedWriter<>(write);
}

@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
this.output = new ArrayList<>();
this.currentOutputBytes = 0;

// NOTE: idAttribute is ignored.
this.pubsubClient =
write
.getPubsubClientFactory()
.newClient(
write.getTimestampAttribute(),
null,
c.getPipelineOptions().as(PubsubOptions.class));
}

@ProcessElement
public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
byte[] payload;
PubsubMessage message = write.getFormatFn().apply(c.element());
payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();

if (payload.length > maxPublishBatchByteSize) {
String msg =
String.format(
"Pub/Sub message size (%d) exceeded maximum batch size (%d)",
payload.length, maxPublishBatchByteSize);
throw new SizeLimitExceededException(msg);
}

// Checking before adding the message stops us from violating the max bytes
if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
|| (output.size() >= maxPublishBatchSize)) {
publish();
}

// NOTE: The record id is always null.
output.add(
OutgoingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes)
.build(),
c.timestamp().getMillis(),
null));
currentOutputBytes += payload.length;
}

@FinishBundle
public void finishBundle() throws IOException {
if (!output.isEmpty()) {
publish();
}
output = null;
currentOutputBytes = 0;
pubsubClient.close();
pubsubClient = null;
}

private void publish() throws IOException {
PubsubTopic topic = write.getTopicProvider().get();
int n = pubsubClient.publish(PubsubClient.topicPathFromPath(topic.asPath()), output);
checkState(n == output.size());
output.clear();
currentOutputBytes = 0;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.delegate(write);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,14 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.Clock;
import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.naming.SizeLimitExceededException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.PipelineRunner;
Expand All @@ -43,7 +35,6 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.options.ValueProvider;
Expand All @@ -52,7 +43,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -93,7 +83,7 @@ public class PubsubIO {
private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);

/** Factory for creating pubsub client to manage transport. */
private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
private static final PubsubClient.PubsubClientFactory FACTORY = PubsubGrpcClient.FACTORY;

/**
* Project IDs must contain 6-63 lowercase letters, digits, or dashes. IDs must start with a
Expand Down Expand Up @@ -587,11 +577,11 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();

abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();

@Nullable
abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();

abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();

/** The name of the message attribute to read timestamps from. */
@Nullable
abstract String getTimestampAttribute();
Expand Down Expand Up @@ -858,14 +848,6 @@ private PubsubIO() {}
/** Implementation of write methods. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
/**
* Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into
* four bytes.
*/
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3;

private static final int MAX_PUBLISH_BATCH_SIZE = 100;

@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();

Expand Down Expand Up @@ -998,15 +980,6 @@ public Write<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
}

/**
* Used to write a PubSub message together with PubSub attributes. The user-supplied format
* function translates the input type T to a PubsubMessage object, which is used by the sink to
* separately set the PubSub message's payload and attributes.
*/
private Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
return toBuilder().setFormatFn(formatFn).build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null) {
Expand All @@ -1015,12 +988,8 @@ public PDone expand(PCollection<T> input) {

switch (input.isBounded()) {
case BOUNDED:
input.apply(
ParDo.of(
new PubsubBoundedWriter(
MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
// NOTE: idAttribute is ignored.
input.apply(ParDo.of(PubsubBoundedWriter.of(this)));
return PDone.in(input.getPipeline());
case UNBOUNDED:
return input
Expand All @@ -1047,101 +1016,6 @@ public void populateDisplayData(DisplayData.Builder builder) {
populateCommonDisplayData(
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}

/**
* Writer to Pubsub which batches messages from bounded collections.
*
* <p>Public so can be suppressed by runners.
*/
public class PubsubBoundedWriter extends DoFn<T, Void> {
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
private transient int currentOutputBytes;

private int maxPublishBatchByteSize;
private int maxPublishBatchSize;

PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) {
this.maxPublishBatchSize = maxPublishBatchSize;
this.maxPublishBatchByteSize = maxPublishBatchByteSize;
}

PubsubBoundedWriter() {
this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
}

@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
this.output = new ArrayList<>();
this.currentOutputBytes = 0;

// NOTE: idAttribute is ignored.
this.pubsubClient =
getPubsubClientFactory()
.newClient(
getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class));
}

@ProcessElement
public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
byte[] payload;
PubsubMessage message = getFormatFn().apply(c.element());
payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();

if (payload.length > maxPublishBatchByteSize) {
String msg =
String.format(
"Pub/Sub message size (%d) exceeded maximum batch size (%d)",
payload.length, maxPublishBatchByteSize);
throw new SizeLimitExceededException(msg);
}

// Checking before adding the message stops us from violating the max bytes
if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
|| (output.size() >= maxPublishBatchSize)) {
publish();
}

// NOTE: The record id is always null.
output.add(
OutgoingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes)
.build(),
c.timestamp().getMillis(),
null));
currentOutputBytes += payload.length;
}

@FinishBundle
public void finishBundle() throws IOException {
if (!output.isEmpty()) {
publish();
}
output = null;
currentOutputBytes = 0;
pubsubClient.close();
pubsubClient = null;
}

private void publish() throws IOException {
PubsubTopic topic = getTopicProvider().get();
int n =
pubsubClient.publish(
PubsubClient.topicPathFromName(topic.project, topic.topic), output);
checkState(n == output.size());
output.clear();
currentOutputBytes = 0;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.delegate(Write.this);
}
}
}

private static <T> SerializableFunction<PubsubMessage, T> parsePayloadUsingCoder(Coder<T> coder) {
Expand Down