
Commit 6a0d394

Keyspaces to DynamoDB
1 parent 36c3ac5 commit 6a0d394

File tree

4 files changed: +285 −0 lines changed

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
## Using Glue for migration example

This example provides a Scala script for directly copying an Amazon Keyspaces table to Amazon DynamoDB. This allows you to migrate data to DynamoDB without setting up a Spark cluster.

## Prerequisites

* Set up the Spark Cassandra Connector using the provided [setup script](../)

### Set up migration from Keyspaces to DynamoDB

The following script sets up an AWS Glue job that directly copies the Keyspaces table to DynamoDB. The script takes the following parameters:

* PARENT_STACK_NAME is the stack name used to create the Spark Cassandra Connector with Glue ([setup script](../)).
* MIGRATION_STACK_NAME is the stack name used to create the migration Glue job.
* KEYSPACE_NAME and TABLE_NAME together form the fully qualified name of the Keyspaces table you wish to migrate.
* DYNAMODB_TABLE_NAME is the name of the DynamoDB table the data should be written to.
* DYNAMODB_WRITE_UNITS is the target write throughput for the job. This setting is agnostic to the capacity mode (provisioned or on-demand) of your DynamoDB table.

```shell
./setup-migration.sh PARENT_STACK_NAME MIGRATION_STACK_NAME KEYSPACE_NAME TABLE_NAME DYNAMODB_TABLE_NAME DYNAMODB_WRITE_UNITS
```

By default this script copies all rows and all columns into DynamoDB. You can choose an arbitrary target throughput, but to actually achieve it in DynamoDB you must increase the requested number of Glue workers through trial and error.
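For example, a run against the script's default connector stack, keyspace, and table might look like the following sketch. The target table name and write rate here are illustrative values, not defaults:

```shell
# aksglue, mykeyspace, and mytable are the script's built-in defaults;
# target-table and 2000 write units are hypothetical example values.
./setup-migration.sh aksglue aksglue-migration mykeyspace mytable target-table 2000
```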
Lines changed: 102 additions & 0 deletions — glue-job-ks-to-dynamodb.yaml
@@ -0,0 +1,102 @@
AWSTemplateFormatVersion: 2010-09-09
Description: 'Directly copy Keyspaces rows to DynamoDB'
Parameters:
  KeyspaceName:
    NoEcho: false
    Description: Cassandra Keyspace name
    Type: String
    Default: mykeyspace
    MinLength: 3
    MaxLength: 48
  TableName:
    NoEcho: false
    Description: Cassandra Table name
    Type: String
    Default: mytable
    MinLength: 3
    MaxLength: 48
  ParentStack:
    NoEcho: false
    Description: Stack used to set up the Spark Cassandra Connector
    Type: String
    Default: aksglue1
    MinLength: 3
    MaxLength: 48
  DynamoDBTableName:
    NoEcho: false
    Description: The DynamoDB table name to write the data into
    Type: String
  DynamoDBWriteUnits:
    NoEcho: false
    Description: Target write capacity units for the DynamoDB table
    Type: Number
    Default: 1000
    MaxValue: 80000
Resources:
  GlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        ScriptLocation: !Sub
          - "s3://${IMPORTBUCKETNAME}/scripts/${ParentStack}-${AWS::StackName}-migration.scala"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
      DefaultArguments:
        "--job-language": "scala"
        "--user-jars-first": "true"
        "--extra-jars": !Sub
          - 's3://${IMPORTBUCKETNAME}/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar,s3://${IMPORTBUCKETNAME}/jars/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9-shaded.jar,s3://${IMPORTBUCKETNAME}/jars/spark-extension_2.12-2.8.0-3.4.jar,s3://${IMPORTBUCKETNAME}/jars/amazon-keyspaces-helpers-1.0-SNAPSHOT.jar'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--extra-files": !Sub
          - 's3://${IMPORTBUCKETNAME}/conf/keyspaces-application.conf'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--enable-metrics": "true"
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-spark-ui": "true"
        "--spark-event-logs-path": !Sub
          - "s3://${IMPORTBUCKETNAME}/spark-logs/"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--write-shuffle-files-to-s3": "true"
        "--write-shuffle-spills-to-s3": "true"
        "--TempDir": !Sub
          - 's3://${IMPORTBUCKETNAME}/shuffle-space/export-sample/'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--DYNAMODB_WRITE_UNITS": !Sub '${DynamoDBWriteUnits}'
        "--KEYSPACE_NAME": !Sub '${KeyspaceName}'
        "--TABLE_NAME": !Sub '${TableName}'
        "--DYNAMODB_TABLE_NAME": !Sub '${DynamoDBTableName}'
        "--DRIVER_CONF": "keyspaces-application.conf"
        #"--DISTINCT_KEYS": "id,create_date"
        "--class": "GlueApp"
      #Connections:
      #  ConnectionsList
      Description: 'Copy Keyspaces table to DynamoDB'
      #ExecutionClass: String
      #ExecutionProperty:
      #  ExecutionProperty
      GlueVersion: "3.0"
      #LogUri: String
      #MaxCapacity: Double
      #MaxRetries: Double
      Name: !Sub ['AmazonKeyspacesExportToS3-${STACKNAME}', STACKNAME: !Join [ "-", [!Ref ParentStack, !Ref AWS::StackName]]]
      #NonOverridableArguments: Json
      #NotificationProperty:
      #  NotificationProperty
      NumberOfWorkers: 2
      Role:
        Fn::ImportValue:
          !Sub 'KeyspacesGlueJobServiceRoleExport-${ParentStack}'
      #SecurityConfiguration: String
      #Tags: Json
      #Timeout: Integer
      WorkerType: G.2X
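Once the stack is created, the job can also be started manually, overriding any of the DefaultArguments above for a single run. A minimal sketch with the AWS CLI, assuming the default stack names (the job name below is derived from the template's Name property):

```shell
# Job name follows AmazonKeyspacesExportToS3-<ParentStack>-<StackName>;
# aksglue and aksglue-migration are assumed stack names, and
# --arguments overrides DefaultArguments for this run only.
aws glue start-job-run \
  --job-name AmazonKeyspacesExportToS3-aksglue-aksglue-migration \
  --arguments '{"--DYNAMODB_WRITE_UNITS":"2000"}'
```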
Lines changed: 117 additions & 0 deletions — migration-sample.scala
@@ -0,0 +1,117 @@
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._
import com.datastax.spark.connector.cql._
import com.datastax.oss.driver.api.core._
import org.apache.spark.sql.functions.rand
import com.amazonaws.services.glue.log.GlueLogger
import java.time.ZonedDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import java.time.format.DateTimeFormatter
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions


object GlueApp {

  def main(sysArgs: Array[String]) {

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "DYNAMODB_TABLE_NAME", "DYNAMODB_WRITE_UNITS").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "100"),

          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
          ("directJoinSetting", "on"),

          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"), // writes
          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),     // reads

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "50"),

          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1"),
          ("spark.cassandra.output.ignoreNulls", "true")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val logger = new GlueLogger

    // validation steps for peers and partitioner
    val connector = CassandraConnector.apply(conf)
    val session = connector.openSession()
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()

    val partitioner = session.execute("SELECT partitioner from system.local").one().getString("partitioner")

    logger.info("Total number of seeds:" + peersCount)
    logger.info("Configured partitioner:" + partitioner)

    if(peersCount == 0){
      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }

    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val dynamodbTableName = args("DYNAMODB_TABLE_NAME")
    val dynamodbWriteUnits = args("DYNAMODB_WRITE_UNITS")

    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> tableName,
        "keyspace" -> keyspaceName,
        "pushdown" -> "false")) // set to true when executing against Apache Cassandra, false when working with Keyspaces
      .load()
      //.filter("my_column=='somevalue' AND my_othercolumn=='someothervalue'")

    val dynamicFrame = DynamicFrame(tableDf, glueContext)

    val dynamoDbSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> dynamodbTableName,
        "dynamodb.throughput.write.percent" -> "1.0",
        "dynamodb.throughput.write" -> dynamodbWriteUnits
      ))
    )

    dynamoDbSink.writeDynamicFrame(dynamicFrame)
  }
}
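The job writes into an existing DynamoDB table; it does not create one. A minimal sketch of pre-creating an on-demand target table, assuming a hypothetical single partition key named id (mirror the primary key of the Keyspaces table you are migrating):

```shell
# Hypothetical key schema: one String partition key named "id".
# Adjust --attribute-definitions and --key-schema to match the
# primary key of the source Keyspaces table.
aws dynamodb create-table \
  --table-name target-table \
  --attribute-definitions AttributeName=id,AttributeType=S \
  --key-schema AttributeName=id,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
```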
Lines changed: 46 additions & 0 deletions — setup-migration.sh
@@ -0,0 +1,46 @@
#!/bin/bash

echo "Positional Arguments: PARENT_STACK_NAME, STACK_NAME, KEYSPACE_NAME, TABLE_NAME, DYNAMODB_TABLE_NAME, DYNAMODB_WRITE_UNITS"
echo ""
echo "PARENT_STACK_NAME: Stack name used for setting up the connector"
echo "STACK_NAME: Stack name used for setting up the Glue job"
echo "KEYSPACE_NAME: Keyspace to export from"
echo "TABLE_NAME: Table to export from"
echo "DYNAMODB_TABLE_NAME: The DynamoDB table name to import into"
echo "DYNAMODB_WRITE_UNITS: For on-demand or provisioned tables, the amount of consumed capacity to target"

PARENT_STACK_NAME=${1:-aksglue}
STACK_NAME="${2:-$PARENT_STACK_NAME-migration}"
KEYSPACE_NAME=${3:-mykeyspace}
TABLE_NAME=${4:-mytable}
DYNAMODB_TABLE_NAME=${5}
DYNAMODB_WRITE_UNITS=${6}

echo "Parent stack used: ${PARENT_STACK_NAME}"
echo "Stack name used: ${STACK_NAME}"
echo "Keyspace used: ${KEYSPACE_NAME}"
echo "Table used: ${TABLE_NAME}"
echo "DynamoDB target table: ${DYNAMODB_TABLE_NAME}"
echo "Target write rate: ${DYNAMODB_WRITE_UNITS}"

if ! command -v aws &> /dev/null; then
    echo "AWS CLI \"aws\" is not installed. aws is required for deploying artifacts to s3. See https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
    exit 1
fi

export KEYSPACES_GLUE_BUCKET=$(aws cloudformation describe-stacks --query "Stacks[?StackName==\`$PARENT_STACK_NAME\`][].Outputs[?ExportName==\`KeyspacesBucketNameExport-$PARENT_STACK_NAME\`]".OutputValue --output text)

if [ -z "${KEYSPACES_GLUE_BUCKET}" ]; then
    echo "Parent stack not found. CloudFormation export KeyspacesBucketNameExport-$PARENT_STACK_NAME not found"
    exit 1
fi

echo "Moving script to bucket ${KEYSPACES_GLUE_BUCKET}"

aws s3api put-object --bucket $KEYSPACES_GLUE_BUCKET --key scripts/$PARENT_STACK_NAME-$STACK_NAME-migration.scala --body migration-sample.scala || exit 1

aws cloudformation create-stack --stack-name ${STACK_NAME} --parameters ParameterKey=ParentStack,ParameterValue=$PARENT_STACK_NAME ParameterKey=KeyspaceName,ParameterValue=$KEYSPACE_NAME ParameterKey=TableName,ParameterValue=$TABLE_NAME ParameterKey=DynamoDBTableName,ParameterValue=${DYNAMODB_TABLE_NAME} ParameterKey=DynamoDBWriteUnits,ParameterValue=$DYNAMODB_WRITE_UNITS --template-body 'file://glue-job-ks-to-dynamodb.yaml' || exit 1 #--debug

echo "Waiting for CloudFormation stack to complete ..."
aws cloudformation wait stack-create-complete --stack-name ${STACK_NAME} || exit 1
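After the stack completes and the Glue job has run, one way to spot-check the result is an item count on the target table. A Scan consumes read capacity and may lag recent writes; the table name here is illustrative:

```shell
# Full-table Scan returning only the item count; consumes read capacity.
aws dynamodb scan \
  --table-name target-table \
  --select COUNT \
  --query Count \
  --output text
```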
