
Commit a38fced

Keyspaces to DynamoDB

1 parent 36c3ac5 commit a38fced

4 files changed: +285 additions, -0 deletions
Lines changed: 20 additions & 0 deletions (README)
@@ -0,0 +1,20 @@

## Using Glue migration example

This example provides a Scala script for directly copying an Amazon Keyspaces table to DynamoDB. It allows you to migrate data to DynamoDB without setting up a Spark cluster.

## Prerequisites

* Set up the Spark Cassandra connector using the provided [setup script](../)

### Set up migration from Keyspaces to DynamoDB

The following script sets up an AWS Glue job that directly copies the Keyspaces table to DynamoDB. The script takes the following parameters:

* PARENT_STACK_NAME is the stack name used to create the Spark Cassandra connector with Glue ([setup script](../)).
* MIGRATION_STACK_NAME is the stack name used to create the migration Glue job.
* KEYSPACE_NAME and TABLE_NAME identify the fully qualified Keyspaces table you wish to migrate.
* DYNAMODB_TABLE_NAME is the name of the DynamoDB table the data should be written to.
* DYNAMODB_WRITE_UNITS is the target write throughput for the job. This setting is agnostic to the capacity mode (provisioned or on-demand) of your DynamoDB table.

```shell
./setup-migration.sh PARENT_STACK_NAME MIGRATION_STACK_NAME KEYSPACE_NAME TABLE_NAME DYNAMODB_TABLE_NAME DYNAMODB_WRITE_UNITS
```

By default this script copies all rows and all columns into DynamoDB.
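If you only need a subset of rows or columns, the read in migration-sample.scala can be narrowed before the data is written to DynamoDB. A minimal sketch, assuming hypothetical columns my_column and my_othercolumn (mirroring the filter that ships commented out in the script):

```scala
// Sketch: replaces the tableDf read inside migration-sample.scala.
// my_column and my_othercolumn are hypothetical placeholder columns.
val tableDf = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table"    -> tableName,
    "keyspace" -> keyspaceName,
    "pushdown" -> "false")) // false for Keyspaces, true for Apache Cassandra
  .load()
  .filter("my_column == 'somevalue'")    // copy only matching rows
  .select("my_column", "my_othercolumn") // copy only these columns
```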
Lines changed: 102 additions & 0 deletions (glue-job-ks-to-dynamodb.yaml)
@@ -0,0 +1,102 @@

AWSTemplateFormatVersion: 2010-09-09
Description: 'Directly copy Keyspaces rows to DynamoDB'
Parameters:
  KeyspaceName:
    NoEcho: false
    Description: Cassandra Keyspace name
    Type: String
    Default: mykeyspace
    MinLength: 3
    MaxLength: 48
  TableName:
    NoEcho: false
    Description: Cassandra Table name
    Type: String
    Default: mytable
    MinLength: 3
    MaxLength: 48
  ParentStack:
    NoEcho: false
    Description: Stack used to set up the Spark Cassandra connector
    Type: String
    Default: aksglue1
    MinLength: 3
    MaxLength: 48
  DynamoDBTableName:
    NoEcho: false
    Description: The DynamoDB table name to write the data into
    Type: String
  DynamoDBWriteUnits:
    NoEcho: false
    Description: Target write capacity units for the DynamoDB table
    Type: Number
    Default: 1000
    MaxValue: 80000
Resources:
  GlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        ScriptLocation: !Sub
          - "s3://${IMPORTBUCKETNAME}/scripts/${ParentStack}-${AWS::StackName}-migration.scala"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
      DefaultArguments:
        "--job-language": "scala"
        "--user-jars-first": "true"
        "--extra-jars": !Sub
          - 's3://${IMPORTBUCKETNAME}/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar,s3://${IMPORTBUCKETNAME}/jars/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9-shaded.jar,s3://${IMPORTBUCKETNAME}/jars/spark-extension_2.12-2.8.0-3.4.jar,s3://${IMPORTBUCKETNAME}/jars/amazon-keyspaces-helpers-1.0-SNAPSHOT.jar'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--extra-files": !Sub
          - 's3://${IMPORTBUCKETNAME}/conf/keyspaces-application.conf'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--enable-metrics": "true"
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-spark-ui": "true"
        "--spark-event-logs-path": !Sub
          - "s3://${IMPORTBUCKETNAME}/spark-logs/"
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--write-shuffle-files-to-s3": "true"
        "--write-shuffle-spills-to-s3": "true"
        "--TempDir": !Sub
          - 's3://${IMPORTBUCKETNAME}/shuffle-space/export-sample/'
          - IMPORTBUCKETNAME:
              Fn::ImportValue:
                !Sub 'KeyspacesBucketNameExport-${ParentStack}'
        "--DYNAMODB_WRITE_UNITS": !Sub '${DynamoDBWriteUnits}'
        "--KEYSPACE_NAME": !Sub '${KeyspaceName}'
        "--TABLE_NAME": !Sub '${TableName}'
        "--DYNAMODB_TABLE_NAME": !Sub '${DynamoDBTableName}'
        "--DRIVER_CONF": "keyspaces-application.conf"
        #"--DISTINCT_KEYS": "id,create_date"
        "--class": "GlueApp"
      #Connections:
      #  ConnectionsList
      Description: 'Copy Keyspaces table to DynamoDB'
      #ExecutionClass: String
      #ExecutionProperty: ExecutionProperty
      GlueVersion: "3.0"
      #LogUri: String
      #MaxCapacity: Double
      #MaxRetries: Double
      Name: !Sub ['AmazonKeyspacesToDynamoDB-${STACKNAME}', STACKNAME: !Join [ "-", [!Ref ParentStack, !Ref AWS::StackName]]]
      #NonOverridableArguments: Json
      #NotificationProperty: NotificationProperty
      NumberOfWorkers: 2
      Role:
        Fn::ImportValue:
          !Sub 'KeyspacesGlueJobServiceRoleExport-${ParentStack}'
      #SecurityConfiguration: String
      #Tags: Json
      #Timeout: Integer
      WorkerType: G.2X
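A note on sizing DynamoDBWriteUnits: one DynamoDB write capacity unit covers a single write of up to 1 KB, so larger items consume proportionally more units and lower the effective row rate. A back-of-the-envelope sketch (the 2 KB average item size is an assumed example):

```scala
// Rough row-rate estimate for a given DynamoDBWriteUnits target.
val targetWriteUnits = 1000.0                   // DynamoDBWriteUnits parameter
val avgItemSizeKB    = 2.0                      // assumption for this example
val wcuPerItem       = math.ceil(avgItemSizeKB) // 1 WCU per 1 KB written, rounded up
val itemsPerSecond   = targetWriteUnits / wcuPerItem
println(f"~$itemsPerSecond%.0f items/second")   // ~500 items/second at 1000 WCU
```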
Lines changed: 117 additions & 0 deletions (migration-sample.scala)
@@ -0,0 +1,117 @@

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._
import com.datastax.spark.connector.cql._
import com.datastax.oss.driver.api.core._
import org.apache.spark.sql.functions.rand
import com.amazonaws.services.glue.log.GlueLogger
import java.time.ZonedDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import java.time.format.DateTimeFormatter
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions


object GlueApp {

  def main(sysArgs: Array[String]) {

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "DYNAMODB_TABLE_NAME", "DYNAMODB_WRITE_UNITS").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "100"),

          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
          ("directJoinSetting", "on"),

          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"), // writes
          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),     // reads

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "50"),

          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1"),
          ("spark.cassandra.output.ignoreNulls", "true")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val logger = new GlueLogger

    // validation steps for peers and partitioner
    val connector = CassandraConnector.apply(conf)
    val session = connector.openSession()
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()

    val partitioner = session.execute("SELECT partitioner from system.local").one().getString("partitioner")

    logger.info("Total number of peers: " + peersCount)
    logger.info("Configured partitioner: " + partitioner)

    if(peersCount == 0){
      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }

    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in the Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val dynamodbTableName = args("DYNAMODB_TABLE_NAME")
    val dynamodbWriteUnits = args("DYNAMODB_WRITE_UNITS")

    // read the full Keyspaces table into a DataFrame
    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> tableName,
                    "keyspace" -> keyspaceName,
                    "pushdown" -> "false")) // set to true when executing against Apache Cassandra, false when working with Keyspaces
      .load()
      //.filter("my_column=='somevalue' AND my_othercolumn=='someothervalue'")

    val dynamicFrame = DynamicFrame(tableDf, glueContext)

    // write to DynamoDB at the requested rate, regardless of the table's capacity mode
    val dynamoDbSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> dynamodbTableName,
        "dynamodb.throughput.write.percent" -> "1.0",
        "dynamodb.throughput.write" -> dynamodbWriteUnits
      ))
    )

    dynamoDbSink.writeDynamicFrame(dynamicFrame)
  }
}
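After the job completes, one way to sanity-check the copy is to compare the source row count with the item count that lands in DynamoDB. A sketch of such a check, assuming it runs in the same job context as the script above (the dynamodb source options mirror AWS Glue's documented DynamoDB reader, and the 0.5 read percent is an arbitrary choice):

```scala
// Sketch: compare row counts on both sides of the migration.
// Assumes tableDf, glueContext, dynamodbTableName, and logger from the script above.
val sourceCount = tableDf.count()

val targetFrame = glueContext.getSourceWithFormat(
  connectionType = "dynamodb",
  options = JsonOptions(Map(
    "dynamodb.input.tableName" -> dynamodbTableName,
    "dynamodb.throughput.read.percent" -> "0.5" // throttle the verification scan
  ))
).getDynamicFrame()

val targetCount = targetFrame.count
logger.info(s"Keyspaces rows: $sourceCount, DynamoDB items: $targetCount")
if (sourceCount != targetCount) {
  logger.error("Counts differ; investigate before cutting over.")
}
```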
Lines changed: 46 additions & 0 deletions (setup-migration.sh)
@@ -0,0 +1,46 @@

#!/bin/bash

echo "Positional Arguments: PARENT_STACK_NAME, STACK_NAME, KEYSPACE_NAME, TABLE_NAME, DYNAMODB_TABLE_NAME, DYNAMODB_WRITE_UNITS"
echo ""
echo "PARENT_STACK_NAME: Stack name used for setting up the connector"
echo "STACK_NAME: Stack name used for setting up the Glue job"
echo "KEYSPACE_NAME: Keyspace to export from"
echo "TABLE_NAME: Table to export from"
echo "DYNAMODB_TABLE_NAME: The DynamoDB table name to import into"
echo "DYNAMODB_WRITE_UNITS: The write capacity to target, for either on-demand or provisioned tables"

PARENT_STACK_NAME=${1:-aksglue}
STACK_NAME="${2:-$PARENT_STACK_NAME-migration}"
KEYSPACE_NAME=${3:-mykeyspace}
TABLE_NAME=${4:-mytable}
DYNAMODB_TABLE_NAME=${5}
DYNAMODB_WRITE_UNITS=${6}

# DYNAMODB_TABLE_NAME and DYNAMODB_WRITE_UNITS have no defaults and are required
if [ -z "${DYNAMODB_TABLE_NAME}" ] || [ -z "${DYNAMODB_WRITE_UNITS}" ]; then
    echo "DYNAMODB_TABLE_NAME and DYNAMODB_WRITE_UNITS are required"
    exit 1
fi

echo "Parent stack used: ${PARENT_STACK_NAME}"
echo "Stack name used: ${STACK_NAME}"
echo "Keyspace used: ${KEYSPACE_NAME}"
echo "Table used: ${TABLE_NAME}"
echo "DynamoDB target table: ${DYNAMODB_TABLE_NAME}"
echo "Target write rate: ${DYNAMODB_WRITE_UNITS}"

if ! command -v aws &> /dev/null; then
    echo "AWS CLI \"aws\" is not installed. aws is required for deploying artifacts to s3. See https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
    exit 1
fi

export KEYSPACES_GLUE_BUCKET=$(aws cloudformation describe-stacks --query "Stacks[?StackName==\`$PARENT_STACK_NAME\`][].Outputs[?ExportName==\`KeyspacesBucketNameExport-$PARENT_STACK_NAME\`]".OutputValue --output text)

if [ -z "${KEYSPACES_GLUE_BUCKET}" ]; then
    echo "Parent stack not found. CloudFormation export not found: KeyspacesBucketNameExport-$PARENT_STACK_NAME"
    exit 1
fi

echo "Moving script to bucket ${KEYSPACES_GLUE_BUCKET}"

aws s3api put-object --bucket $KEYSPACES_GLUE_BUCKET --key scripts/$PARENT_STACK_NAME-$STACK_NAME-migration.scala --body migration-sample.scala || exit 1

aws cloudformation create-stack --stack-name ${STACK_NAME} --parameters ParameterKey=ParentStack,ParameterValue=$PARENT_STACK_NAME ParameterKey=KeyspaceName,ParameterValue=$KEYSPACE_NAME ParameterKey=TableName,ParameterValue=$TABLE_NAME ParameterKey=DynamoDBTableName,ParameterValue=${DYNAMODB_TABLE_NAME} ParameterKey=DynamoDBWriteUnits,ParameterValue=$DYNAMODB_WRITE_UNITS --template-body 'file://glue-job-ks-to-dynamodb.yaml' || exit 1 #--debug

echo "Waiting for CloudFormation stack to complete ..."
aws cloudformation wait stack-create-complete --stack-name ${STACK_NAME} || exit 1
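Once the stack is created, the Glue job still has to be started. A sketch using the AWS SDK for Java v2 from Scala, to keep this repo's examples in one language; the job name below is an assumption derived from the template's Name property, so confirm it in the Glue console first:

```scala
// Sketch: start the migration job after stack creation.
// The job name is assumed to follow the template's Name property; verify before use.
import software.amazon.awssdk.services.glue.GlueClient
import software.amazon.awssdk.services.glue.model.StartJobRunRequest

object StartMigration extends App {
  val parentStack = "aksglue"                 // PARENT_STACK_NAME used above
  val stackName   = s"$parentStack-migration" // STACK_NAME used above
  val jobName     = s"AmazonKeyspacesToDynamoDB-$parentStack-$stackName" // hypothetical

  val glue = GlueClient.create()
  val run  = glue.startJobRun(StartJobRunRequest.builder().jobName(jobName).build())
  println(s"Started Glue job run: ${run.jobRunId()}")
}
```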
