From 6a0d394f25d7c2cdad2f6f356bddfd0915670089 Mon Sep 17 00:00:00 2001
From: Sean Shriver
Date: Thu, 14 Nov 2024 09:18:16 -0600
Subject: [PATCH] Keyspaces to DynamoDB

---
 .../aws-glue/keyspaces-to-dynamodb/README.md  |  20 +++
 .../glue-job-ks-to-dynamodb.yaml              | 102 +++++++++++++++
 .../migration-sample.scala                    | 117 ++++++++++++++++++
 .../keyspaces-to-dynamodb/setup-migration.sh  |  46 +++++++
 4 files changed, 285 insertions(+)
 create mode 100644 scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/README.md
 create mode 100644 scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/glue-job-ks-to-dynamodb.yaml
 create mode 100644 scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/migration-sample.scala
 create mode 100755 scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/setup-migration.sh

diff --git a/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/README.md b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/README.md
new file mode 100644
index 0000000..864e283
--- /dev/null
+++ b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/README.md
@@ -0,0 +1,20 @@
+## Using Glue for migration example
+This example provides a Scala script for copying an Amazon Keyspaces table directly to Amazon DynamoDB with AWS Glue. This allows you to migrate data to DynamoDB without provisioning or managing a Spark cluster.
+
+## Prerequisites
+* Set up the Spark Cassandra connector using the provided [setup script](../)
+
+### Set up migration from Keyspaces to DynamoDB
+The following script sets up an AWS Glue job that copies the Keyspaces table directly to DynamoDB. The script takes the following parameters:
+* PARENT_STACK_NAME is the name of the stack used to set up the Spark Cassandra connector with Glue ([setup script](../)).
+* MIGRATION_STACK_NAME is the name of the stack used to create the migration Glue job.
+* KEYSPACE_NAME and TABLE_NAME identify the fully qualified Keyspaces table you wish to migrate.
+* DYNAMODB_TABLE_NAME is the name of the DynamoDB table the data should be written to.
+* DYNAMODB_WRITE_UNITS is the target write throughput for the job. This setting is agnostic to the capacity mode (provisioned or on-demand) of your DynamoDB table.
+
+```shell
+./setup-migration.sh PARENT_STACK_NAME MIGRATION_STACK_NAME KEYSPACE_NAME TABLE_NAME DYNAMODB_TABLE_NAME DYNAMODB_WRITE_UNITS
+```
+
+By default this script copies all rows and all columns into DynamoDB. You can request any write throughput, but to actually reach it in DynamoDB you may need to increase the requested number of Glue workers, tuning by trial and error.
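+
+To rerun the job with more workers, you can use the AWS CLI. This is a sketch: the job name is derived from the CloudFormation template's `Name` property, so with the default stack names it would be `AmazonKeyspacesToDynamoDB-aksglue-aksglue-migration`; substitute your own stack names and worker count.
+
+```shell
+aws glue start-job-run \
+    --job-name AmazonKeyspacesToDynamoDB-aksglue-aksglue-migration \
+    --worker-type G.2X \
+    --number-of-workers 10
+```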
diff --git a/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/glue-job-ks-to-dynamodb.yaml b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/glue-job-ks-to-dynamodb.yaml
new file mode 100644
index 0000000..3dadd31
--- /dev/null
+++ b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/glue-job-ks-to-dynamodb.yaml
@@ -0,0 +1,102 @@
+AWSTemplateFormatVersion: 2010-09-09
+Description: 'Directly copy Amazon Keyspaces rows to DynamoDB'
+Parameters:
+  KeyspaceName:
+    NoEcho: false
+    Description: Cassandra keyspace name
+    Type: String
+    Default: mykeyspace
+    MinLength: 3
+    MaxLength: 48
+  TableName:
+    NoEcho: false
+    Description: Cassandra table name
+    Type: String
+    Default: mytable
+    MinLength: 3
+    MaxLength: 48
+  ParentStack:
+    NoEcho: false
+    Description: Stack used to set up the Spark Cassandra connector
+    Type: String
+    Default: aksglue1
+    MinLength: 3
+    MaxLength: 48
+  DynamoDBTableName:
+    NoEcho: false
+    Description: The DynamoDB table name to write the data into
+    Type: String
+  DynamoDBWriteUnits:
+    NoEcho: false
+    Description: Target write capacity units for the DynamoDB table
+    Type: Number
+    Default: 1000
+    MaxValue: 80000
+Resources:
+  GlueJob:
+    Type: AWS::Glue::Job
+    Properties:
+      Command:
+        Name: glueetl
+        ScriptLocation: !Sub
+          - "s3://${IMPORTBUCKETNAME}/scripts/${ParentStack}-${AWS::StackName}-migration.scala"
+          - IMPORTBUCKETNAME:
+              Fn::ImportValue:
+                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
+      DefaultArguments:
+        "--job-language": "scala"
+        "--user-jars-first": "true"
+        "--extra-jars": !Sub
+          - 's3://${IMPORTBUCKETNAME}/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar,s3://${IMPORTBUCKETNAME}/jars/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9-shaded.jar,s3://${IMPORTBUCKETNAME}/jars/spark-extension_2.12-2.8.0-3.4.jar,s3://${IMPORTBUCKETNAME}/jars/amazon-keyspaces-helpers-1.0-SNAPSHOT.jar'
+          - IMPORTBUCKETNAME:
+              Fn::ImportValue:
+                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
+        "--extra-files": !Sub
+          - 's3://${IMPORTBUCKETNAME}/conf/keyspaces-application.conf'
+          - IMPORTBUCKETNAME:
+              Fn::ImportValue:
+                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
+        "--enable-metrics": "true"
+        "--enable-continuous-cloudwatch-log": "true"
+        "--enable-spark-ui": "true"
+        "--spark-event-logs-path": !Sub
+          - "s3://${IMPORTBUCKETNAME}/spark-logs/"
+          - IMPORTBUCKETNAME:
+              Fn::ImportValue:
+                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
+        "--write-shuffle-files-to-s3": "true"
+        "--write-shuffle-spills-to-s3": "true"
+        "--TempDir": !Sub
+          - 's3://${IMPORTBUCKETNAME}/shuffle-space/keyspaces-to-dynamodb/'
+          - IMPORTBUCKETNAME:
+              Fn::ImportValue:
+                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
+        "--DYNAMODB_WRITE_UNITS": !Sub '${DynamoDBWriteUnits}'
+        "--KEYSPACE_NAME": !Sub '${KeyspaceName}'
+        "--TABLE_NAME": !Sub '${TableName}'
+        "--DYNAMODB_TABLE_NAME": !Sub '${DynamoDBTableName}'
+        "--DRIVER_CONF": "keyspaces-application.conf"
+        #"--DISTINCT_KEYS": "id,create_date"
+        "--class": "GlueApp"
+      #Connections:
+      #  ConnectionsList
+      Description: 'Copy Amazon Keyspaces table to DynamoDB'
+      #ExecutionClass: String
+      #ExecutionProperty:
+      #  ExecutionProperty
+      GlueVersion: "3.0"
+      #LogUri: String
+      #MaxCapacity: Double
+      #MaxRetries: Double
+      Name: !Sub ['AmazonKeyspacesToDynamoDB-${STACKNAME}', STACKNAME: !Join [ "-", [!Ref ParentStack, !Ref AWS::StackName]]]
+      #NonOverridableArguments: Json
+      #NotificationProperty:
+      #  NotificationProperty
+      NumberOfWorkers: 2
+      Role:
+        Fn::ImportValue:
+          !Sub 'KeyspacesGlueJobServiceRoleExport-${ParentStack}'
+      #SecurityConfiguration: String
+      #Tags: Json
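+      # Sizing note (an addition to the sample, tune by trial): the DynamoDB
+      # write rate this job can sustain scales with the Glue worker count, so
+      # if the job cannot reach DYNAMODB_WRITE_UNITS, raise NumberOfWorkers
+      # above, for example:
+      #NumberOfWorkers: 10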
+      #Timeout: Integer
+      WorkerType: G.2X
diff --git a/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/migration-sample.scala b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/migration-sample.scala
new file mode 100644
index 0000000..714a9d9
--- /dev/null
+++ b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/migration-sample.scala
@@ -0,0 +1,117 @@
+import com.amazonaws.services.glue.GlueContext
+import com.amazonaws.services.glue.DynamicFrame
+import com.amazonaws.services.glue.log.GlueLogger
+import com.amazonaws.services.glue.util.GlueArgParser
+import com.amazonaws.services.glue.util.Job
+import com.amazonaws.services.glue.util.JsonOptions
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.cassandra._
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql._
+import com.datastax.oss.driver.api.core._
+import scala.collection.JavaConverters._
+
+
+object GlueApp {
+
+  def main(sysArgs: Array[String]) {
+
+    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "DYNAMODB_TABLE_NAME", "DYNAMODB_WRITE_UNITS").toArray)
+
+    val driverConfFileName = args("DRIVER_CONF")
+
+    val conf = new SparkConf()
+      .setAll(
+        Seq(
+          ("spark.task.maxFailures", "100"),
+
+          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
+          ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
+          ("directJoinSetting", "on"),
+
+          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"), // writes
+          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),     // reads
+
+          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
+          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
+          ("spark.cassandra.concurrent.reads", "50"),
+
+          ("spark.cassandra.output.concurrent.writes", "5"),
+          ("spark.cassandra.output.batch.grouping.key", "none"),
+          ("spark.cassandra.output.batch.size.rows", "1"),
+          ("spark.cassandra.output.ignoreNulls", "true")
+        ))
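+
+    // Optional read throttle (an addition, not part of the original sample):
+    // the Spark Cassandra Connector also honors
+    //   ("spark.cassandra.input.readsPerSec", "2000"),
+    // which caps read requests per core per second. Adding it to the Seq above
+    // (value is a placeholder to tune) protects the source table's read
+    // capacity while the copy runs.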
+
+    val spark: SparkContext = new SparkContext(conf)
+    val glueContext: GlueContext = new GlueContext(spark)
+    val sparkSession: SparkSession = glueContext.getSparkSession
+
+    import sparkSession.implicits._
+
+    Job.init(args("JOB_NAME"), glueContext, args.asJava)
+
+    val logger = new GlueLogger
+
+    // validation steps for peers and partitioner
+    val connector = CassandraConnector.apply(conf)
+    val session = connector.openSession()
+    val peersCount = session.execute("SELECT * FROM system.peers").all().size()
+
+    val partitioner = session.execute("SELECT partitioner FROM system.local").one().getString("partitioner")
+
+    logger.info("Total number of peers: " + peersCount)
+    logger.info("Configured partitioner: " + partitioner)
+
+    if(peersCount == 0){
+      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using a VPC endpoint, check permissions for describing VPC endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
+    }
+
+    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
+      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in the Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
+    }
+
+    val tableName = args("TABLE_NAME")
+    val keyspaceName = args("KEYSPACE_NAME")
+    val dynamodbTableName = args("DYNAMODB_TABLE_NAME")
+    val dynamodbWriteUnits = args("DYNAMODB_WRITE_UNITS")
+
+    val tableDf = sparkSession.read
+      .format("org.apache.spark.sql.cassandra")
+      .options(Map( "table" -> tableName,
+                    "keyspace" -> keyspaceName,
+                    "pushdown" -> "false")) // set to true when executing against Apache Cassandra, false when working with Keyspaces
+      .load()
+      //.filter("my_column=='somevalue' AND my_othercolumn=='someothervalue'")
+
+    val dynamicFrame = DynamicFrame(tableDf, glueContext)
+
+    val dynamoDbSink = glueContext.getSinkWithFormat(
+      connectionType = "dynamodb",
+      options = JsonOptions(Map(
+        "dynamodb.output.tableName" -> dynamodbTableName,
+        "dynamodb.throughput.write.percent" -> "1.0",
+        "dynamodb.throughput.write" -> dynamodbWriteUnits
+      ))
+    )
+
+    dynamoDbSink.writeDynamicFrame(dynamicFrame)
+  }
+}
diff --git a/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/setup-migration.sh b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/setup-migration.sh
new file mode 100755
index 0000000..6613599
--- /dev/null
+++ b/scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/setup-migration.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+echo "Positional Arguments: PARENT_STACK_NAME, STACK_NAME, KEYSPACE_NAME, TABLE_NAME, DYNAMODB_TABLE_NAME, DYNAMODB_WRITE_UNITS"
+echo ""
+echo "PARENT_STACK_NAME: Stack name used for setting up the connector"
+echo "STACK_NAME: Stack name used for setting up the Glue job"
+echo "KEYSPACE_NAME: Keyspace to export from"
+echo "TABLE_NAME: Table to export from"
+echo "DYNAMODB_TABLE_NAME: The DynamoDB table name to import into"
+echo "DYNAMODB_WRITE_UNITS: The amount of write capacity to consume, for both on-demand and provisioned tables"
+
+PARENT_STACK_NAME=${1:-aksglue}
+STACK_NAME="${2:-$PARENT_STACK_NAME-migration}"
+KEYSPACE_NAME=${3:-mykeyspace}
+TABLE_NAME=${4:-mytable}
+DYNAMODB_TABLE_NAME=${5}
+DYNAMODB_WRITE_UNITS=${6}
+
+echo "Parent stack used: ${PARENT_STACK_NAME}"
+echo "Stack name used: ${STACK_NAME}"
+echo "Keyspace used: ${KEYSPACE_NAME}"
+echo "Table used: ${TABLE_NAME}"
+echo "DynamoDB target table: ${DYNAMODB_TABLE_NAME}"
+echo "Target write rate: ${DYNAMODB_WRITE_UNITS}"
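+
+# Fail-fast guard (an addition to the sample): the two DynamoDB arguments have
+# no defaults, so exit early when they were not supplied.
+if [ -z "${DYNAMODB_TABLE_NAME}" ] || [ -z "${DYNAMODB_WRITE_UNITS}" ]; then
+  echo "DYNAMODB_TABLE_NAME and DYNAMODB_WRITE_UNITS are required"
+  exit 1
+fi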
+
+if ! command -v aws &> /dev/null; then
+  echo "AWS CLI \"aws\" is not installed. aws is required for deploying artifacts to S3. See https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
+  exit 1
+fi
+
+export KEYSPACES_GLUE_BUCKET=$(aws cloudformation describe-stacks --query "Stacks[?StackName==\`$PARENT_STACK_NAME\`][].Outputs[?ExportName==\`KeyspacesBucketNameExport-$PARENT_STACK_NAME\`]".OutputValue --output text)
+
+if [ -z "${KEYSPACES_GLUE_BUCKET}" ]; then
+  echo "Parent stack not found. CloudFormation export KeyspacesBucketNameExport-$PARENT_STACK_NAME not found"
+  exit 1
+fi
+
+echo "Moving script to bucket ${KEYSPACES_GLUE_BUCKET}"
+
+aws s3api put-object --bucket $KEYSPACES_GLUE_BUCKET --key scripts/$PARENT_STACK_NAME-$STACK_NAME-migration.scala --body migration-sample.scala || exit 1
+
+aws cloudformation create-stack --stack-name ${STACK_NAME} --parameters ParameterKey=ParentStack,ParameterValue=$PARENT_STACK_NAME ParameterKey=KeyspaceName,ParameterValue=$KEYSPACE_NAME ParameterKey=TableName,ParameterValue=$TABLE_NAME ParameterKey=DynamoDBTableName,ParameterValue=${DYNAMODB_TABLE_NAME} ParameterKey=DynamoDBWriteUnits,ParameterValue=$DYNAMODB_WRITE_UNITS --template-body 'file://glue-job-ks-to-dynamodb.yaml' || exit 1 #--debug
+
+echo "Waiting for CloudFormation stack to complete ..."
+aws cloudformation wait stack-create-complete --stack-name ${STACK_NAME} || exit 1
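+
+# Next-step hint (an addition to the sample): the stack creates the Glue job but
+# does not start it; the job name below comes from the template's Name property.
+echo "Stack ${STACK_NAME} complete. Start the migration with:"
+echo "aws glue start-job-run --job-name AmazonKeyspacesToDynamoDB-${PARENT_STACK_NAME}-${STACK_NAME}"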