[BEAM-11322] Apache Beam Example to tokenize sensitive data #13995

Merged · 34 commits · Mar 31, 2021

Commits
45ee19f  [WIP] Transfer from DataflowTemplates to Beam (Jan 11, 2021)
ba046cb  Move to beam repo (Jan 11, 2021)
df5881f  moved convertors for GCSio (Jan 12, 2021)
e6652ab  Merge branch 'master' into DataTokenizationExample (ilya-kozyrev, Jan 12, 2021)
1e9c336  Renaming + readme (ilya-kozyrev, Jan 12, 2021)
76accdc  build errors (Jan 13, 2021)
e464202  Merge remote-tracking branch 'origin/DataTokenizationExample' into Da… (Jan 13, 2021)
8782cba  minimize suppress (Jan 13, 2021)
46805f3  remove UDF usages (ilya-kozyrev, Jan 14, 2021)
d9a8e33  Fixes for stylechecks (Jan 14, 2021)
76af970  grooming for data protectors (ilya-kozyrev, Jan 14, 2021)
e556e37  Merge branch 'DataTokenizationExample' of github.com:akvelon/beam int… (ilya-kozyrev, Jan 14, 2021)
bfcba7f  grooming for data protectors (ilya-kozyrev, Jan 14, 2021)
cfa41a7  fix javadoc (ilya-kozyrev, Jan 14, 2021)
c8200f4  Added support for window writing; Fixed ordering in tokenization process (Jan 15, 2021)
6e8f62f  supressed checkstyle errors for BigTableIO class (ramazan-yapparov, Jan 15, 2021)
b489cc7  add data tokenization tests (Jan 18, 2021)
ee93d14  Changed GCS to FileSystem and removed redundant function from Schemas… (Jan 18, 2021)
4b59ed9  Updated README.md for local run with BigQuery sink (Jan 19, 2021)
2e0efe0  add docstring (ilya-kozyrev, Jan 20, 2021)
3136249  Updated README.md (Jan 22, 2021)
606ecb2  Updated README.md and added javadoc for the main pipeline class (Jan 25, 2021)
2b30240  remove unused test case (Jan 27, 2021)
329c3e6  Merge remote-tracking branch 'origin/DataTokenizationExample' into Da… (Jan 27, 2021)
341ae1e  Merge branch 'master' into DataTokenizationExample (Feb 16, 2021)
3002f7c  Style fix (Feb 16, 2021)
7b7664e  Whitespaces fix (Feb 16, 2021)
aeb471e  Fixed undeclared dependencies and excluded .csv resource files from l… (Feb 17, 2021)
a923846  Fix for incorrect rpc url (MikhailMedvedevAkvelon, Mar 18, 2021)
5a5d0c4  Fix for nullable types (MikhailMedvedevAkvelon, Mar 18, 2021)
6ac4193  Data tokenization example group into batches (#11) (Nuzhdina-Elena, Mar 22, 2021)
906e6e7  code format fixed (Nuzhdina-Elena, Mar 22, 2021)
9ad7e12  Getting value from environment variables for maxBufferingDurationMs. … (Nuzhdina-Elena, Mar 23, 2021)
3ab9289  [DATAFLOW-139] Incorrect DSG url lead to NPE (#13) (MikhailMedvedevAkvelon, Mar 23, 2021)
1 change: 1 addition & 0 deletions build.gradle.kts
@@ -50,6 +50,7 @@ tasks.rat {
"**/test.avsc",
"**/user.avsc",
"**/test/resources/**/*.txt",
"**/test/resources/**/*.csv",
"**/test/**/.placeholder",

// Default eclipse excludes neglect subprojects
13 changes: 13 additions & 0 deletions examples/java/build.gradle
@@ -72,9 +72,15 @@ dependencies {
compile library.java.google_code_gson
compile library.java.google_http_client
compile library.java.google_oauth_client
compile library.java.jackson_databind
compile library.java.joda_time
compile library.java.protobuf_java
compile library.java.proto_google_cloud_bigtable_v2
compile library.java.proto_google_cloud_datastore_v1
compile library.java.slf4j_api
provided library.java.commons_io
provided library.java.commons_csv
runtime project(path: ":runners:direct-java", configuration: "shadow")
compile library.java.vendored_grpc_1_26_0
compile library.java.vendored_guava_26_0_jre
compile "com.google.api.grpc:proto-google-cloud-language-v1:1.81.4"
@@ -83,6 +89,7 @@
// "spotbugs-annotations:3.1.12" used in Beam. Not required.
exclude group: "org.apache.zookeeper", module: "zookeeper"
}
compile "org.apache.commons:commons-lang3:3.9"
compile "org.apache.httpcomponents:httpclient:4.5.13"
compile "org.apache.httpcomponents:httpcore:4.4.13"
testCompile project(path: ":runners:direct-java", configuration: "shadow")
@@ -149,3 +156,9 @@ task preCommit() {
}
}

task execute (type:JavaExec) {
    main = System.getProperty("mainClass")
    classpath = sourceSets.main.runtimeClasspath
    systemProperties System.getProperties()
    args System.getProperty("exec.args", "").split()
}

Large diffs are not rendered by default.

@@ -0,0 +1,169 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Apache Beam pipeline example to tokenize data using a remote RPC server

This directory contains an Apache Beam example that creates a pipeline to read data from one of
the supported sources, tokenize the data with external API calls to a remote RPC server, and write the result into one of the supported sinks.

Supported data formats:

- JSON
- CSV

Supported input sources:

- File system
- [Google Pub/Sub](https://cloud.google.com/pubsub)

Supported destination sinks:

- File system
- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
- [Cloud BigTable](https://cloud.google.com/bigtable)

Supported data schema format:

- JSON with an array of fields described in BigQuery format
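
For illustration, a schema of this kind might look as follows (the field names are hypothetical;
the layout mirrors the BigQuery table schema JSON):

```json
{
  "fields": [
    {"name": "userId", "type": "STRING", "mode": "REQUIRED"},
    {"name": "email", "type": "STRING", "mode": "NULLABLE"}
  ]
}
```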

In the main scenario, the template creates an Apache Beam pipeline that reads data in CSV or
JSON format from the specified input source, sends the data to an external processing server,
receives the processed data, and writes it into the specified output sink.
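
To make that data flow concrete, here is a minimal sketch of the pipeline shape, assuming Beam's
`TextIO` and a hypothetical `tokenize` helper standing in for the example's actual RPC transform
(paths and names are illustrative):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class TokenizationPipelineSketch {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply("ReadRecords", TextIO.read().from("gs://my-bucket/input/*.json"))
        .apply(
            "TokenizeViaRpc",
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(@Element String record, OutputReceiver<String> out) {
                    out.output(tokenize(record));
                  }
                }))
        .apply("WriteRecords", TextIO.write().to("gs://my-bucket/output/part"));
    pipeline.run().waitUntilFinish();
  }

  // Hypothetical stand-in for the HTTP call to the tokenization RPC server; the real example
  // posts batches of records to --rpcUri and emits the tokenized payload from the response.
  private static String tokenize(String record) {
    return record; // placeholder only
  }
}
```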

## Requirements

- Java 8
- One of the supported sources to read data from
- One of the supported destination sinks to write data into
- A configured RPC server to tokenize data

## Getting Started

This section describes what is needed to get the template up and running.

- Gradle preparation
- Running the pipeline
- Running as a Dataflow Template

## Gradle preparation

To run this example, your `build.gradle` file should contain the following task to execute the pipeline:

```
task execute (type:JavaExec) {
    main = System.getProperty("mainClass")
    classpath = sourceSets.main.runtimeClasspath
    systemProperties System.getProperties()
    args System.getProperty("exec.args", "").split()
}
```

This task allows you to run the pipeline via the following command:

```bash
gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
-Dexec.args="--<argument>=<value> --<argument>=<value>"
```

Comment on lines +83 to +87
Member:
these instructions are nice and useful. I worry that users will not find out about this example. Do you have plans to blog about it, or add any extra documentation for it?

Contributor Author:
Yes, we plan to spread the word and blog about it.
## Running the pipeline

To execute this pipeline, specify the parameters:

- Data schema
  - **dataSchemaPath**: Path to the data schema (JSON format) compatible with BigQuery.
- One input source out of these:
  - File System
    - **inputFilePattern**: File pattern for the files to read data from
    - **inputFileFormat**: File format of the input files. Supported formats: JSON, CSV
    - If the input data is in CSV format:
      - **csvContainsHeaders**: `true` if the input file(s) contain headers, `false` otherwise
      - **csvDelimiter**: Delimiting character in CSV. Default: the delimiter provided in csvFormat
      - **csvFormat**: CSV format according to Apache Commons CSV. Default:
        [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT).
        Must exactly match one of the format names listed at
        https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
  - Google Pub/Sub
    - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format
      'projects/yourproject/topics/yourtopic'
- One output sink out of these:
  - File System
    - **outputDirectory**: Directory to write data to
    - **outputFileFormat**: File format of the output files. Supported formats: JSON, CSV
    - **windowDuration**: The window duration in which data will be written. Should be specified
      only for the 'Pub/Sub -> GCS' case. Defaults to 30s. Allowed formats are:
      - Ns (for seconds, example: 5s)
      - Nm (for minutes, example: 12m)
      - Nh (for hours, example: 2h)
  - Google Cloud BigQuery
    - **bigQueryTableName**: Cloud BigQuery table name to write into
    - **tempLocation**: Folder in a Google Cloud Storage bucket that BigQuery needs for handling
      data writes
  - Cloud BigTable
    - **bigTableProjectId**: ID of the project where the Cloud BigTable instance to write into is
      located
    - **bigTableInstanceId**: ID of the Cloud BigTable instance to write into
    - **bigTableTableId**: ID of the Cloud BigTable table to write into
    - **bigTableKeyColumnName**: Column name to use as a key in Cloud BigTable
    - **bigTableColumnFamilyName**: Column family name to use in Cloud BigTable
- RPC server parameters
  - **rpcUri**: URI for the API calls to the RPC server
  - **batchSize**: Size of the batch to send to the RPC server per request (see the batching
    sketch below)
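
The commit history above mentions grouping the example's records into batches before calling the
RPC server. A minimal sketch of such a batching stage, assuming Beam's `GroupIntoBatches`
transform with a purely illustrative constant key (the example's actual sharding may differ):

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;

class BatchingSketch {
  /** Groups records into fixed-size batches so each RPC request carries batchSize rows. */
  static PCollection<Iterable<String>> intoBatches(PCollection<String> rows, int batchSize) {
    return rows
        // GroupIntoBatches operates on key/value pairs, so give every record the same key.
        .apply(WithKeys.of("all"))
        // Emit an Iterable<String> once batchSize elements accumulate for the key.
        .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
        // Drop the constant key, keeping only the batched records.
        .apply(Values.create());
  }
}
```

Keying by a constant funnels all records through one key, which is the simplest way to satisfy
`GroupIntoBatches`; a real pipeline may spread the load across several keys.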

The template also allows the user to supply the following optional parameter:

- **nonTokenizedDeadLetterPath**: Folder where data that failed to tokenize will be stored

Pass the parameters in the following format:

```bash
--dataSchemaPath="path-to-data-schema-in-json-format"
--inputFilePattern="path-pattern-to-input-data"
--outputDirectory="path-to-output-directory"
# example for CSV case
--inputFileFormat="CSV"
--outputFileFormat="CSV"
--csvContainsHeaders="true"
--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
--batchSize=batch-size-number
--rpcUri=http://host:port/tokenize
```

By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:

```bash
--runner=YOUR_SELECTED_RUNNER
```

See the [documentation](http://beam.apache.org/get-started/quickstart/) and
the [Examples README](../../../../../../../../../README.md) for more information about how to run this example.

## Running as a Dataflow Template

This example also exists as a Google Dataflow Template, which you can build and run using Google
Cloud Platform. See the template
documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md)
for more information.

Comment on the link above
Member:
I don't see anything in this address. Will it be added later?

Member:
I am curious how the template here and in DataflowTemplates will be different?

Contributor Author:
Yes, we have a PR in the DataflowTemplates repository, and it is focused on execution via the
Google Cloud Dataflow runner. The Beam version, on the other hand, is a bit more generic.
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.datatokenization.options;

import org.apache.beam.examples.complete.datatokenization.transforms.io.BigTableIO;
import org.apache.beam.examples.complete.datatokenization.transforms.io.FileSystemIO.FileSystemPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * The {@link DataTokenizationOptions} interface provides the custom execution options passed by
 * the executor at the command-line.
 */
public interface DataTokenizationOptions
    extends PipelineOptions, FileSystemPipelineOptions, BigTableIO.BigTableOptions {

  @Description("Path to data schema (JSON format) compatible with BigQuery.")
  String getDataSchemaPath();

  void setDataSchemaPath(String dataSchemaPath);

  @Description(
      "The Cloud Pub/Sub topic to read from. "
          + "The name should be in the format of "
          + "projects/<project-id>/topics/<topic-name>.")
  String getPubsubTopic();

  void setPubsubTopic(String pubsubTopic);

  @Description("Cloud BigQuery table name to write into.")
  String getBigQueryTableName();

  void setBigQueryTableName(String bigQueryTableName);

  // Tokenization API specific parameters
  @Description("URI for the API calls to RPC server.")
  String getRpcUri();

  void setRpcUri(String rpcUri);

  @Description("Size of the batch to send to RPC server per request.")
  @Default.Integer(10)
  Integer getBatchSize();

  void setBatchSize(Integer batchSize);

  @Description("Dead-Letter path to store not-tokenized data.")
  String getNonTokenizedDeadLetterPath();

  void setNonTokenizedDeadLetterPath(String nonTokenizedDeadLetterPath);
}
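
For orientation, a minimal sketch of how an options interface like this is typically bound in a
pipeline's entry point, assuming Beam's standard `PipelineOptionsFactory` (this is illustrative,
not the PR's actual main class):

```java
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class OptionsParsingSketch {
  public static void main(String[] args) {
    // Register the interface so --help lists its options, then bind command-line flags
    // such as --rpcUri and --batchSize to the typed getters declared above.
    PipelineOptionsFactory.register(DataTokenizationOptions.class);
    DataTokenizationOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataTokenizationOptions.class);
    System.out.println(
        "Tokenizing via " + options.getRpcUri() + " in batches of " + options.getBatchSize());
  }
}
```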
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/** Options for the Apache Beam data tokenization example. */
package org.apache.beam.examples.complete.datatokenization.options;
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/** Apache Beam example pipeline to tokenize sensitive data using a remote RPC server. */
package org.apache.beam.examples.complete.datatokenization;