Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14532] Add integration testing to fhirio Read transform #17803

Merged
merged 14 commits into from
Jun 13, 2022
Merged
44 changes: 44 additions & 0 deletions sdks/go/data/fhir_bundles/batch-success-failure.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"resourceType": "Bundle",
"type": "batch",
"entry": [
{
"request": {
"method": "POST",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"John"
]
}
],
"gender": "male",
"birthDate": "1973-01-21",
"resourceType": "Patient"
}
},
{
"request": {
"method": "PUT",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"Alice"
]
}
],
"gender": "female",
"birthDate": "1970-01-01",
"resourceType": "Patient"
}
}
]
}
44 changes: 44 additions & 0 deletions sdks/go/data/fhir_bundles/transaction-failure.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {
"method": "POST",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"John"
]
}
],
"gender": "male",
"birthDate": "1973-01-21",
"resourceType": "Patient"
}
},
{
"request": {
"method": "PUT",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"Alice"
]
}
],
"gender": "female",
"birthDate": "1970-01-01",
"resourceType": "Patient"
}
}
]
}
44 changes: 44 additions & 0 deletions sdks/go/data/fhir_bundles/transaction-success.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {
"method": "POST",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"John"
]
}
],
"gender": "male",
"birthDate": "1973-01-21",
"resourceType": "Patient"
}
},
{
"request": {
"method": "POST",
"url": "Patient"
},
"resource": {
"name": [
{
"use": "official",
"given": [
"Alice"
]
}
],
"gender": "female",
"birthDate": "1970-01-01",
"resourceType": "Patient"
}
}
]
}
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/io/fhirio/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

const (
UserAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
baseMetricPrefix = "fhirio/"
userAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
)

type fhirStoreClient interface {
Expand All @@ -41,7 +41,7 @@ type fhirStoreClientImpl struct {
}

func newFhirStoreClient() *fhirStoreClientImpl {
healthcareService, err := healthcare.NewService(context.Background(), option.WithUserAgent(userAgent))
healthcareService, err := healthcare.NewService(context.Background(), option.WithUserAgent(UserAgent))
if err != nil {
panic("Failed to initialize Google Cloud Healthcare Service. Reason: " + err.Error())
}
Expand Down
17 changes: 12 additions & 5 deletions sdks/go/pkg/beam/io/fhirio/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,22 @@ func TestRead(t *testing.T) {
})
pipelineResult := ptest.RunAndValidate(t, p)
counterResults := pipelineResult.Metrics().AllMetrics().Counters()

if len(counterResults) != 1 {
t.Fatal("Only one counter should have been used")
t.Fatalf("counterResults got length %v, expected %v", len(counterResults), 1)
}
if counterResults[0].Name() != "fhirio/read_resource_error_count" {
t.Fatal("Only error counter should have been used")
counterResult := counterResults[0]

expectedCounterName := "fhirio/read_resource_error_count"
if counterResult.Name() != expectedCounterName {
t.Fatalf("counterResult.Name() is '%v', expected '%v'", counterResult.Name(), expectedCounterName)
}
if counterResults[0].Result() != int64(len(testResourcePaths)) {
t.Fatal("Counter should have been incremented by the number of test resource paths")

expectedCounterResult := int64(len(testResourcePaths))
if counterResult.Result() != expectedCounterResult {
t.Fatalf("counterResult.Result() is %v, expected %v", counterResult.Result(), expectedCounterResult)
}

})
}
}
8 changes: 8 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var portableFilters = []string{
"TestKafkaIO.*",
// The portable runner does not support self-checkpointing
"TestCheckpointing",
// FhirIO currently only supports Dataflow and Direct runners
"TestFhirIO_.*",
}

var flinkFilters = []string{
Expand All @@ -101,6 +103,8 @@ var flinkFilters = []string{
"TestDebeziumIO_BasicRead",
// Triggers are not yet supported
"TestTrigger.*",
// FhirIO currently only supports Dataflow and Direct runners
"TestFhirIO_.*",
}

var samzaFilters = []string{
Expand All @@ -116,6 +120,8 @@ var samzaFilters = []string{
"TestWordCount.*",
// The Samza runner does not support self-checkpointing
"TestCheckpointing",
// FhirIO currently only supports Dataflow and Direct runners
"TestFhirIO_.*",
}

var sparkFilters = []string{
Expand All @@ -132,6 +138,8 @@ var sparkFilters = []string{
"TestDebeziumIO_BasicRead",
// The spark runner does not support self-checkpointing
"TestCheckpointing",
// FhirIO currently only supports Dataflow and Direct runners
"TestFhirIO_.*",
}

var dataflowFilters = []string{
Expand Down
46 changes: 46 additions & 0 deletions sdks/go/test/integration/io/fhirio/fhirio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package fhirio contains integration tests for FhirIO transforms.
package fhirio

import (
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fhirio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func ReadPipeline(testResourcePaths []string) *beam.Pipeline {
lnogueir marked this conversation as resolved.
Show resolved Hide resolved
p, s, resourcePaths := ptest.CreateList(testResourcePaths)
resources, failedReads := fhirio.Read(s, resourcePaths)
passert.Empty(s, failedReads)
passert.Count(s, resources, "", len(testResourcePaths))
return p
}

func InvalidReadPipeline(fhirStorePath string) *beam.Pipeline {
invalidResourcePath := fhirStorePath + "/fhir/Patient/invalid"
p, s, resourcePaths := ptest.CreateList([]string{invalidResourcePath})
resources, failedReads := fhirio.Read(s, resourcePaths)
passert.Count(s, failedReads, "", 1)
passert.Empty(s, resources)
passert.True(s, failedReads, func(errorMsg string) bool {
return strings.Contains(errorMsg, "bad status [404]")
})
return p
}
Loading