Skip to content

Commit

Permalink
feat: deploy casv5 scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Jun 6, 2023
1 parent 05eb8f0 commit ae0721e
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 12 deletions.
50 changes: 46 additions & 4 deletions cd/manager/aws/dynamoDb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@ func NewDynamoDb(cfg aws.Config, cache manager.Cache) manager.Database {
cache,
time.UnixMilli(0),
}
if err = db.createTable(); err != nil {
log.Fatalf("dynamodb: table creation failed: %v", err)
if err = db.createJobTable(); err != nil {
log.Fatalf("dynamodb: job table creation failed: %v", err)
}
if err = db.createBuildTable(); err != nil {
log.Fatalf("dynamodb: build table creation failed: %v", err)
}
return db
}

func (db DynamoDb) createTable() error {
func (db DynamoDb) createJobTable() error {
// Create the table if it doesn't already exist
if exists, err := db.tableExists(db.jobTable); !exists {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
Expand Down Expand Up @@ -143,7 +146,6 @@ func (db DynamoDb) createTable() error {
},
}
if _, err = db.client.CreateTable(ctx, &createTableInput); err != nil {
log.Printf("dynamodb: table creation error: %v", err)
return err
}
var exists bool
Expand All @@ -158,6 +160,46 @@ func (db DynamoDb) createTable() error {
return nil
}

func (db DynamoDb) createBuildTable() error {
// Create the table if it doesn't already exist
if exists, err := db.tableExists(db.buildTable); !exists {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

createTableInput := dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("key"),
AttributeType: "S",
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("key"),
KeyType: "HASH",
},
},
TableName: aws.String(db.buildTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
}
if _, err = db.client.CreateTable(ctx, &createTableInput); err != nil {
return err
}
var exists bool
for i := 0; i < TableCreationRetries; i++ {
if exists, err = db.tableExists(db.buildTable); exists {
return nil
}
time.Sleep(TableCreationWait)
}
return err
}
return nil
}

func (db DynamoDb) tableExists(table string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()
Expand Down
9 changes: 8 additions & 1 deletion cd/manager/aws/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (e Ecs) GenerateEnvLayout(component manager.DeployComponent) (*manager.Layo
privateCluster := manager.CeramicEnvPfx()
publicCluster := manager.CeramicEnvPfx() + "-ex"
casCluster := manager.CeramicEnvPfx() + "-cas"
casV5Cluster := "app-cas-" + os.Getenv("ENV")
ecrRepo, err := e.componentEcrRepo(component)
if err != nil {
log.Printf("generateEnvLayout: ecr repo error: %s, %v", component, err)
Expand All @@ -118,7 +119,7 @@ func (e Ecs) GenerateEnvLayout(component manager.DeployComponent) (*manager.Layo
// Populate the service layout by retrieving the clusters/services from ECS
layout := &manager.Layout{Clusters: map[string]*manager.Cluster{}, Repo: ecrRepo}
casSchedulerFound := false
for _, cluster := range []string{privateCluster, publicCluster, casCluster} {
for _, cluster := range []string{privateCluster, publicCluster, casCluster, casV5Cluster} {
if clusterServices, err := e.listEcsServices(cluster); err != nil {
log.Printf("generateEnvLayout: list services error: %s, %v", cluster, err)
return nil, err
Expand Down Expand Up @@ -183,6 +184,10 @@ func (e Ecs) componentTask(component manager.DeployComponent, service string) (*
Temp: true, // Anchor workers do not stay up permanently
}, true
}
case manager.DeployComponent_CasV5:
if strings.Contains(service, manager.ServiceSuffix_CasScheduler) {
return &manager.Task{}, true
}
default:
log.Printf("componentTask: unknown component: %s", component)
}
Expand All @@ -198,6 +203,8 @@ func (e Ecs) componentEcrRepo(component manager.DeployComponent) (string, error)
return "go-ipfs-" + envStr, nil
case manager.DeployComponent_Cas:
return manager.CeramicEnvPfx() + "-cas", nil
case manager.DeployComponent_CasV5:
return "app-cas-scheduler", nil
default:
return "", fmt.Errorf("componentTask: unknown component: %s", component)
}
Expand Down
2 changes: 2 additions & 0 deletions cd/manager/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type DeployComponent string
const (
DeployComponent_Ceramic DeployComponent = "ceramic"
DeployComponent_Cas DeployComponent = "cas"
DeployComponent_CasV5 DeployComponent = "casv5"
DeployComponent_Ipfs DeployComponent = "ipfs"
)

Expand All @@ -89,6 +90,7 @@ type DeployRepo string
const (
DeployRepo_Ceramic DeployRepo = "js-ceramic"
DeployRepo_Cas DeployRepo = "ceramic-anchor-service"
DeployRepo_CasV5 DeployRepo = "go-cas"
DeployRepo_Ipfs DeployRepo = "go-ipfs-daemon"
)

Expand Down
31 changes: 24 additions & 7 deletions cd/manager/notifs/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,33 @@ func (n JobNotifs) getDeployHashes(jobState manager.JobState) string {
}
}
// Prepare component messages with GitHub commit hashes and hyperlinks
ceramicMsg := n.getComponentMsg(manager.DeployComponent_Ceramic, commitHashes[manager.DeployComponent_Ceramic])
casMsg := n.getComponentMsg(manager.DeployComponent_Cas, commitHashes[manager.DeployComponent_Cas])
ipfsMsg := n.getComponentMsg(manager.DeployComponent_Ipfs, commitHashes[manager.DeployComponent_Ipfs])
return fmt.Sprintf("%s\n%s\n%s", ceramicMsg, casMsg, ipfsMsg)
ceramicMsg := n.getComponentMsg(manager.DeployComponent_Ceramic, commitHashes)
casMsg := n.getComponentMsg(manager.DeployComponent_Cas, commitHashes)
casV5Msg := n.getComponentMsg(manager.DeployComponent_CasV5, commitHashes)
ipfsMsg := n.getComponentMsg(manager.DeployComponent_Ipfs, commitHashes)
return n.combineComponentMsgs(ceramicMsg, casMsg, casV5Msg, ipfsMsg)
}
}

func (n JobNotifs) getComponentMsg(component manager.DeployComponent, sha string) string {
repo := manager.ComponentRepo(component)
return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo, sha[:12], manager.GitHubOrg, repo, sha)
func (n JobNotifs) getComponentMsg(component manager.DeployComponent, commitHashes map[manager.DeployComponent]string) string {
if commitHash, found := commitHashes[component]; found {
repo := manager.ComponentRepo(component)
return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo, commitHash[:12], manager.GitHubOrg, repo, commitHash)
}
return ""
}

func (n JobNotifs) combineComponentMsgs(msgs ...string) string {
message := ""
for i, msg := range msgs {
if len(msg) > 0 {
message += msg
if i < len(msgs)-1 {
message += "\n"
}
}
}
return message
}

func (n JobNotifs) getActiveJobs(jobState manager.JobState) []discord.EmbedField {
Expand Down
4 changes: 4 additions & 0 deletions cd/manager/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func EnvBranch(component DeployComponent, env EnvType) string {
return EnvBranch_Dev
case DeployComponent_Cas:
return EnvBranch_Dev
case DeployComponent_CasV5:
return EnvBranch_Dev
default:
return EnvBranch_Qa
}
Expand All @@ -64,6 +66,8 @@ func ComponentRepo(component DeployComponent) DeployRepo {
return DeployRepo_Ceramic
case DeployComponent_Cas:
return DeployRepo_Cas
case DeployComponent_CasV5:
return DeployRepo_CasV5
case DeployComponent_Ipfs:
return DeployRepo_Ipfs
default:
Expand Down

0 comments on commit ae0721e

Please sign in to comment.