From c22d728c5e127b7821712ea1b9c2f8ac0b5b77d6 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Thu, 21 Apr 2022 21:20:54 +0000 Subject: [PATCH 01/13] [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests. --- .../io/google-cloud-platform/build.gradle | 1 + .../beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +- .../healthcare/HttpHealthcareApiClient.java | 56 +++++-------------- .../sdk/io/gcp/healthcare/FhirIOTestUtil.java | 2 +- 4 files changed, 16 insertions(+), 45 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index e89a8bf350ebb..4f2b6fc47ba58 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -139,6 +139,7 @@ dependencies { implementation library.java.arrow_memory_core implementation library.java.arrow_vector + implementation 'com.google.http-client:google-http-client-gson:1.41.2' implementation "org.threeten:threetenbp:1.4.4" implementation "io.opencensus:opencensus-api:0.30.0" diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index f08ce8da166a2..d5facd34cef3d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1460,7 +1460,7 @@ public void executeBundles(ProcessContext context) { private void parseResponse(ProcessContext context, String inputBody, HttpBody resp) throws JsonProcessingException { - JsonObject bundle = JsonParser.parseString(resp.getData()).getAsJsonObject(); + JsonObject bundle = JsonParser.parseString(resp.toString()).getAsJsonObject(); String bundleType = bundle.getAsJsonPrimitive(BUNDLE_TYPE_FIELD).getAsString(); JsonArray entries = bundle.getAsJsonArray(BUNDLE_ENTRY_FIELD).getAsJsonArray(); if (entries == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java index 1b09ff49d10bc..ce1be228e4649 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java @@ -23,6 +23,7 @@ import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.healthcare.v1.CloudHealthcare; import com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.PatientEverything; @@ -75,18 +76,14 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; -import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -105,9 +102,7 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl String.format( "apache-beam-io-google-cloud-platform-healthcare/%s", ReleaseInfo.getReleaseInfo().getSdkVersion()); - private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json"; - private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8"; - private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8"; + private static final JsonFactory PARSER = new GsonFactory(); private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class); private transient CloudHealthcare client; private transient HttpClient httpClient; @@ -553,45 +548,20 @@ public Operation pollOperation(Operation operation, Long sleepMs) } @Override - public HttpBody executeFhirBundle(String fhirStore, String bundle) - throws IOException, HealthcareHttpException { - if (httpClient == null || client == null) { + public HttpBody executeFhirBundle(String fhirStore, String bundle) throws IOException { + if (client == null) { initClient(); } + HttpBody httpBody = PARSER.fromString(bundle, HttpBody.class); - credentials.refreshIfExpired(); - StringEntity requestEntity = new StringEntity(bundle, ContentType.APPLICATION_JSON); - URI uri; - try { - uri = new URIBuilder(client.getRootUrl() + "v1/" + fhirStore + "/fhir").build(); - } catch (URISyntaxException e) { - LOG.error("URL error when making executeBundle request to FHIR API. " + e.getMessage()); - throw new IllegalArgumentException(e); - } - - HttpUriRequest request = - RequestBuilder.post() - .setUri(uri) - .setEntity(requestEntity) - .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue()) - .addHeader("User-Agent", USER_AGENT) - .addHeader("Content-Type", FHIRSTORE_HEADER_CONTENT_TYPE) - .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET) - .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT) - .build(); - - HttpResponse response = httpClient.execute(request); - HttpEntity responseEntity = response.getEntity(); - String content = EntityUtils.toString(responseEntity); - - // Check 2XX code. - int statusCode = response.getStatusLine().getStatusCode(); - if (!(statusCode / 100 == 2)) { - throw HealthcareHttpException.of(statusCode, content); - } - HttpBody responseModel = new HttpBody(); - responseModel.setData(content); - return responseModel; + return client + .projects() + .locations() + .datasets() + .fhirStores() + .fhir() + .executeBundle(fhirStore, httpBody) + .execute(); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java index 754e5f6a29260..6bc0fbbbd7c97 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java @@ -95,7 +95,7 @@ static List executeFhirBundles( for (String bundle : bundles) { HttpBody resp = client.executeFhirBundle(fhirStore, bundle); - JsonObject jsonResponse = JsonParser.parseString(resp.getData()).getAsJsonObject(); + JsonObject jsonResponse = JsonParser.parseString(resp.toString()).getAsJsonObject(); for (JsonElement entry : jsonResponse.getAsJsonArray("entry")) { String location = entry From c219f84dd2ea2e6f13f675e898955432337b3fbf Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Tue, 24 May 2022 14:48:58 +0000 Subject: [PATCH 02/13] [BEAM-14504] Add fhir bundle with metadata in executebundles method. --- .../healthcare/FhirBundleWithMetadata.java | 67 +++++++ .../FhirBundleWithMetadataCoder.java | 49 +++++ .../beam/sdk/io/gcp/healthcare/FhirIO.java | 180 ++++++++++++++---- .../GetStringHealthcareIOErrorFn.java | 40 ++++ .../sdk/io/gcp/healthcare/FhirIOTest.java | 2 +- .../sdk/io/gcp/healthcare/FhirIOWriteIT.java | 6 +- 6 files changed, 300 insertions(+), 44 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java new file mode 100644 index 0000000000000..8a61a094eb123 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import com.google.auto.value.AutoValue; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** + * FhirBundleWithMetadata represents a FHIR bundle, with it's metadata (eg. Hl7 messageId) in JSON + * format to be executed on the intermediate FHIR store. * + */ +@DefaultCoder(FhirBundleWithMetadataCoder.class) +@AutoValue +public abstract class FhirBundleWithMetadata { + + static Builder builder() { + return new AutoValue_FhirBundleWithMetadata.Builder(); + } + + public abstract String getMetadata(); + + public abstract String getBundle(); + + public abstract String getResponse(); + + public static FhirBundleWithMetadata of( + @Nullable String metadata, String bundle, @Nullable String response) { + + return FhirBundleWithMetadata.builder() + .setMetadata(Objects.toString(metadata, "")) + .setBundle(bundle) + .setResponse(Objects.toString(response, "")) + .build(); + } + + public static FhirBundleWithMetadata of(String bundle) { + return FhirBundleWithMetadata.of(null, bundle, null); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMetadata(String metadata); + + abstract Builder setBundle(String bundle); + + abstract Builder setResponse(String response); + + abstract FhirBundleWithMetadata build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java new file mode 100644 index 0000000000000..82af834962bba --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** Coder for {@link FhirBundleWithMetadata}. */ +public class FhirBundleWithMetadataCoder extends CustomCoder { + private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); + + public static FhirBundleWithMetadataCoder of() { + return new FhirBundleWithMetadataCoder(); + } + + @Override + public void encode(FhirBundleWithMetadata value, OutputStream outStream) throws IOException { + STRING_CODER.encode(value.getMetadata(), outStream); + STRING_CODER.encode(value.getBundle(), outStream); + STRING_CODER.encode(value.getResponse(), outStream); + } + + @Override + public FhirBundleWithMetadata decode(InputStream inStream) throws IOException { + String metadata = STRING_CODER.decode(inStream); + String bundle = STRING_CODER.decode(inStream); + String response = STRING_CODER.decode(inStream); + return FhirBundleWithMetadata.of(metadata, bundle, response); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index ee2bfba5ed4cd..6bfce4e70d744 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.healthcare; +import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles.FAILED_BUNDLES; +import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles.SUCCESSFUL_BUNDLES; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.core.JsonProcessingException; @@ -58,6 +60,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; +import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.AbstractResult; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; @@ -635,7 +638,7 @@ private String fetchResource(HealthcareApiClient client, String resourceName) /** The type Write. */ @AutoValue - public abstract static class Write extends PTransform, Write.Result> { + public abstract static class Write extends PTransform, Write.AbstractResult> { /** The tag for successful writes to FHIR store. */ public static final TupleTag SUCCESSFUL_BODY = new TupleTag() {}; @@ -663,8 +666,27 @@ public enum WriteMethod { IMPORT } + public abstract static class AbstractResult implements POutput { + private Pipeline pipeline; + + public abstract PCollection getSuccessfulBodies(); + + public abstract PCollection> getFailedBodies(); + + public abstract PCollection> getFailedFiles(); + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + } + /** The type Result. */ - public static class Result implements POutput { + public static class Result extends AbstractResult { private final Pipeline pipeline; private final PCollection successfulBodies; @@ -700,6 +722,7 @@ static Result in( * * @return the entries that were inserted */ + @Override public PCollection getSuccessfulBodies() { return this.successfulBodies; } @@ -709,6 +732,7 @@ public PCollection getSuccessfulBodies() { * * @return the failed inserts with err */ + @Override public PCollection> getFailedBodies() { return this.failedBodies; } @@ -718,6 +742,7 @@ public PCollection> getFailedBodies() { * * @return the failed GCS uri with err */ + @Override public PCollection> getFailedFiles() { return this.failedFiles; } @@ -933,7 +958,7 @@ public static Write executeBundles(ValueProvider fhirStore) { private static final Logger LOG = LoggerFactory.getLogger(Write.class); @Override - public Result expand(PCollection input) { + public AbstractResult expand(PCollection input) { switch (this.getWriteMethod()) { case IMPORT: LOG.warn( @@ -951,7 +976,12 @@ public Result expand(PCollection input) { return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure)); case EXECUTE_BUNDLE: default: - return input.apply(new ExecuteBundles(this.getFhirStore())); + return input + .apply( + MapElements.into(TypeDescriptor.of(FhirBundleWithMetadata.class)) + .via(FhirBundleWithMetadata::of)) + .setCoder(FhirBundleWithMetadataCoder.of()) + .apply(new ExecuteBundles(this.getFhirStore())); } } } @@ -1349,7 +1379,15 @@ public enum ContentStructure { } /** The type Execute bundles. */ - public static class ExecuteBundles extends Write { + public static class ExecuteBundles + extends PTransform, ExecuteBundlesResult> { + + /** The TupleTag used for bundles that were executed successfully. */ + public static final TupleTag SUCCESSFUL_BUNDLES = new TupleTag<>(); + + /** The TupleTag used for bundles that failed to be executed for any reason. */ + public static final TupleTag> FAILED_BUNDLES = + new TupleTag<>(); private final ValueProvider fhirStore; @@ -1358,48 +1396,35 @@ public static class ExecuteBundles extends Write { * * @param fhirStore the fhir store */ - ExecuteBundles(ValueProvider fhirStore) { + public ExecuteBundles(ValueProvider fhirStore) { this.fhirStore = fhirStore; } - @Override - ValueProvider getFhirStore() { - return fhirStore; - } - - @Override - WriteMethod getWriteMethod() { - return WriteMethod.EXECUTE_BUNDLE; - } - - @Override - Optional getContentStructure() { - return Optional.empty(); - } - - @Override - Optional> getImportGcsTempPath() { - return Optional.empty(); + public ExecuteBundles(String fhirStore) { + this.fhirStore = StaticValueProvider.of(fhirStore); } - @Override - Optional> getImportGcsDeadLetterPath() { - return Optional.empty(); + public ValueProvider getFhirStore() { + return fhirStore; } @Override - public FhirIO.Write.Result expand(PCollection input) { - PCollectionTuple bodies = + public ExecuteBundlesResult expand(PCollection input) { + PCollectionTuple bundles = input.apply( - ParDo.of(new ExecuteBundlesFn(fhirStore)) - .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); - bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); - bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); - return Write.Result.in(input.getPipeline(), bodies); + ParDo.of(new ExecuteBundlesFn(this.fhirStore)) + .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); + bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleWithMetadataCoder.of()); + bundles + .get(FAILED_BUNDLES) + .setCoder(HealthcareIOErrorCoder.of(FhirBundleWithMetadataCoder.of())); + + return ExecuteBundlesResult.in( + input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); } /** The type Write Fhir fn. */ - static class ExecuteBundlesFn extends DoFn { + static class ExecuteBundlesFn extends DoFn { private static final Counter EXECUTE_BUNDLE_ERRORS = Metrics.counter( @@ -1450,18 +1475,18 @@ public void initClient() throws IOException { @ProcessElement public void executeBundles(ProcessContext context) { - String inputBody = context.element(); + String bundle = context.element().getBundle(); try { long startTime = Instant.now().toEpochMilli(); // Validate that data was set to valid JSON. - mapper.readTree(inputBody); - HttpBody resp = client.executeFhirBundle(fhirStore.get(), inputBody); + mapper.readTree(bundle); + HttpBody resp = client.executeFhirBundle(fhirStore.get(), bundle); EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); - parseResponse(context, inputBody, resp); + parseResponse(context, bundle, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(Write.FAILED_BODY, HealthcareIOError.of(inputBody, e)); + context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); } } @@ -1524,6 +1549,81 @@ private int parseBundleStatus(String status) { } } + /** + * ExecuteBundlesResult contains both successfully executed bundles and information help debugging + * failed executions (eg metadata & error msgs). + */ + public static class ExecuteBundlesResult extends AbstractResult { + private final Pipeline pipeline; + private final PCollection successfulBundles; + private final PCollection> failedBundles; + + private ExecuteBundlesResult( + Pipeline pipeline, + PCollection successfulBundles, + PCollection> failedBundles) { + this.pipeline = pipeline; + this.successfulBundles = successfulBundles; + this.failedBundles = failedBundles; + } + + /** + * Entry point for the ExecuteBundlesResult, storing the successful and failed bundles and their + * metadata. + */ + public static ExecuteBundlesResult in( + Pipeline pipeline, + PCollection successfulBundles, + PCollection> failedBundles) { + return new ExecuteBundlesResult(pipeline, successfulBundles, failedBundles); + } + + /** Gets successful FhirBundleWithMetadata from execute bundles operation. */ + public PCollection getSuccessfulBundlesWithMetadata() { + return this.successfulBundles; + } + + /** + * Gets failed FhirBundleWithMetadata with metadata wrapped inside HealthcareIOError. The bundle + * field could be null. + */ + public PCollection> getFailedBundles() { + return this.failedBundles; + } + + @Override + public PCollection getSuccessfulBodies() { + return this.successfulBundles.apply( + MapElements.into(TypeDescriptors.strings()).via(FhirBundleWithMetadata::getBundle)); + } + + @Override + public PCollection> getFailedBodies() { + return this.failedBundles + .apply("GetBodiesFromBundles", ParDo.of(new GetStringHealthcareIOErrorFn())) + .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + } + + @Override + public PCollection> getFailedFiles() { + return super.pipeline.apply(Create.empty(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); + } + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of(SUCCESSFUL_BUNDLES, successfulBundles, FAILED_BUNDLES, failedBundles); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + } + /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */ public static class Export extends PTransform> { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java new file mode 100644 index 0000000000000..002726fe679d1 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import org.apache.beam.sdk.transforms.DoFn; + +/** + * ParDo function that transforms a HealthcareIOError for a FhirExecuteBundleParameter to an error + * with the body (string) as the data resource, for backwards compatibility. + */ +public class GetStringHealthcareIOErrorFn + extends DoFn, HealthcareIOError> { + + @ProcessElement + public void process(ProcessContext context) { + HealthcareIOError input = context.element(); + context.output( + new HealthcareIOError<>( + input.getDataResource().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode())); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java index c85d1c435fa2d..907078a5fa6cf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java @@ -110,7 +110,7 @@ public void test_FhirIO_failedWrites() { PCollection fhirBundles = pipeline.apply(Create.of(emptyMessages)); - FhirIO.Write.Result writeResult = + FhirIO.Write.AbstractResult writeResult = fhirBundles.apply( FhirIO.Write.executeBundles( "projects/foo/locations/us-central1/datasets/bar/hl7V2Stores/baz")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java index 831aae38fad96..d524166ec0543 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java @@ -108,7 +108,7 @@ public static void teardownBucket() throws IOException { @Test public void testFhirIO_ExecuteBundle() throws IOException { - FhirIO.Write.Result writeResult = + FhirIO.Write.AbstractResult writeResult = pipeline .apply(Create.of(BUNDLES.get(version))) .apply(FhirIO.Write.executeBundles(options.getFhirStore())); @@ -121,7 +121,7 @@ public void testFhirIO_ExecuteBundle() throws IOException { @Test public void testFhirIO_ExecuteBundle_parseResponse() { List bundles = BUNDLES.get("BUNDLE_PARSE_TEST"); - FhirIO.Write.Result writeResult = + FhirIO.Write.AbstractResult writeResult = pipeline .apply(Create.of(bundles)) .apply(FhirIO.Write.executeBundles(options.getFhirStore())); @@ -158,7 +158,7 @@ public void testFhirIO_Import() { if (options.getTempLocation() == null) { options.setTempLocation("gs://temp-storage-for-healthcare-io-tests"); } - FhirIO.Write.Result result = + FhirIO.Write.AbstractResult result = pipeline .apply(Create.of(BUNDLES.get(version))) .apply( From 857f644f4136c6f2ccf1a95d210e201e0fc83906 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Tue, 24 May 2022 14:58:53 +0000 Subject: [PATCH 03/13] make FhirResourcePagesIterator public --- .../beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java index 2bf497ed526e0..0229ef8b170d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java @@ -897,7 +897,7 @@ public enum FhirMethod { private String pageToken; private boolean isFirstRequest; - private FhirResourcePagesIterator( + public FhirResourcePagesIterator( FhirMethod fhirMethod, HealthcareApiClient client, String fhirStore, From ed5c0071ccc4b7a1fd904e8464c677d9efe9a887 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Wed, 25 May 2022 13:52:20 +0000 Subject: [PATCH 04/13] Added comments --- .../healthcare/FhirBundleWithMetadata.java | 10 ++++++-- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 24 ++++++++++++------- .../GetStringHealthcareIOErrorFn.java | 4 ++-- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java index 8a61a094eb123..fa38ff7e35333 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java @@ -23,8 +23,8 @@ import org.apache.beam.sdk.coders.DefaultCoder; /** - * FhirBundleWithMetadata represents a FHIR bundle, with it's metadata (eg. Hl7 messageId) in JSON - * format to be executed on the intermediate FHIR store. * + * FhirBundleWithMetadata represents a FHIR bundle, with it's metadata (eg. source ID like HL7 + * message path) in JSON format to be executed on any FHIR store. * */ @DefaultCoder(FhirBundleWithMetadataCoder.class) @AutoValue @@ -34,10 +34,16 @@ static Builder builder() { return new AutoValue_FhirBundleWithMetadata.Builder(); } + /** + * String representing the source of the Bundle to be written. Used to pass source data through + * the ExecuteBundles PTransform + */ public abstract String getMetadata(); + /** FHIR R4 bundle resource object as a string */ public abstract String getBundle(); + /** HTTP response from the FHIR store after attempting to write the Bundle method. */ public abstract String getResponse(); public static FhirBundleWithMetadata of( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 6bfce4e70d744..96fb546511309 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.healthcare; -import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles.FAILED_BUNDLES; -import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles.SUCCESSFUL_BUNDLES; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.core.JsonProcessingException; @@ -59,8 +57,6 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.AbstractResult; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; @@ -1521,16 +1517,22 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re } else { fail++; context.output( - Write.FAILED_BODY, + FAILED_BUNDLES, HealthcareIOError.of( - inputBody, HealthcareHttpException.of(statusCode, entry.toString()))); + context.element(), HealthcareHttpException.of(statusCode, entry.toString()))); } } EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success); EXECUTE_BUNDLE_RESOURCE_ERRORS.inc(fail); } else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) { EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size()); - context.output(Write.SUCCESSFUL_BODY, bundle.toString()); + context.output( + SUCCESSFUL_BUNDLES, + FhirBundleWithMetadata.builder() + .setMetadata(context.element().getMetadata()) + .setBundle(inputBody) + .setResponse(resp.getData()) + .build()); } EXECUTE_BUNDLE_SUCCESS.inc(); return; @@ -1553,7 +1555,7 @@ private int parseBundleStatus(String status) { * ExecuteBundlesResult contains both successfully executed bundles and information help debugging * failed executions (eg metadata & error msgs). */ - public static class ExecuteBundlesResult extends AbstractResult { + public static class ExecuteBundlesResult extends Write.AbstractResult { private final Pipeline pipeline; private final PCollection successfulBundles; private final PCollection> failedBundles; @@ -1616,7 +1618,11 @@ public Pipeline getPipeline() { @Override public Map, PValue> expand() { - return ImmutableMap.of(SUCCESSFUL_BUNDLES, successfulBundles, FAILED_BUNDLES, failedBundles); + return ImmutableMap.of( + ExecuteBundles.SUCCESSFUL_BUNDLES, + successfulBundles, + ExecuteBundles.FAILED_BUNDLES, + failedBundles); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java index 002726fe679d1..89ca016adc2fd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java @@ -20,8 +20,8 @@ import org.apache.beam.sdk.transforms.DoFn; /** - * ParDo function that transforms a HealthcareIOError for a FhirExecuteBundleParameter to an error - * with the body (string) as the data resource, for backwards compatibility. + * ParDo function that transforms a HealthcareIOError for a FhirBundleWithMetadata to an error with + * the body (string) as the data resource, for backwards compatibility. */ public class GetStringHealthcareIOErrorFn extends DoFn, HealthcareIOError> { From 4f4d989b04776ba65016d887b0bb9f3104fe66de Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Wed, 25 May 2022 16:30:40 +0000 Subject: [PATCH 05/13] Fixed checkstyle errors. --- .../beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java index fa38ff7e35333..cfe9a9f53d704 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java @@ -36,11 +36,11 @@ static Builder builder() { /** * String representing the source of the Bundle to be written. Used to pass source data through - * the ExecuteBundles PTransform + * the ExecuteBundles PTransform. */ public abstract String getMetadata(); - /** FHIR R4 bundle resource object as a string */ + /** FHIR R4 bundle resource object as a string. */ public abstract String getBundle(); /** HTTP response from the FHIR store after attempting to write the Bundle method. */ From 28038e9ad7b75a74158d5b39251e0e7e9a6b29d5 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Fri, 27 May 2022 03:35:22 +0000 Subject: [PATCH 06/13] Added FhirBundleResponse. Resolved comments. --- ...Metadata.java => FhirBundleParameter.java} | 32 ++---- ...der.java => FhirBundleParameterCoder.java} | 16 ++- .../io/gcp/healthcare/FhirBundleResponse.java | 61 +++++++++++ .../healthcare/FhirBundleResponseCoder.java | 47 ++++++++ .../beam/sdk/io/gcp/healthcare/FhirIO.java | 101 ++++++++++-------- .../GetStringHealthcareIOErrorFn.java | 40 ------- 6 files changed, 183 insertions(+), 114 deletions(-) rename sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/{FhirBundleWithMetadata.java => FhirBundleParameter.java} (58%) rename sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/{FhirBundleWithMetadataCoder.java => FhirBundleParameterCoder.java} (71%) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java similarity index 58% rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java index cfe9a9f53d704..64b8158d65ff6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java @@ -22,42 +22,34 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.DefaultCoder; -/** - * FhirBundleWithMetadata represents a FHIR bundle, with it's metadata (eg. source ID like HL7 - * message path) in JSON format to be executed on any FHIR store. * - */ -@DefaultCoder(FhirBundleWithMetadataCoder.class) +/** FhirBundleParameter represents a FHIR bundle in JSON format to be executed on a FHIR store. */ +@DefaultCoder(FhirBundleParameterCoder.class) @AutoValue -public abstract class FhirBundleWithMetadata { +public abstract class FhirBundleParameter { static Builder builder() { - return new AutoValue_FhirBundleWithMetadata.Builder(); + return new AutoValue_FhirBundleParameter.Builder(); } /** - * String representing the source of the Bundle to be written. Used to pass source data through - * the ExecuteBundles PTransform. + * String representing the metadata of the Bundle to be written. Used to pass metadata through the + * ExecuteBundles PTransform. */ public abstract String getMetadata(); /** FHIR R4 bundle resource object as a string. */ public abstract String getBundle(); - /** HTTP response from the FHIR store after attempting to write the Bundle method. */ - public abstract String getResponse(); - - public static FhirBundleWithMetadata of( - @Nullable String metadata, String bundle, @Nullable String response) { + public static FhirBundleParameter of(@Nullable String metadata, String bundle) { - return FhirBundleWithMetadata.builder() + return FhirBundleParameter.builder() .setMetadata(Objects.toString(metadata, "")) .setBundle(bundle) - .setResponse(Objects.toString(response, "")) .build(); } - public static FhirBundleWithMetadata of(String bundle) { - return FhirBundleWithMetadata.of(null, bundle, null); + public static FhirBundleParameter of(String bundle) { + return FhirBundleParameter.of(null, bundle); } @AutoValue.Builder @@ -66,8 +58,6 @@ abstract static class Builder { abstract Builder setBundle(String bundle); - abstract Builder setResponse(String response); - - abstract FhirBundleWithMetadata build(); + abstract FhirBundleParameter build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java similarity index 71% rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java index 82af834962bba..95e8f82b65d61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleWithMetadataCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java @@ -24,26 +24,24 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -/** Coder for {@link FhirBundleWithMetadata}. */ -public class FhirBundleWithMetadataCoder extends CustomCoder { +/** Coder for {@link FhirBundleParameter}. */ +public class FhirBundleParameterCoder extends CustomCoder { private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); - public static FhirBundleWithMetadataCoder of() { - return new FhirBundleWithMetadataCoder(); + public static FhirBundleParameterCoder of() { + return new FhirBundleParameterCoder(); } @Override - public void encode(FhirBundleWithMetadata value, OutputStream outStream) throws IOException { + public void encode(FhirBundleParameter value, OutputStream outStream) throws IOException { STRING_CODER.encode(value.getMetadata(), outStream); STRING_CODER.encode(value.getBundle(), outStream); - STRING_CODER.encode(value.getResponse(), outStream); } @Override - public FhirBundleWithMetadata decode(InputStream inStream) throws IOException { + public FhirBundleParameter decode(InputStream inStream) throws IOException { String metadata = STRING_CODER.decode(inStream); String bundle = STRING_CODER.decode(inStream); - String response = STRING_CODER.decode(inStream); - return FhirBundleWithMetadata.of(metadata, bundle, response); + return FhirBundleParameter.of(metadata, bundle); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java new file mode 100644 index 0000000000000..6a607476c8f12 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import com.google.auto.value.AutoValue; +import java.util.Objects; +import javax.annotation.Nullable; + +@AutoValue +public abstract class FhirBundleResponse { + + static FhirBundleResponse.Builder builder() { + return new AutoValue_FhirBundleResponse.Builder(); + } + + /** + * String representing the metadata of the Bundle to be written. Used to pass metadata through the + * ExecuteBundles PTransform. + */ + public abstract FhirBundleParameter getFhirBundleParameter(); + + /** FHIR R4 bundle resource object as a string. */ + public abstract String getResponse(); + + public static FhirBundleResponse of( + FhirBundleParameter fhirBundleParameter, @Nullable String response) { + return FhirBundleResponse.builder() + .setFhirBundleParameter(fhirBundleParameter) + .setResponse(Objects.toString(response, "")) + .build(); + } + + public static FhirBundleResponse of(FhirBundleParameter fhirBundleParameter) { + return FhirBundleResponse.of(fhirBundleParameter, null); + } + + @AutoValue.Builder + abstract static class Builder { + abstract FhirBundleResponse.Builder setFhirBundleParameter( + FhirBundleParameter fhirBundleParameter); + + abstract FhirBundleResponse.Builder setResponse(String response); + + abstract FhirBundleResponse build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java new file mode 100644 index 0000000000000..d1cb6fd5c1950 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +public class FhirBundleResponseCoder extends CustomCoder { + private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); + private final FhirBundleParameterCoder fhirBundleParameterCoder = FhirBundleParameterCoder.of(); + + public static FhirBundleResponseCoder of() { + return new FhirBundleResponseCoder(); + } + + @Override + public void encode(FhirBundleResponse value, OutputStream outStream) throws IOException { + fhirBundleParameterCoder.encode(value.getFhirBundleParameter(), outStream); + STRING_CODER.encode(value.getResponse(), outStream); + } + + @Override + public FhirBundleResponse decode(InputStream inStream) throws IOException { + FhirBundleParameter fhirBundleParameter = fhirBundleParameterCoder.decode(inStream); + String response = STRING_CODER.decode(inStream); + return FhirBundleResponse.of(fhirBundleParameter, response); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 96fb546511309..4dd33352e2ed5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -70,6 +70,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -974,9 +975,9 @@ public AbstractResult expand(PCollection input) { default: return input .apply( - MapElements.into(TypeDescriptor.of(FhirBundleWithMetadata.class)) - .via(FhirBundleWithMetadata::of)) - .setCoder(FhirBundleWithMetadataCoder.of()) + MapElements.into(TypeDescriptor.of(FhirBundleParameter.class)) + .via(FhirBundleParameter::of)) + .setCoder(FhirBundleParameterCoder.of()) .apply(new ExecuteBundles(this.getFhirStore())); } } @@ -1376,13 +1377,13 @@ public enum ContentStructure { /** The type Execute bundles. */ public static class ExecuteBundles - extends PTransform, ExecuteBundlesResult> { + extends PTransform, ExecuteBundlesResult> { /** The TupleTag used for bundles that were executed successfully. */ - public static final TupleTag SUCCESSFUL_BUNDLES = new TupleTag<>(); + public static final TupleTag SUCCESSFUL_BUNDLES = new TupleTag<>(); /** The TupleTag used for bundles that failed to be executed for any reason. */ - public static final TupleTag> FAILED_BUNDLES = + public static final TupleTag> FAILED_BUNDLES = new TupleTag<>(); private final ValueProvider fhirStore; @@ -1405,22 +1406,20 @@ public ValueProvider getFhirStore() { } @Override - public ExecuteBundlesResult expand(PCollection input) { + public ExecuteBundlesResult expand(PCollection input) { PCollectionTuple bundles = input.apply( ParDo.of(new ExecuteBundlesFn(this.fhirStore)) .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); - bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleWithMetadataCoder.of()); - bundles - .get(FAILED_BUNDLES) - .setCoder(HealthcareIOErrorCoder.of(FhirBundleWithMetadataCoder.of())); + bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleResponseCoder.of()); + bundles.get(FAILED_BUNDLES).setCoder(HealthcareIOErrorCoder.of(FhirBundleResponseCoder.of())); return ExecuteBundlesResult.in( input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); } /** The type Write Fhir fn. */ - static class ExecuteBundlesFn extends DoFn { + static class ExecuteBundlesFn extends DoFn { private static final Counter EXECUTE_BUNDLE_ERRORS = Metrics.counter( @@ -1479,14 +1478,15 @@ public void executeBundles(ProcessContext context) { HttpBody resp = client.executeFhirBundle(fhirStore.get(), bundle); EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); - parseResponse(context, bundle, resp); + parseResponse(context, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); + context.output( + FAILED_BUNDLES, HealthcareIOError.of(FhirBundleResponse.of(context.element()), e)); } } - private void parseResponse(ProcessContext context, String inputBody, HttpBody resp) + private void parseResponse(ProcessContext context, HttpBody resp) throws JsonProcessingException { JsonObject bundle = JsonParser.parseString(resp.toString()).getAsJsonObject(); String bundleType = bundle.getAsJsonPrimitive(BUNDLE_TYPE_FIELD).getAsString(); @@ -1513,13 +1513,15 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re // 20X's are successes, otherwise failure. if (statusCode / 100 == 2) { success++; - context.output(Write.SUCCESSFUL_BODY, entry.toString()); + context.output( + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString())); } else { fail++; context.output( FAILED_BUNDLES, HealthcareIOError.of( - context.element(), HealthcareHttpException.of(statusCode, entry.toString()))); + FhirBundleResponse.of(context.element()), + HealthcareHttpException.of(statusCode, entry.toString()))); } } EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success); @@ -1527,12 +1529,7 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re } else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) { EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size()); context.output( - SUCCESSFUL_BUNDLES, - FhirBundleWithMetadata.builder() - .setMetadata(context.element().getMetadata()) - .setBundle(inputBody) - .setResponse(resp.getData()) - .build()); + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.toString())); } EXECUTE_BUNDLE_SUCCESS.inc(); return; @@ -1557,13 +1554,13 @@ private int parseBundleStatus(String status) { */ public static class ExecuteBundlesResult extends Write.AbstractResult { private final Pipeline pipeline; - private final PCollection successfulBundles; - private final PCollection> failedBundles; + private final PCollection successfulBundles; + private final PCollection> failedBundles; private ExecuteBundlesResult( Pipeline pipeline, - PCollection successfulBundles, - PCollection> failedBundles) { + PCollection successfulBundles, + PCollection> failedBundles) { this.pipeline = pipeline; this.successfulBundles = successfulBundles; this.failedBundles = failedBundles; @@ -1575,37 +1572,53 @@ private ExecuteBundlesResult( */ public static ExecuteBundlesResult in( Pipeline pipeline, - PCollection successfulBundles, - PCollection> failedBundles) { + PCollection successfulBundles, + PCollection> failedBundles) { return new ExecuteBundlesResult(pipeline, successfulBundles, failedBundles); } - /** Gets successful FhirBundleWithMetadata from execute bundles operation. */ - public PCollection getSuccessfulBundlesWithMetadata() { - return this.successfulBundles; - } - - /** - * Gets failed FhirBundleWithMetadata with metadata wrapped inside HealthcareIOError. The bundle - * field could be null. - */ - public PCollection> getFailedBundles() { - return this.failedBundles; - } - @Override public PCollection getSuccessfulBodies() { return this.successfulBundles.apply( - MapElements.into(TypeDescriptors.strings()).via(FhirBundleWithMetadata::getBundle)); + MapElements.into(TypeDescriptors.strings()) + .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())); + } + + /** Gets successful FhirBundleResponse from execute bundles operation. */ + public PCollection getSuccessfulBundles() { + return this.successfulBundles; } @Override public PCollection> getFailedBodies() { return this.failedBundles - .apply("GetBodiesFromBundles", ParDo.of(new GetStringHealthcareIOErrorFn())) + .apply( + "GetBodiesFromBundles", + MapElements.via( + new SimpleFunction< + HealthcareIOError, HealthcareIOError>() { + @Override + public HealthcareIOError apply( + HealthcareIOError input) { + return new HealthcareIOError<>( + input.getDataResource().getFhirBundleParameter().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode()); + } + })) .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); } + /** + * Gets failed FhirBundleResponse wrapped inside HealthcareIOError. The bundle field could be + * null. + */ + public PCollection> getFailedBundles() { + return this.failedBundles; + } + @Override public PCollection> getFailedFiles() { return super.pipeline.apply(Create.empty(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java deleted file mode 100644 index 89ca016adc2fd..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/GetStringHealthcareIOErrorFn.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.healthcare; - -import org.apache.beam.sdk.transforms.DoFn; - -/** - * ParDo function that transforms a HealthcareIOError for a FhirBundleWithMetadata to an error with - * the body (string) as the data resource, for backwards compatibility. - */ -public class GetStringHealthcareIOErrorFn - extends DoFn, HealthcareIOError> { - - @ProcessElement - public void process(ProcessContext context) { - HealthcareIOError input = context.element(); - context.output( - new HealthcareIOError<>( - input.getDataResource().getBundle(), - input.getErrorMessage(), - input.getStackTrace(), - input.getObservedTime(), - input.getStatusCode())); - } -} From 7cdf181bd5fbd53fdc521e5aa9287b86a3d3c9c6 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Fri, 27 May 2022 18:03:20 +0000 Subject: [PATCH 07/13] Updated getFailedBodies method. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 4dd33352e2ed5..5645e6f21037f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1591,24 +1591,21 @@ public PCollection getSuccessfulBundles() { @Override public PCollection> getFailedBodies() { - return this.failedBundles - .apply( - "GetBodiesFromBundles", - MapElements.via( - new SimpleFunction< - HealthcareIOError, HealthcareIOError>() { - @Override - public HealthcareIOError apply( - HealthcareIOError input) { - return new HealthcareIOError<>( - input.getDataResource().getFhirBundleParameter().getBundle(), - input.getErrorMessage(), - input.getStackTrace(), - input.getObservedTime(), - input.getStatusCode()); - } - })) - .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + return this.failedBundles.apply( + MapElements.via( + new SimpleFunction< + HealthcareIOError, HealthcareIOError>() { + @Override + public HealthcareIOError apply( + HealthcareIOError input) { + return new HealthcareIOError<>( + input.getDataResource().getFhirBundleParameter().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode()); + } + })); } /** From ed480b91cf6669bcdc3eaf0497f8a1676532d5ed Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Fri, 27 May 2022 20:05:41 +0000 Subject: [PATCH 08/13] Updated FailedBundles. --- .../io/gcp/healthcare/FhirBundleResponse.java | 14 ++-- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 69 ++++++++++--------- .../sdk/io/gcp/healthcare/FhirIOWriteIT.java | 28 ++++++-- 3 files changed, 66 insertions(+), 45 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java index 6a607476c8f12..1941fa0ffb6f2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java @@ -28,13 +28,13 @@ static FhirBundleResponse.Builder builder() { return new AutoValue_FhirBundleResponse.Builder(); } - /** - * String representing the metadata of the Bundle to be written. Used to pass metadata through the - * ExecuteBundles PTransform. - */ + /** FhirBundleParameter represents a FHIR bundle in JSON format to be executed on a FHIR store. */ public abstract FhirBundleParameter getFhirBundleParameter(); - /** FHIR R4 bundle resource object as a string. */ + /** + * HTTP response from the FHIR store after attempting to write the Bundle method. The value varies + * depending on BATCH vs TRANSACTION bundles. + */ public abstract String getResponse(); public static FhirBundleResponse of( @@ -45,10 +45,6 @@ public static FhirBundleResponse of( .build(); } - public static FhirBundleResponse of(FhirBundleParameter fhirBundleParameter) { - return FhirBundleResponse.of(fhirBundleParameter, null); - } - @AutoValue.Builder abstract static class Builder { abstract FhirBundleResponse.Builder setFhirBundleParameter( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 5645e6f21037f..d7be674de4487 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -31,6 +31,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; @@ -1383,7 +1384,7 @@ public static class ExecuteBundles public static final TupleTag SUCCESSFUL_BUNDLES = new TupleTag<>(); /** The TupleTag used for bundles that failed to be executed for any reason. */ - public static final TupleTag> FAILED_BUNDLES = + public static final TupleTag> FAILED_BUNDLES = new TupleTag<>(); private final ValueProvider fhirStore; @@ -1412,7 +1413,9 @@ public ExecuteBundlesResult expand(PCollection input) { ParDo.of(new ExecuteBundlesFn(this.fhirStore)) .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleResponseCoder.of()); - bundles.get(FAILED_BUNDLES).setCoder(HealthcareIOErrorCoder.of(FhirBundleResponseCoder.of())); + bundles + .get(FAILED_BUNDLES) + .setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); return ExecuteBundlesResult.in( input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); @@ -1481,8 +1484,7 @@ public void executeBundles(ProcessContext context) { parseResponse(context, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output( - FAILED_BUNDLES, HealthcareIOError.of(FhirBundleResponse.of(context.element()), e)); + context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); } } @@ -1514,14 +1516,15 @@ private void parseResponse(ProcessContext context, HttpBody resp) if (statusCode / 100 == 2) { success++; context.output( - SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString())); + SUCCESSFUL_BUNDLES, + FhirBundleResponse.of(context.element(), entry.getAsString())); } else { fail++; context.output( FAILED_BUNDLES, HealthcareIOError.of( - FhirBundleResponse.of(context.element()), - HealthcareHttpException.of(statusCode, entry.toString()))); + context.element(), + HealthcareHttpException.of(statusCode, entry.getAsString()))); } } EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success); @@ -1529,7 +1532,7 @@ private void parseResponse(ProcessContext context, HttpBody resp) } else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) { EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size()); context.output( - SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.toString())); + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.getAsString())); } EXECUTE_BUNDLE_SUCCESS.inc(); return; @@ -1552,15 +1555,15 @@ private int parseBundleStatus(String status) { * ExecuteBundlesResult contains both successfully executed bundles and information help debugging * failed executions (eg metadata & error msgs). */ - public static class ExecuteBundlesResult extends Write.AbstractResult { + public static class ExecuteBundlesResult extends Write.AbstractResult implements Serializable { private final Pipeline pipeline; private final PCollection successfulBundles; - private final PCollection> failedBundles; + private final PCollection> failedBundles; private ExecuteBundlesResult( Pipeline pipeline, PCollection successfulBundles, - PCollection> failedBundles) { + PCollection> failedBundles) { this.pipeline = pipeline; this.successfulBundles = successfulBundles; this.failedBundles = failedBundles; @@ -1573,15 +1576,17 @@ private ExecuteBundlesResult( public static ExecuteBundlesResult in( Pipeline pipeline, PCollection successfulBundles, - PCollection> failedBundles) { + PCollection> failedBundles) { return new ExecuteBundlesResult(pipeline, successfulBundles, failedBundles); } @Override public PCollection getSuccessfulBodies() { - return this.successfulBundles.apply( - MapElements.into(TypeDescriptors.strings()) - .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())); + return this.successfulBundles + .apply( + MapElements.into(TypeDescriptors.strings()) + .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())) + .setCoder(StringUtf8Coder.of()); } /** Gets successful FhirBundleResponse from execute bundles operation. */ @@ -1591,28 +1596,30 @@ public PCollection getSuccessfulBundles() { @Override public PCollection> getFailedBodies() { - return this.failedBundles.apply( - MapElements.via( - new SimpleFunction< - HealthcareIOError, HealthcareIOError>() { - @Override - public HealthcareIOError apply( - HealthcareIOError input) { - return new HealthcareIOError<>( - input.getDataResource().getFhirBundleParameter().getBundle(), - input.getErrorMessage(), - input.getStackTrace(), - input.getObservedTime(), - input.getStatusCode()); - } - })); + return this.failedBundles + .apply( + MapElements.via( + new SimpleFunction< + HealthcareIOError, HealthcareIOError>() { + @Override + public HealthcareIOError apply( + HealthcareIOError input) { + return new HealthcareIOError<>( + input.getDataResource().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode()); + } + })) + .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); } /** * Gets failed FhirBundleResponse wrapped inside HealthcareIOError. The bundle field could be * null. */ - public PCollection> getFailedBundles() { + public PCollection> getFailedBundles() { return this.failedBundles; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java index d524166ec0543..8c4cedc5e3359 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java @@ -28,7 +28,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles; +import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundlesResult; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -120,11 +123,13 @@ public void testFhirIO_ExecuteBundle() throws IOException { @Test public void testFhirIO_ExecuteBundle_parseResponse() { - List bundles = BUNDLES.get("BUNDLE_PARSE_TEST"); - FhirIO.Write.AbstractResult writeResult = - pipeline - .apply(Create.of(bundles)) - .apply(FhirIO.Write.executeBundles(options.getFhirStore())); + List bundles = + BUNDLES.get("BUNDLE_PARSE_TEST").stream() + .map(bundle -> FhirBundleParameter.of(bundle)) + .collect(Collectors.toList()); + + ExecuteBundlesResult writeResult = + pipeline.apply(Create.of(bundles)).apply(new ExecuteBundles(options.getFhirStore())); PAssert.that(writeResult.getSuccessfulBodies()) .satisfies( @@ -137,6 +142,19 @@ public void testFhirIO_ExecuteBundle_parseResponse() { assertEquals(2, counter); return null; }); + + PAssert.that(writeResult.getSuccessfulBundles()) + .satisfies( + input -> { + int counter = 0; + for (FhirBundleResponse resp : input) { + assertFalse(resp.getResponse().isEmpty()); + counter++; + } + assertEquals(2, counter); + return null; + }); + PAssert.that(writeResult.getFailedBodies()) .satisfies( input -> { From bc21df1ba829ec1ba9d4d6d03d4d16acf4970575 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Sun, 29 May 2022 17:06:05 +0000 Subject: [PATCH 09/13] Add a nested class for ParDo fn. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 57 ++++++++++--------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index d7be674de4487..c704f5e3ebe0e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -31,7 +31,6 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; @@ -71,7 +70,6 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -1413,9 +1411,7 @@ public ExecuteBundlesResult expand(PCollection input) { ParDo.of(new ExecuteBundlesFn(this.fhirStore)) .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleResponseCoder.of()); - bundles - .get(FAILED_BUNDLES) - .setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); + bundles.get(FAILED_BUNDLES).setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); return ExecuteBundlesResult.in( input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); @@ -1484,7 +1480,8 @@ public void executeBundles(ProcessContext context) { parseResponse(context, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); + context.output( + FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); } } @@ -1516,8 +1513,7 @@ private void parseResponse(ProcessContext context, HttpBody resp) if (statusCode / 100 == 2) { success++; context.output( - SUCCESSFUL_BUNDLES, - FhirBundleResponse.of(context.element(), entry.getAsString())); + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.getAsString())); } else { fail++; context.output( @@ -1555,7 +1551,8 @@ private int parseBundleStatus(String status) { * ExecuteBundlesResult contains both successfully executed bundles and information help debugging * failed executions (eg metadata & error msgs). */ - public static class ExecuteBundlesResult extends Write.AbstractResult implements Serializable { + public static class ExecuteBundlesResult extends Write.AbstractResult { + private final Pipeline pipeline; private final PCollection successfulBundles; private final PCollection> failedBundles; @@ -1582,14 +1579,15 @@ public static ExecuteBundlesResult in( @Override public PCollection getSuccessfulBodies() { - return this.successfulBundles - .apply( + return this.successfulBundles.apply( MapElements.into(TypeDescriptors.strings()) .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())) .setCoder(StringUtf8Coder.of()); } - /** Gets successful FhirBundleResponse from execute bundles operation. */ + /** + * Gets successful FhirBundleResponse from execute bundles operation. + */ public PCollection getSuccessfulBundles() { return this.successfulBundles; } @@ -1597,21 +1595,7 @@ public PCollection getSuccessfulBundles() { @Override public PCollection> getFailedBodies() { return this.failedBundles - .apply( - MapElements.via( - new SimpleFunction< - HealthcareIOError, HealthcareIOError>() { - @Override - public HealthcareIOError apply( - HealthcareIOError input) { - return new HealthcareIOError<>( - input.getDataResource().getBundle(), - input.getErrorMessage(), - input.getStackTrace(), - input.getObservedTime(), - input.getStatusCode()); - } - })) + .apply(ParDo.of(new GetStringHealthcareIOErrorFn())) .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); } @@ -1644,7 +1628,24 @@ public Map, PValue> expand() { @Override public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) {} + String transformName, PInput input, PTransform transform) { + } + + private static class GetStringHealthcareIOErrorFn extends + DoFn, HealthcareIOError> { + + @ProcessElement + public void process(ProcessContext context) { + HealthcareIOError input = context.element(); + context.output( + new HealthcareIOError( + input.getDataResource().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode())); + } + } } /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */ From 45169a0b153b9484915644aa25c3779d6d5c83b0 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Sun, 29 May 2022 17:09:00 +0000 Subject: [PATCH 10/13] Spotless Apply fix. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index c704f5e3ebe0e..baf35d55d6d4a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1411,7 +1411,9 @@ public ExecuteBundlesResult expand(PCollection input) { ParDo.of(new ExecuteBundlesFn(this.fhirStore)) .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleResponseCoder.of()); - bundles.get(FAILED_BUNDLES).setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); + bundles + .get(FAILED_BUNDLES) + .setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); return ExecuteBundlesResult.in( input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); @@ -1480,8 +1482,7 @@ public void executeBundles(ProcessContext context) { parseResponse(context, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output( - FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); + context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); } } @@ -1513,7 +1514,8 @@ private void parseResponse(ProcessContext context, HttpBody resp) if (statusCode / 100 == 2) { success++; context.output( - SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.getAsString())); + SUCCESSFUL_BUNDLES, + FhirBundleResponse.of(context.element(), entry.getAsString())); } else { fail++; context.output( @@ -1579,15 +1581,14 @@ public static ExecuteBundlesResult in( @Override public PCollection getSuccessfulBodies() { - return this.successfulBundles.apply( + return this.successfulBundles + .apply( MapElements.into(TypeDescriptors.strings()) .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())) .setCoder(StringUtf8Coder.of()); } - /** - * Gets successful FhirBundleResponse from execute bundles operation. - */ + /** Gets successful FhirBundleResponse from execute bundles operation. */ public PCollection getSuccessfulBundles() { return this.successfulBundles; } @@ -1628,11 +1629,10 @@ public Map, PValue> expand() { @Override public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) { - } + String transformName, PInput input, PTransform transform) {} - private static class GetStringHealthcareIOErrorFn extends - DoFn, HealthcareIOError> { + private static class GetStringHealthcareIOErrorFn + extends DoFn, HealthcareIOError> { @ProcessElement public void process(ProcessContext context) { From 32e27c1d7e1d301c809056cbf85db177e94e756c Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Sun, 29 May 2022 20:23:08 +0000 Subject: [PATCH 11/13] Fix Post commit tests. --- .../java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +- .../org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index baf35d55d6d4a..0ed2f453ac1e7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1530,7 +1530,7 @@ private void parseResponse(ProcessContext context, HttpBody resp) } else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) { EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size()); context.output( - SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.getAsString())); + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.toString())); } EXECUTE_BUNDLE_SUCCESS.inc(); return; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java index 8c4cedc5e3359..e6a71f37c2a48 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java @@ -129,7 +129,9 @@ public void testFhirIO_ExecuteBundle_parseResponse() { .collect(Collectors.toList()); ExecuteBundlesResult writeResult = - pipeline.apply(Create.of(bundles)).apply(new ExecuteBundles(options.getFhirStore())); + pipeline + .apply(Create.of(bundles).withCoder(FhirBundleParameterCoder.of())) + .apply(new ExecuteBundles(options.getFhirStore())); PAssert.that(writeResult.getSuccessfulBodies()) .satisfies( From 49af03306fa3d49268ba1ada21c2d2ad65f443ef Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Sun, 29 May 2022 21:07:53 +0000 Subject: [PATCH 12/13] Fix Post commit tests. --- .../java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 0ed2f453ac1e7..7995021935eae 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1514,15 +1514,13 @@ private void parseResponse(ProcessContext context, HttpBody resp) if (statusCode / 100 == 2) { success++; context.output( - SUCCESSFUL_BUNDLES, - FhirBundleResponse.of(context.element(), entry.getAsString())); + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString())); } else { fail++; context.output( FAILED_BUNDLES, HealthcareIOError.of( - context.element(), - HealthcareHttpException.of(statusCode, entry.getAsString()))); + context.element(), HealthcareHttpException.of(statusCode, entry.toString()))); } } EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success); From e80d0b9efb1b08652df29932c5a34dd28b95f352 Mon Sep 17 00:00:00 2001 From: Fathima Mohammed Date: Thu, 2 Jun 2022 15:15:03 +0000 Subject: [PATCH 13/13] Use defaultSchema instead of custom coder. --- .../gcp/healthcare/FhirBundleParameter.java | 10 ++-- .../healthcare/FhirBundleParameterCoder.java | 47 ------------------- .../io/gcp/healthcare/FhirBundleResponse.java | 8 +++- .../healthcare/FhirBundleResponseCoder.java | 47 ------------------- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 7 +-- .../sdk/io/gcp/healthcare/FhirIOWriteIT.java | 3 +- 6 files changed, 20 insertions(+), 102 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java index 64b8158d65ff6..bd812d59164ee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java @@ -18,14 +18,17 @@ package org.apache.beam.sdk.io.gcp.healthcare; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.Objects; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; /** FhirBundleParameter represents a FHIR bundle in JSON format to be executed on a FHIR store. */ -@DefaultCoder(FhirBundleParameterCoder.class) +@DefaultSchema(AutoValueSchema.class) @AutoValue -public abstract class FhirBundleParameter { +public abstract class FhirBundleParameter implements Serializable { static Builder builder() { return new AutoValue_FhirBundleParameter.Builder(); @@ -40,6 +43,7 @@ static Builder builder() { /** FHIR R4 bundle resource object as a string. */ public abstract String getBundle(); + @SchemaCreate public static FhirBundleParameter of(@Nullable String metadata, String bundle) { return FhirBundleParameter.builder() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java deleted file mode 100644 index 95e8f82b65d61..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameterCoder.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.healthcare; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** Coder for {@link FhirBundleParameter}. */ -public class FhirBundleParameterCoder extends CustomCoder { - private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); - - public static FhirBundleParameterCoder of() { - return new FhirBundleParameterCoder(); - } - - @Override - public void encode(FhirBundleParameter value, OutputStream outStream) throws IOException { - STRING_CODER.encode(value.getMetadata(), outStream); - STRING_CODER.encode(value.getBundle(), outStream); - } - - @Override - public FhirBundleParameter decode(InputStream inStream) throws IOException { - String metadata = STRING_CODER.decode(inStream); - String bundle = STRING_CODER.decode(inStream); - return FhirBundleParameter.of(metadata, bundle); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java index 1941fa0ffb6f2..9f971b2bb9535 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java @@ -18,11 +18,16 @@ package org.apache.beam.sdk.io.gcp.healthcare; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.Objects; import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; +@DefaultSchema(AutoValueSchema.class) @AutoValue -public abstract class FhirBundleResponse { +public abstract class FhirBundleResponse implements Serializable { static FhirBundleResponse.Builder builder() { return new AutoValue_FhirBundleResponse.Builder(); @@ -37,6 +42,7 @@ static FhirBundleResponse.Builder builder() { */ public abstract String getResponse(); + @SchemaCreate public static FhirBundleResponse of( FhirBundleParameter fhirBundleParameter, @Nullable String response) { return FhirBundleResponse.builder() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java deleted file mode 100644 index d1cb6fd5c1950..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponseCoder.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.healthcare; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -public class FhirBundleResponseCoder extends CustomCoder { - private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); - private final FhirBundleParameterCoder fhirBundleParameterCoder = FhirBundleParameterCoder.of(); - - public static FhirBundleResponseCoder of() { - return new FhirBundleResponseCoder(); - } - - @Override - public void encode(FhirBundleResponse value, OutputStream outStream) throws IOException { - fhirBundleParameterCoder.encode(value.getFhirBundleParameter(), outStream); - STRING_CODER.encode(value.getResponse(), outStream); - } - - @Override - public FhirBundleResponse decode(InputStream inStream) throws IOException { - FhirBundleParameter fhirBundleParameter = fhirBundleParameterCoder.decode(inStream); - String response = STRING_CODER.decode(inStream); - return FhirBundleResponse.of(fhirBundleParameter, response); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index ce1924e1ebb3b..6f2fb3125b6de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -45,6 +45,7 @@ import java.util.UUID; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileIO; @@ -965,7 +966,7 @@ public AbstractResult expand(PCollection input) { .apply( MapElements.into(TypeDescriptor.of(FhirBundleParameter.class)) .via(FhirBundleParameter::of)) - .setCoder(FhirBundleParameterCoder.of()) + .setCoder(SerializableCoder.of(FhirBundleParameter.class)) .apply(new ExecuteBundles(this.getFhirStore())); } } @@ -1399,10 +1400,10 @@ public ExecuteBundlesResult expand(PCollection input) { input.apply( ParDo.of(new ExecuteBundlesFn(this.fhirStore)) .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); - bundles.get(SUCCESSFUL_BUNDLES).setCoder(FhirBundleResponseCoder.of()); + bundles.get(SUCCESSFUL_BUNDLES).setCoder(SerializableCoder.of(FhirBundleResponse.class)); bundles .get(FAILED_BUNDLES) - .setCoder(HealthcareIOErrorCoder.of(FhirBundleParameterCoder.of())); + .setCoder(HealthcareIOErrorCoder.of(SerializableCoder.of(FhirBundleParameter.class))); return ExecuteBundlesResult.in( input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java index e6a71f37c2a48..1729025e48e25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundlesResult; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; @@ -130,7 +131,7 @@ public void testFhirIO_ExecuteBundle_parseResponse() { ExecuteBundlesResult writeResult = pipeline - .apply(Create.of(bundles).withCoder(FhirBundleParameterCoder.of())) + .apply(Create.of(bundles).withCoder(SerializableCoder.of(FhirBundleParameter.class))) .apply(new ExecuteBundles(options.getFhirStore())); PAssert.that(writeResult.getSuccessfulBodies())