Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11322] Apache Beam Example to tokenize sensitive data #13995

Merged
merged 34 commits into from
Mar 31, 2021

Conversation

KhaninArtur
Copy link
Contributor

Some users may want to protect their sensitive data using tokenization.

We propose to create a Beam example template that will demonstrate Beam transform to protect sensitive data using tokenization. In our example, we use an external service for the data tokenization.

At a high level, a pipeline that:

  • supports batch (GCS) and streaming (Pub/Sub) input sources
  • tokenizes sensitive data via external REST service - we are about to use Protegrity
  • outputs tokenized data into BigQuery or BigTable

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

Artur Khanin and others added 28 commits January 11, 2021 12:33
…taTokenizationExample

# Conflicts:
#	examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/DataTokenization.java
#	examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
#	examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/JavascriptTextTransformer.java
…o DataTokenizationExample

� Conflicts:
�	examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
# Conflicts:
#	examples/java/build.gradle
@pabloem
Copy link
Member

pabloem commented Feb 25, 2021

hi @KhaninArtur ! thanks for your contribution - is this PR ready to be reviewed?

@KhaninArtur
Copy link
Contributor Author

Hi @pabloem! Yes, this PR is ready to be reviewed 👍🏼

@KhaninArtur
Copy link
Contributor Author

Hi, @pabloem!
Are there any updates regarding this PR?

Copy link
Member

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize about the long delay. I took a quick superficial look - and I added some actionable comments. I'm happy to iterate on this.
Thanks!

Comment on lines +83 to +87
```bash
gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
-Dexec.args="--<argument>=<value> --<argument>=<value>"
```

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these instructions are nice and useful. I worry that users will not find out about this example. Do you have plans to blog about it, or add any extra documentation for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we plan to spread the word and blog about it.

## Running as a Dataflow Template

This example also exists as Google Dataflow Template, which you can build and run using Google Cloud Platform. See
this template documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md) for
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see anything in this address. Will it be added later?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious how the template here and in DataflowTemplates will be different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have a PR in the DataflowTemplates repository and it is focused on execution via Google Cloud Dataflow runner. The Beam version, on the other hand, is a bit more generic.

import org.slf4j.LoggerFactory;

/** The {@link FileSystemIO} class to read/write data from/into File Systems. */
public class FileSystemIO {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this class meant to be generic? Or specific for this template? I see the class is within the template package - I am just wondering if we should name the class TokenizationFileIO or something that states clearly that these transforms are meant only to be used for the data tokenization template?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you, renamed it.

import org.slf4j.LoggerFactory;

/** The {@link BigTableIO} class for writing data from template to BigTable. */
public class BigTableIO {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as with FileSystemIO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import org.slf4j.LoggerFactory;

/** The {@link BigQueryIO} class for writing data from template to BigTable. */
public class BigQueryIO {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as FileSystemIO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 218 to 241
@ProcessElement
public void process(
ProcessContext context,
BoundedWindow window,
@StateId("buffer") BagState<Row> bufferState,
@StateId("count") ValueState<Integer> countState,
@TimerId("expiry") Timer expiryTimer) {

expiryTimer.set(window.maxTimestamp());

int count = firstNonNull(countState.read(), 0);
count++;
countState.write(count);
bufferState.add(context.element().getValue());

if (count >= batchSize) {
processBufferedRows(bufferState.read(), context);
bufferState.clear();
countState.clear();
}
}

@SuppressWarnings("argument.type.incompatible")
private void processBufferedRows(Iterable<Row> rows, WindowedContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this DoFn does a lot of its own buffering. Have you considered using GroupIntoBatches[1] for this? GroupIntoBatches has the same sort of buffering/counting/timer emission logic, but it receives more updates (e.g. it recently started supporting 'autosharding' which lets runners decouple the number of shards from the number of keys).

Think about it - but I recommend you try using GroupIntoBatches, as you will get some extra nice benefits from it.

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/GroupIntoBatches.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for your suggestion, @pabloem! GroupIntoBatches works really great, replaced Stateful DoFn with it.

* GroupIntoBatches was used in the data tokenization pipeline

* io files were renamed for the data tokenization template
@KhaninArtur
Copy link
Contributor Author

retest this please

1 similar comment
@KhaninArtur
Copy link
Contributor Author

retest this please

@pabloem
Copy link
Member

pabloem commented Mar 22, 2021

taking a look today

Nuzhdina-Elena and others added 2 commits March 23, 2021 10:50
…Information about it added to README (#14)

Getting value from environment variables for maxBufferingDurationMs
* Fix bug incorrect DSG url lead to NPE DATAFLOW-139
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants