diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java new file mode 100644 index 0000000000000..bd812d59164ee --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleParameter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; + +/** FhirBundleParameter represents a FHIR bundle in JSON format to be executed on a FHIR store. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class FhirBundleParameter implements Serializable { + + static Builder builder() { + return new AutoValue_FhirBundleParameter.Builder(); + } + + /** + * String representing the metadata of the Bundle to be written. Used to pass metadata through the + * ExecuteBundles PTransform. + */ + public abstract String getMetadata(); + + /** FHIR R4 bundle resource object as a string. */ + public abstract String getBundle(); + + @SchemaCreate + public static FhirBundleParameter of(@Nullable String metadata, String bundle) { + + return FhirBundleParameter.builder() + .setMetadata(Objects.toString(metadata, "")) + .setBundle(bundle) + .build(); + } + + public static FhirBundleParameter of(String bundle) { + return FhirBundleParameter.of(null, bundle); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMetadata(String metadata); + + abstract Builder setBundle(String bundle); + + abstract FhirBundleParameter build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java new file mode 100644 index 0000000000000..9f971b2bb9535 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirBundleResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.healthcare; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; + +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class FhirBundleResponse implements Serializable { + + static FhirBundleResponse.Builder builder() { + return new AutoValue_FhirBundleResponse.Builder(); + } + + /** FhirBundleParameter represents a FHIR bundle in JSON format to be executed on a FHIR store. */ + public abstract FhirBundleParameter getFhirBundleParameter(); + + /** + * HTTP response from the FHIR store after attempting to write the Bundle method. The value varies + * depending on BATCH vs TRANSACTION bundles. + */ + public abstract String getResponse(); + + @SchemaCreate + public static FhirBundleResponse of( + FhirBundleParameter fhirBundleParameter, @Nullable String response) { + return FhirBundleResponse.builder() + .setFhirBundleParameter(fhirBundleParameter) + .setResponse(Objects.toString(response, "")) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract FhirBundleResponse.Builder setFhirBundleParameter( + FhirBundleParameter fhirBundleParameter); + + abstract FhirBundleResponse.Builder setResponse(String response); + + abstract FhirBundleResponse build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index dbbbac259da27..6f2fb3125b6de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -45,6 +45,7 @@ import java.util.UUID; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileIO; @@ -56,7 +57,6 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; @@ -624,7 +624,7 @@ private String fetchResource(HealthcareApiClient client, String resourceName) /** The type Write. */ @AutoValue - public abstract static class Write extends PTransform, Write.Result> { + public abstract static class Write extends PTransform, Write.AbstractResult> { /** The tag for successful writes to FHIR store. */ public static final TupleTag SUCCESSFUL_BODY = new TupleTag() {}; @@ -652,8 +652,27 @@ public enum WriteMethod { IMPORT } + public abstract static class AbstractResult implements POutput { + private Pipeline pipeline; + + public abstract PCollection getSuccessfulBodies(); + + public abstract PCollection> getFailedBodies(); + + public abstract PCollection> getFailedFiles(); + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + } + /** The type Result. */ - public static class Result implements POutput { + public static class Result extends AbstractResult { private final Pipeline pipeline; private final PCollection successfulBodies; @@ -689,6 +708,7 @@ static Result in( * * @return the entries that were inserted */ + @Override public PCollection getSuccessfulBodies() { return this.successfulBodies; } @@ -698,6 +718,7 @@ public PCollection getSuccessfulBodies() { * * @return the failed inserts with err */ + @Override public PCollection> getFailedBodies() { return this.failedBodies; } @@ -707,6 +728,7 @@ public PCollection> getFailedBodies() { * * @return the failed GCS uri with err */ + @Override public PCollection> getFailedFiles() { return this.failedFiles; } @@ -922,7 +944,7 @@ public static Write executeBundles(ValueProvider fhirStore) { private static final Logger LOG = LoggerFactory.getLogger(Write.class); @Override - public Result expand(PCollection input) { + public AbstractResult expand(PCollection input) { switch (this.getWriteMethod()) { case IMPORT: LOG.warn( @@ -940,7 +962,12 @@ public Result expand(PCollection input) { return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure)); case EXECUTE_BUNDLE: default: - return input.apply(new ExecuteBundles(this.getFhirStore())); + return input + .apply( + MapElements.into(TypeDescriptor.of(FhirBundleParameter.class)) + .via(FhirBundleParameter::of)) + .setCoder(SerializableCoder.of(FhirBundleParameter.class)) + .apply(new ExecuteBundles(this.getFhirStore())); } } } @@ -1338,7 +1365,15 @@ public enum ContentStructure { } /** The type Execute bundles. */ - public static class ExecuteBundles extends Write { + public static class ExecuteBundles + extends PTransform, ExecuteBundlesResult> { + + /** The TupleTag used for bundles that were executed successfully. */ + public static final TupleTag SUCCESSFUL_BUNDLES = new TupleTag<>(); + + /** The TupleTag used for bundles that failed to be executed for any reason. */ + public static final TupleTag> FAILED_BUNDLES = + new TupleTag<>(); private final ValueProvider fhirStore; @@ -1347,48 +1382,35 @@ public static class ExecuteBundles extends Write { * * @param fhirStore the fhir store */ - ExecuteBundles(ValueProvider fhirStore) { + public ExecuteBundles(ValueProvider fhirStore) { this.fhirStore = fhirStore; } - @Override - ValueProvider getFhirStore() { - return fhirStore; - } - - @Override - WriteMethod getWriteMethod() { - return WriteMethod.EXECUTE_BUNDLE; - } - - @Override - Optional getContentStructure() { - return Optional.empty(); - } - - @Override - Optional> getImportGcsTempPath() { - return Optional.empty(); + public ExecuteBundles(String fhirStore) { + this.fhirStore = StaticValueProvider.of(fhirStore); } - @Override - Optional> getImportGcsDeadLetterPath() { - return Optional.empty(); + public ValueProvider getFhirStore() { + return fhirStore; } @Override - public FhirIO.Write.Result expand(PCollection input) { - PCollectionTuple bodies = + public ExecuteBundlesResult expand(PCollection input) { + PCollectionTuple bundles = input.apply( - ParDo.of(new ExecuteBundlesFn(fhirStore)) - .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); - bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); - bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); - return Write.Result.in(input.getPipeline(), bodies); + ParDo.of(new ExecuteBundlesFn(this.fhirStore)) + .withOutputTags(SUCCESSFUL_BUNDLES, TupleTagList.of(FAILED_BUNDLES))); + bundles.get(SUCCESSFUL_BUNDLES).setCoder(SerializableCoder.of(FhirBundleResponse.class)); + bundles + .get(FAILED_BUNDLES) + .setCoder(HealthcareIOErrorCoder.of(SerializableCoder.of(FhirBundleParameter.class))); + + return ExecuteBundlesResult.in( + input.getPipeline(), bundles.get(SUCCESSFUL_BUNDLES), bundles.get(FAILED_BUNDLES)); } /** The type Write Fhir fn. */ - static class ExecuteBundlesFn extends DoFn { + static class ExecuteBundlesFn extends DoFn { private static final Counter EXECUTE_BUNDLE_ERRORS = Metrics.counter( @@ -1439,22 +1461,22 @@ public void initClient() throws IOException { @ProcessElement public void executeBundles(ProcessContext context) { - String inputBody = context.element(); + String bundle = context.element().getBundle(); try { long startTime = Instant.now().toEpochMilli(); // Validate that data was set to valid JSON. - mapper.readTree(inputBody); - HttpBody resp = client.executeFhirBundle(fhirStore.get(), inputBody); + mapper.readTree(bundle); + HttpBody resp = client.executeFhirBundle(fhirStore.get(), bundle); EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); - parseResponse(context, inputBody, resp); + parseResponse(context, resp); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(Write.FAILED_BODY, HealthcareIOError.of(inputBody, e)); + context.output(FAILED_BUNDLES, HealthcareIOError.of(context.element(), e)); } } - private void parseResponse(ProcessContext context, String inputBody, HttpBody resp) + private void parseResponse(ProcessContext context, HttpBody resp) throws JsonProcessingException { JsonObject bundle = JsonParser.parseString(resp.toString()).getAsJsonObject(); String bundleType = bundle.getAsJsonPrimitive(BUNDLE_TYPE_FIELD).getAsString(); @@ -1481,20 +1503,22 @@ private void parseResponse(ProcessContext context, String inputBody, HttpBody re // 20X's are successes, otherwise failure. if (statusCode / 100 == 2) { success++; - context.output(Write.SUCCESSFUL_BODY, entry.toString()); + context.output( + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), entry.toString())); } else { fail++; context.output( - Write.FAILED_BODY, + FAILED_BUNDLES, HealthcareIOError.of( - inputBody, HealthcareHttpException.of(statusCode, entry.toString()))); + context.element(), HealthcareHttpException.of(statusCode, entry.toString()))); } } EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(success); EXECUTE_BUNDLE_RESOURCE_ERRORS.inc(fail); } else if (bundleType.equals(BUNDLE_RESPONSE_TYPE_TRANSACTION)) { EXECUTE_BUNDLE_RESOURCE_SUCCESS.inc(entries.size()); - context.output(Write.SUCCESSFUL_BODY, bundle.toString()); + context.output( + SUCCESSFUL_BUNDLES, FhirBundleResponse.of(context.element(), bundle.toString())); } EXECUTE_BUNDLE_SUCCESS.inc(); return; @@ -1513,6 +1537,106 @@ private int parseBundleStatus(String status) { } } + /** + * ExecuteBundlesResult contains both successfully executed bundles and information help debugging + * failed executions (eg metadata & error msgs). + */ + public static class ExecuteBundlesResult extends Write.AbstractResult { + + private final Pipeline pipeline; + private final PCollection successfulBundles; + private final PCollection> failedBundles; + + private ExecuteBundlesResult( + Pipeline pipeline, + PCollection successfulBundles, + PCollection> failedBundles) { + this.pipeline = pipeline; + this.successfulBundles = successfulBundles; + this.failedBundles = failedBundles; + } + + /** + * Entry point for the ExecuteBundlesResult, storing the successful and failed bundles and their + * metadata. + */ + public static ExecuteBundlesResult in( + Pipeline pipeline, + PCollection successfulBundles, + PCollection> failedBundles) { + return new ExecuteBundlesResult(pipeline, successfulBundles, failedBundles); + } + + @Override + public PCollection getSuccessfulBodies() { + return this.successfulBundles + .apply( + MapElements.into(TypeDescriptors.strings()) + .via(bundleResponse -> bundleResponse.getFhirBundleParameter().getBundle())) + .setCoder(StringUtf8Coder.of()); + } + + /** Gets successful FhirBundleResponse from execute bundles operation. */ + public PCollection getSuccessfulBundles() { + return this.successfulBundles; + } + + @Override + public PCollection> getFailedBodies() { + return this.failedBundles + .apply(ParDo.of(new GetStringHealthcareIOErrorFn())) + .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + } + + /** + * Gets failed FhirBundleResponse wrapped inside HealthcareIOError. The bundle field could be + * null. + */ + public PCollection> getFailedBundles() { + return this.failedBundles; + } + + @Override + public PCollection> getFailedFiles() { + return super.pipeline.apply(Create.empty(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); + } + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + ExecuteBundles.SUCCESSFUL_BUNDLES, + successfulBundles, + ExecuteBundles.FAILED_BUNDLES, + failedBundles); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + + private static class GetStringHealthcareIOErrorFn + extends DoFn, HealthcareIOError> { + + @ProcessElement + public void process(ProcessContext context) { + HealthcareIOError input = context.element(); + context.output( + new HealthcareIOError( + input.getDataResource().getBundle(), + input.getErrorMessage(), + input.getStackTrace(), + input.getObservedTime(), + input.getStatusCode())); + } + } + } + + /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */ /** * Export FHIR resources from a FHIR store to new line delimited json files on GCS or BigQuery. * Output PCollection contains the URI where the FHIR store was exported to. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java index b74eba1cca668..958974b58fa28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java @@ -921,7 +921,7 @@ public enum FhirMethod { private String pageToken; private boolean isFirstRequest; - private FhirResourcePagesIterator( + public FhirResourcePagesIterator( FhirMethod fhirMethod, HealthcareApiClient client, String fhirStore, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java index 6b7bb64fc21ce..4dc07ea746ae3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java @@ -113,7 +113,7 @@ public void test_FhirIO_failedWrites() { PCollection fhirBundles = pipeline.apply(Create.of(emptyMessages)); - FhirIO.Write.Result writeResult = + FhirIO.Write.AbstractResult writeResult = fhirBundles.apply( FhirIO.Write.executeBundles( "projects/foo/locations/us-central1/datasets/bar/hl7V2Stores/baz")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java index 831aae38fad96..1729025e48e25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOWriteIT.java @@ -28,7 +28,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundles; +import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.ExecuteBundlesResult; import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -108,7 +112,7 @@ public static void teardownBucket() throws IOException { @Test public void testFhirIO_ExecuteBundle() throws IOException { - FhirIO.Write.Result writeResult = + FhirIO.Write.AbstractResult writeResult = pipeline .apply(Create.of(BUNDLES.get(version))) .apply(FhirIO.Write.executeBundles(options.getFhirStore())); @@ -120,11 +124,15 @@ public void testFhirIO_ExecuteBundle() throws IOException { @Test public void testFhirIO_ExecuteBundle_parseResponse() { - List bundles = BUNDLES.get("BUNDLE_PARSE_TEST"); - FhirIO.Write.Result writeResult = + List bundles = + BUNDLES.get("BUNDLE_PARSE_TEST").stream() + .map(bundle -> FhirBundleParameter.of(bundle)) + .collect(Collectors.toList()); + + ExecuteBundlesResult writeResult = pipeline - .apply(Create.of(bundles)) - .apply(FhirIO.Write.executeBundles(options.getFhirStore())); + .apply(Create.of(bundles).withCoder(SerializableCoder.of(FhirBundleParameter.class))) + .apply(new ExecuteBundles(options.getFhirStore())); PAssert.that(writeResult.getSuccessfulBodies()) .satisfies( @@ -137,6 +145,19 @@ public void testFhirIO_ExecuteBundle_parseResponse() { assertEquals(2, counter); return null; }); + + PAssert.that(writeResult.getSuccessfulBundles()) + .satisfies( + input -> { + int counter = 0; + for (FhirBundleResponse resp : input) { + assertFalse(resp.getResponse().isEmpty()); + counter++; + } + assertEquals(2, counter); + return null; + }); + PAssert.that(writeResult.getFailedBodies()) .satisfies( input -> { @@ -158,7 +179,7 @@ public void testFhirIO_Import() { if (options.getTempLocation() == null) { options.setTempLocation("gs://temp-storage-for-healthcare-io-tests"); } - FhirIO.Write.Result result = + FhirIO.Write.AbstractResult result = pipeline .apply(Create.of(BUNDLES.get(version))) .apply(