DS v4 Glue Scala: New example for Keyspaces to DynamoDB migration #54

20 changes: 20 additions & 0 deletions scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/README.md
@@ -0,0 +1,20 @@
## Using Glue for migration example
This example provides a Scala script that copies an Amazon Keyspaces table directly to Amazon DynamoDB, allowing you to migrate data to DynamoDB without provisioning your own Spark cluster.

## Prerequisites
* Set up the Spark Cassandra connector for Glue using the provided [setup script](../)

### Set up the migration from Keyspaces to DynamoDB
The following script sets up an AWS Glue job that copies the Keyspaces table directly to DynamoDB. The script takes the following positional parameters (an example invocation is shown after the synopsis below):
* PARENT_STACK_NAME is the name of the stack used to set up the Spark Cassandra connector with Glue ([setup script](../)).
* MIGRATION_STACK_NAME is the name of the stack used to create the migration Glue job.
* KEYSPACE_NAME and TABLE_NAME together form the fully qualified name of the Keyspaces table you wish to migrate.
* DYNAMODB_TABLE_NAME is the name of the DynamoDB table the data should be written to.
* DYNAMODB_WRITE_UNITS is the target write throughput for the job. This setting is agnostic to the capacity mode (provisioned or on-demand) of your DynamoDB table.

```shell
./setup-migration.sh PARENT_STACK_NAME MIGRATION_STACK_NAME KEYSPACE_NAME TABLE_NAME DYNAMODB_TABLE_NAME DYNAMODB_WRITE_UNITS

```
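
For example, using the setup script's default parent stack name (`aksglue`) and purely hypothetical table names and throughput (none of these values are required by the example):

```shell
# Hypothetical values: copy mykeyspace.mytable into the DynamoDB table "target_table",
# targeting roughly 3000 write units of throughput.
./setup-migration.sh aksglue aksglue-migration mykeyspace mytable target_table 3000
```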

By default this script copies all rows and all columns into DynamoDB. You can choose an arbitrary target throughput, but you may need to increase the requested number of Glue workers (through trial and error) to actually achieve that throughput in DynamoDB.
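
When picking DYNAMODB_WRITE_UNITS, keep in mind that one standard DynamoDB write unit covers one write per second for an item of up to 1 KB, and larger items consume one unit per additional KB. The following back-of-the-envelope sketch (not part of the migration tooling, and using a hypothetical average item size) shows how a target write-unit setting translates into rows per second:

```shell
# Rough sizing estimate only; assumes 1 write unit = one write/second for items up to 1 KB.
AVG_ITEM_SIZE_KB=2            # hypothetical average row size, rounded up to whole KB
DYNAMODB_WRITE_UNITS=3000     # hypothetical target passed to setup-migration.sh
echo "$(( DYNAMODB_WRITE_UNITS / AVG_ITEM_SIZE_KB )) rows/second sustained (approximate)"
```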
102 changes: 102 additions & 0 deletions scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/glue-job-ks-to-dynamodb.yaml
@@ -0,0 +1,102 @@
AWSTemplateFormatVersion: 2010-09-09
Description: 'Directly copy Amazon Keyspaces rows to DynamoDB'
Parameters:
  KeyspaceName:
    NoEcho: false
    Description: Cassandra Keyspace name
    Type: String
    Default: mykeyspace
    MinLength: 3
    MaxLength: 48
  TableName:
    NoEcho: false
    Description: Cassandra Table name
    Type: String
    Default: mytable
    MinLength: 3
    MaxLength: 48
  ParentStack:
    NoEcho: false
    Description: Stack used to set up the Spark Cassandra connector
    Type: String
    Default: aksglue1
    MinLength: 3
    MaxLength: 48
  DynamoDBTableName:
    NoEcho: false
    Description: The DynamoDB table name to write the data into
    Type: String
  DynamoDBWriteUnits:
    NoEcho: false
    Description: Target write capacity units for the DynamoDB table
    Type: Number
    Default: 1000
    MaxValue: 80000
Resources:
  GlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        ScriptLocation: !Sub
          - "s3://${IMPORTBUCKETNAME}/scripts/${ParentStack}-${AWS::StackName}-migration.scala"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
      DefaultArguments:
        "--job-language": "scala"
        "--user-jars-first": "true"
        "--extra-jars": !Sub
          - 's3://${IMPORTBUCKETNAME}/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar,s3://${IMPORTBUCKETNAME}/jars/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9-shaded.jar,s3://${IMPORTBUCKETNAME}/jars/spark-extension_2.12-2.8.0-3.4.jar,s3://${IMPORTBUCKETNAME}/jars/amazon-keyspaces-helpers-1.0-SNAPSHOT.jar'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--extra-files": !Sub
          - 's3://${IMPORTBUCKETNAME}/conf/keyspaces-application.conf'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--enable-metrics": "true"
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-spark-ui": "true"
        "--spark-event-logs-path": !Sub
          - "s3://${IMPORTBUCKETNAME}/spark-logs/"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--write-shuffle-files-to-s3": "true"
        "--write-shuffle-spills-to-s3": "true"
        "--TempDir": !Sub
          - 's3://${IMPORTBUCKETNAME}/shuffle-space/export-sample/'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--DYNAMODB_WRITE_UNITS": !Sub '${DynamoDBWriteUnits}'
        "--KEYSPACE_NAME": !Sub '${KeyspaceName}'
        "--TABLE_NAME": !Sub '${TableName}'
        "--DYNAMODB_TABLE_NAME": !Sub '${DynamoDBTableName}'
        "--DRIVER_CONF": "keyspaces-application.conf"
        #"--DISTINCT_KEYS": "id,create_date"
        "--class": "GlueApp"
      #Connections:
      #  ConnectionsList
      Description: 'Copy Amazon Keyspaces rows to DynamoDB'
      #ExecutionClass: String
      #ExecutionProperty:
      #  ExecutionProperty
      GlueVersion: "3.0"
      #LogUri: String
      #MaxCapacity: Double
      #MaxRetries: Double
      Name: !Sub ['AmazonKeyspacesToDynamoDB-${STACKNAME}', STACKNAME: !Join [ "-", [!Ref ParentStack, !Ref AWS::StackName]]]
      #NonOverridableArguments: Json
      #NotificationProperty:
      #  NotificationProperty
      NumberOfWorkers: 2
      Role:
        Fn::ImportValue:
          !Sub 'KeyspacesGlueJobServiceRoleExport-${ParentStack}'
      #SecurityConfiguration: String
      #Tags: Json
      #Timeout: Integer
      WorkerType: G.2X
117 changes: 117 additions & 0 deletions scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/migration-sample.scala
@@ -0,0 +1,117 @@
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._
import com.datastax.spark.connector.cql._
import com.datastax.oss.driver.api.core._
import org.apache.spark.sql.functions.rand
import com.amazonaws.services.glue.log.GlueLogger
import java.time.ZonedDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import java.time.format.DateTimeFormatter
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions


object GlueApp {

  def main(sysArgs: Array[String]) {

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "DYNAMODB_TABLE_NAME", "DYNAMODB_WRITE_UNITS").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "100"),

          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
          ("directJoinSetting", "on"),

          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"), //WRITES
          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),     //READS

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "50"),

          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1"),
          ("spark.cassandra.output.ignoreNulls", "true")
        ))


    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val logger = new GlueLogger

    //validation steps for peers and partitioner
    val connector = CassandraConnector.apply(conf)
    val session = connector.openSession()
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()

    val partitioner = session.execute("SELECT partitioner from system.local").one().getString("partitioner")

    logger.info("Total number of seeds:" + peersCount)
    logger.info("Configured partitioner:" + partitioner)

    if(peersCount == 0){
      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }

    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in the Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val dynamodbTableName = args("DYNAMODB_TABLE_NAME")
    val dynamodbWriteUnits = args("DYNAMODB_WRITE_UNITS")

    // Read the source table from Amazon Keyspaces through the Spark Cassandra connector.
    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> tableName,
                    "keyspace" -> keyspaceName,
                    "pushdown" -> "false")) //set to true when executing against Apache Cassandra, false when working with Keyspaces
      .load()
      //.filter("my_column=='somevalue' AND my_othercolumn=='someothervalue'")


    // Wrap the DataFrame in a DynamicFrame and write it to DynamoDB through the Glue DynamoDB sink.
    val dynamicFrame = DynamicFrame(tableDf, glueContext)

    val dynamoDbSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> dynamodbTableName,
        "dynamodb.throughput.write.percent" -> "1.0",
        "dynamodb.throughput.write" -> dynamodbWriteUnits
      ))
    )

    dynamoDbSink.writeDynamicFrame(dynamicFrame)
  }
}
46 changes: 46 additions & 0 deletions scala/datastax-v4/aws-glue/keyspaces-to-dynamodb/setup-migration.sh
@@ -0,0 +1,46 @@
#!/bin/bash

echo "Positional Arguments: PARENT_STACK_NAME, STACK_NAME, KEYSPACE_NAME, TABLE_NAME, DYNAMODB_TABLE_NAME, DYNAMODB_WRITE_UNITS"
echo ""
echo "PARENT_STACK_NAME: Stack name used for setting up the connector"
echo "STACK_NAME: Stack name used for setting up glue job"
echo "KEYSPACE_NAME: Keyspace to export from"
echo "TABLE_NAME: Table to export from"
echo "DYNAMODB_TABLE_NAME: The DynamoDB table name to import into"
echo "DYNAMODB_WRITE_UNITS: For on-demand or provisioned tables, the amount of consumed capacity to target"

PARENT_STACK_NAME=${1:-aksglue}
STACK_NAME="${2:-$PARENT_STACK_NAME-migration}"
KEYSPACE_NAME=${3:-mykeyspace}
TABLE_NAME=${4:-mytable}
DYNAMODB_TABLE_NAME=${5}
DYNAMODB_WRITE_UNITS=${6}

echo "Parent stack used: ${PARENT_STACK_NAME}"
echo "Stack name used: ${STACK_NAME}"
echo "Keyspace used used: ${KEYSPACE_NAME}"
echo "Table used: ${TABLE_NAME}"
echo "DynamoDB target table: ${DYNAMODB_TABLE_NAME}"
echo "Target write rate: ${DYNAMODB_WRITE_UNITS}"

if ! command -v aws &> /dev/null; then
    echo "AWS CLI \"aws\" is not installed. aws is required for deploying artifacts to S3. See https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
    exit 1
fi

export KEYSPACES_GLUE_BUCKET=$(aws cloudformation describe-stacks --query "Stacks[?StackName==\`$PARENT_STACK_NAME\`][].Outputs[?ExportName==\`KeyspacesBucketNameExport-$PARENT_STACK_NAME\`]".OutputValue --output text)

if [ -z "${KEYSPACES_GLUE_BUCKET}" ]; then
    echo "Parent stack not found. CloudFormation export KeyspacesBucketNameExport-$PARENT_STACK_NAME was not found"
    exit 1
fi


echo "Moving script to bucket ${KEYSPACES_GLUE_BUCKET}"

aws s3api put-object --bucket $KEYSPACES_GLUE_BUCKET --key scripts/$PARENT_STACK_NAME-$STACK_NAME-migration.scala --body migration-sample.scala || exit 1

aws cloudformation create-stack --stack-name ${STACK_NAME} --parameters ParameterKey=ParentStack,ParameterValue=$PARENT_STACK_NAME ParameterKey=KeyspaceName,ParameterValue=$KEYSPACE_NAME ParameterKey=TableName,ParameterValue=$TABLE_NAME ParameterKey=DynamoDBTableName,ParameterValue=${DYNAMODB_TABLE_NAME} ParameterKey=DynamoDBWriteUnits,ParameterValue=$DYNAMODB_WRITE_UNITS --template-body 'file://glue-job-ks-to-dynamodb.yaml' || exit 1 #--debug

echo Waiting for CloudFormation stack to complete ...
aws cloudformation wait stack-create-complete --stack-name ${STACK_NAME} || exit 1