Skip to content

Prototype of PipelineOptions #1959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: wuandy/JavaPplPP
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.pipeline.stages.AggregateOptions;
import com.google.cloud.firestore.pipeline.stages.PipelineOptions;
import com.google.cloud.firestore.pipeline.stages.GenericOptions;
import com.google.cloud.firestore.pipeline.expressions.Accumulator;
import com.google.cloud.firestore.pipeline.expressions.Expr;
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
Expand All @@ -44,7 +47,6 @@
import com.google.cloud.firestore.pipeline.stages.RemoveFields;
import com.google.cloud.firestore.pipeline.stages.Replace;
import com.google.cloud.firestore.pipeline.stages.Sample;
import com.google.cloud.firestore.pipeline.stages.SampleOptions;
import com.google.cloud.firestore.pipeline.stages.Select;
import com.google.cloud.firestore.pipeline.stages.Sort;
import com.google.cloud.firestore.pipeline.stages.Stage;
Expand All @@ -69,6 +71,7 @@
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -502,12 +505,10 @@ public Pipeline distinct(Selectable... selectables) {
* <pre>{@code
* // Find books with similar "topicVectors" to the given targetVector
* firestore.pipeline().collection("books")
* .findNearest("topicVectors", targetVector, FindNearest.DistanceMeasure.cosine(),
* FindNearestOptions
* .builder()
* .limit(10)
* .distanceField("distance")
* .build());
* .findNearest("topicVectors", targetVector, FindNearest.DistanceMeasure.COSINE,
* FindNearestOptions.DEFAULT
* .withLimit(10)
* .withDistanceField("distance"));
* }</pre>
*
* @param fieldName The name of the field containing the vector data. This field should store
Expand Down Expand Up @@ -540,12 +541,11 @@ public Pipeline findNearest(
* <pre>{@code
* // Find books with similar "topicVectors" to the given targetVector
* firestore.pipeline().collection("books")
* .findNearest(Field.of("topicVectors"), targetVector, FindNearest.DistanceMeasure.cosine(),
* FindNearestOptions
* .builder()
* .limit(10)
* .distanceField("distance")
* .build());
* .findNearest(
* FindNearest.of(Field.of("topicVectors"), targetVector, FindNearest.DistanceMeasure.COSINE),
* FindNearestOptions.DEFAULT
* .withLimit(10)
* .withDistanceField("distance"));
* }</pre>
*
* @param property The expression that evaluates to a vector value using the stage inputs.
Expand Down Expand Up @@ -590,7 +590,7 @@ public Pipeline findNearest(
*/
@BetaApi
public Pipeline sort(Ordering... orders) {
return append(new Sort(orders));
return append(new Sort(ImmutableList.copyOf(orders)));
}

/**
Expand Down Expand Up @@ -683,8 +683,7 @@ public Pipeline replace(Selectable field) {
*/
@BetaApi
public Pipeline sample(int limit) {
SampleOptions options = SampleOptions.docLimit(limit);
return sample(options);
return sample(Sample.withDocLimit(limit));
}

/**
Expand All @@ -698,19 +697,19 @@ public Pipeline sample(int limit) {
* <pre>{@code
* // Sample 10 books, if available.
* firestore.pipeline().collection("books")
* .sample(SampleOptions.docLimit(10));
* .sample(Sample.withDocLimit(10));
*
* // Sample 50% of books.
* firestore.pipeline().collection("books")
* .sample(SampleOptions.percentage(0.5));
* .sample(Sample.withPercentage(0.5));
* }</pre>
*
* @param options The {@code SampleOptions} specifies how sampling is performed.
* @param sample The {@code Sample} specifies how sampling is performed.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline sample(SampleOptions options) {
return append(new Sample(options));
public Pipeline sample(Sample sample) {
return append(sample);
}

/**
Expand Down Expand Up @@ -756,21 +755,21 @@ public Pipeline union(Pipeline other) {
*
* // Emit a book document for each tag of the book.
* firestore.pipeline().collection("books")
* .unnest("tags");
* .unnest("tags", "tag");
*
* // Output:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "adventure", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", ... }
* }</pre>
*
* @param fieldName The name of the field containing the array.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline unnest(String fieldName) {
public Pipeline unnest(String fieldName, String alias) {
// return unnest(Field.of(fieldName));
return append(new Unnest(Field.of(fieldName)));
return append(new Unnest(Field.of(fieldName), alias));
}

// /**
Expand Down Expand Up @@ -829,22 +828,22 @@ public Pipeline unnest(String fieldName) {
*
* // Emit a book document for each tag of the book.
* firestore.pipeline().collection("books")
* .unnest("tags", UnnestOptions.indexField("tagIndex"));
* .unnest("tags", "tag", Unnest.Options.DEFAULT.withIndexField("tagIndex"));
*
* // Output:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tags": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tags": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tags": "adventure", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure", ... }
* }</pre>
*
* @param fieldName The name of the field containing the array.
* @param options The {@code UnnestOptions} options.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline unnest(String fieldName, UnnestOptions options) {
public Pipeline unnest(String fieldName, String alias, UnnestOptions options) {
// return unnest(Field.of(fieldName), options);
return append(new Unnest(Field.of(fieldName), options));
return append(new Unnest(Field.of(fieldName), alias, options));
}

// /**
Expand Down Expand Up @@ -905,12 +904,13 @@ public Pipeline unnest(String fieldName, UnnestOptions options) {
*
* @param name The unique name of the generic stage to add.
* @param params A map of parameters to configure the generic stage's behavior.
* @param optionalParams Named optional parameters to configure the generic stage's behavior.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline genericStage(String name, List<Object> params) {
public Pipeline genericStage(String name, List<Object> params, GenericOptions optionalParams) {
// Implementation for genericStage (add the GenericStage if needed)
return append(new GenericStage(name, params)); // Assuming GenericStage takes a list of params
return append(new GenericStage(name, params, optionalParams)); // Assuming GenericStage takes a list of params
}

/**
Expand Down Expand Up @@ -946,7 +946,12 @@ public Pipeline genericStage(String name, List<Object> params) {
*/
@BetaApi
public ApiFuture<List<PipelineResult>> execute() {
return execute((ByteString) null, (com.google.protobuf.Timestamp) null);
return execute(PipelineOptions.DEFAULT, (ByteString) null, (com.google.protobuf.Timestamp) null);
}

@BetaApi
public ApiFuture<List<PipelineResult>> execute(PipelineOptions options) {
return execute(options, (ByteString) null, (com.google.protobuf.Timestamp) null);
}

/**
Expand Down Expand Up @@ -996,7 +1001,7 @@ public ApiFuture<List<PipelineResult>> execute() {
*/
@BetaApi
public void execute(ApiStreamObserver<PipelineResult> observer) {
executeInternal(null, null, observer);
executeInternal(PipelineOptions.DEFAULT, null, null, observer);
}

// @BetaApi
Expand All @@ -1016,10 +1021,13 @@ public void execute(ApiStreamObserver<PipelineResult> observer) {
// }

ApiFuture<List<PipelineResult>> execute(
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
@Nonnull PipelineOptions options,
@Nullable final ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime) {
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();

executeInternal(
options,
transactionId,
readTime,
new PipelineResultObserver() {
Expand All @@ -1045,13 +1053,17 @@ public void onError(Throwable t) {
}

void executeInternal(
@Nonnull PipelineOptions options,
@Nullable final ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
ApiStreamObserver<PipelineResult> observer) {
ExecutePipelineRequest.Builder request =
ExecutePipelineRequest.newBuilder()
.setDatabase(rpcContext.getDatabaseName())
.setStructuredPipeline(StructuredPipeline.newBuilder().setPipeline(toProto()).build());
.setStructuredPipeline(StructuredPipeline.newBuilder()
.setPipeline(toProto())
.putAllOptions(StageUtils.toMap(options))
.build());

if (transactionId != null) {
request.setTransaction(transactionId);
Expand Down Expand Up @@ -1164,18 +1176,18 @@ public void onComplete() {

rpcContext.streamRequest(request, observer, rpcContext.getClient().executePipelineCallable());
}
}

@InternalExtensionOnly
abstract class PipelineResultObserver implements ApiStreamObserver<PipelineResult> {
private Timestamp executionTime; // Remove optional since Java doesn't have it
@InternalExtensionOnly
static abstract class PipelineResultObserver implements ApiStreamObserver<PipelineResult> {
private Timestamp executionTime; // Remove optional since Java doesn't have it

public void onCompleted(Timestamp executionTime) {
this.executionTime = executionTime;
this.onCompleted();
}
public void onCompleted(Timestamp executionTime) {
this.executionTime = executionTime;
this.onCompleted();
}

public Timestamp getExecutionTime() { // Add getter for executionTime
return executionTime;
public Timestamp getExecutionTime() { // Add getter for executionTime
return executionTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.google.api.core.InternalApi;
import com.google.cloud.firestore.pipeline.stages.Collection;
import com.google.cloud.firestore.pipeline.stages.CollectionGroup;
import com.google.cloud.firestore.pipeline.stages.CollectionGroupOptions;
import com.google.cloud.firestore.pipeline.stages.CollectionHints;
import com.google.cloud.firestore.pipeline.stages.CollectionOptions;
import com.google.cloud.firestore.pipeline.stages.Database;
import com.google.cloud.firestore.pipeline.stages.Documents;
import com.google.common.base.Preconditions;
Expand All @@ -45,7 +48,7 @@
* }</pre>
*/
@BetaApi
public class PipelineSource {
public final class PipelineSource {
private final FirestoreRpcContext<?> rpcContext;

@InternalApi
Expand All @@ -62,7 +65,13 @@ public class PipelineSource {
@Nonnull
@BetaApi
public Pipeline collection(@Nonnull String path) {
return new Pipeline(this.rpcContext, new Collection(path));
return collection(path, CollectionOptions.DEFAULT);
}

@Nonnull
@BetaApi
public Pipeline collection(@Nonnull String path, CollectionOptions options) {
return new Pipeline(this.rpcContext, new Collection(path, options));
}

/**
Expand All @@ -78,11 +87,17 @@ public Pipeline collection(@Nonnull String path) {
@Nonnull
@BetaApi
public Pipeline collectionGroup(@Nonnull String collectionId) {
return collectionGroup(collectionId, CollectionGroupOptions.DEFAULT);
}

@Nonnull
@BetaApi
public Pipeline collectionGroup(@Nonnull String collectionId, CollectionGroupOptions options) {
Preconditions.checkArgument(
!collectionId.contains("/"),
"Invalid collectionId '%s'. Collection IDs must not contain '/'.",
collectionId);
return new Pipeline(this.rpcContext, new CollectionGroup(collectionId));
return new Pipeline(this.rpcContext, new CollectionGroup(collectionId, options));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.google.cloud.firestore.pipeline.expressions.Function.inAny;
import static com.google.cloud.firestore.pipeline.expressions.Function.not;
import static com.google.cloud.firestore.pipeline.expressions.Function.or;
import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.exprToValue;

import com.google.api.core.InternalApi;
import com.google.cloud.firestore.Query.ComparisonFilterInternal;
Expand All @@ -38,6 +39,7 @@
import com.google.cloud.firestore.pipeline.expressions.Selectable;
import com.google.common.collect.Lists;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import java.util.HashMap;
import java.util.List;
Expand All @@ -51,6 +53,36 @@ public static Value encodeValue(Object value) {
return UserDataConverter.encodeValue(FieldPath.empty(), value, UserDataConverter.ARGUMENT);
}

@InternalApi
public static Value encodeValue(Expr value) {
return exprToValue(value);
}

@InternalApi
public static Value encodeValue(String value) {
return Value.newBuilder().setStringValue(value).build();
}

@InternalApi
public static Value encodeValue(boolean value) {
return Value.newBuilder().setBooleanValue(value).build();
}

@InternalApi
public static Value encodeValue(long value) {
return Value.newBuilder().setIntegerValue(value).build();
}

@InternalApi
public static Value encodeValue(double value) {
return Value.newBuilder().setDoubleValue(value).build();
}

@InternalApi
public static Value encodeValue(Map<String, Value> options) {
return Value.newBuilder().setMapValue(MapValue.newBuilder().putAllFields(options).build()).build();
}

@InternalApi
static FilterCondition toPipelineFilterCondition(FilterInternal f) {
if (f instanceof ComparisonFilterInternal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.firestore.pipeline.stages.PipelineOptions;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -129,7 +130,7 @@ public ApiFuture<AggregateQuerySnapshot> get(@Nonnull AggregateQuery query) {
@Override
public ApiFuture<List<PipelineResult>> execute(@Nonnull Pipeline pipeline) {
try (TraceUtil.Scope ignored = transactionTraceContext.makeCurrent()) {
return pipeline.execute(null, readTime);
return pipeline.execute(PipelineOptions.DEFAULT, null, readTime);
}
}

Expand Down
Loading
Loading