Commit
feat(cd): refactor deploy job to contain deployment business logic (#54)
smrz2001 committed Nov 6, 2023
1 parent 1f5fff5 commit 04a76aa
Showing 17 changed files with 336 additions and 342 deletions.
23 changes: 17 additions & 6 deletions cd/manager/common/aws/ddb/dynamoDb.go
@@ -32,8 +32,19 @@ type DynamoDb struct {
cursor time.Time
}

const defaultJobStateTtl = 2 * 7 * 24 * time.Hour // Two weeks
const buildHashTag = "sha_tag"

// buildState represents build/deploy commit hash information. This information is maintained in a legacy DynamoDB table
// used by our utility AWS Lambdas.
type buildState struct {
Key manager.DeployComponent `dynamodbav:"key"`
DeployTag string `dynamodbav:"deployTag"`
BuildInfo map[string]interface{} `dynamodbav:"buildInfo"`
}

func NewDynamoDb(cfg aws.Config, cache manager.Cache) manager.Database {
env := os.Getenv("ENV")
env := os.Getenv(manager.EnvVar_Env)
// Use override endpoint, if specified, so that we can store jobs locally, while hitting regular AWS endpoints for
// other operations. This allows local testing without affecting CD manager instances running in AWS.
customEndpoint := os.Getenv("DB_AWS_ENDPOINT")
@@ -291,7 +302,7 @@ func (db DynamoDb) WriteJob(jobState job.JobState) error {
// Generate a new UUID for every job update
jobState.Id = uuid.New().String()
// Set entry expiration
jobState.Ttl = time.Now().Add(manager.DefaultJobStateTtl)
jobState.Ttl = time.Now().Add(defaultJobStateTtl)
if attributeValues, err := attributevalue.MarshalMapWithOptions(jobState, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
@@ -357,7 +368,7 @@ func (db DynamoDb) GetBuildHashes() (map[manager.DeployComponent]string, error)
} else {
commitHashes := make(map[manager.DeployComponent]string, len(buildStates))
for _, state := range buildStates {
commitHashes[state.Key] = state.BuildInfo[manager.BuildHashTag].(string)
commitHashes[state.Key] = state.BuildInfo[buildHashTag].(string)
}
return commitHashes, nil
}
@@ -369,13 +380,13 @@ func (db DynamoDb) GetDeployHashes() (map[manager.DeployComponent]string, error)
} else {
commitHashes := make(map[manager.DeployComponent]string, len(buildStates))
for _, state := range buildStates {
commitHashes[state.Key] = state.DeployTag
commitHashes[state.Key] = state.DeployTag
}
return commitHashes, nil
}
}

func (db DynamoDb) getBuildStates() ([]manager.BuildState, error) {
func (db DynamoDb) getBuildStates() ([]buildState, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

@@ -385,7 +396,7 @@ func (db DynamoDb) getBuildStates() ([]manager.BuildState, error) {
}); err != nil {
return nil, err
} else {
var buildStates []manager.BuildState
var buildStates []buildState
if err = attributevalue.UnmarshalListOfMapsWithOptions(scanOutput.Items, &buildStates); err != nil {
return nil, err
}
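Note on the endpoint override mentioned in NewDynamoDb above: DB_AWS_ENDPOINT lets the manager store jobs in a local DynamoDB while every other AWS call keeps hitting the regular endpoints, but the client construction itself is collapsed in this view. Below is a minimal sketch of how such an override can be wired with aws-sdk-go-v2; the BaseEndpoint option and the overall shape are illustrative assumptions, not the repository's actual constructor.

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// newDynamoDbClient builds a DynamoDB client, pointing it at DB_AWS_ENDPOINT when
// that variable is set (e.g. a local DynamoDB container) and at the regular AWS
// endpoint otherwise. Other service clients built from the same cfg are unaffected.
func newDynamoDbClient(ctx context.Context) (*dynamodb.Client, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
		if ep, found := os.LookupEnv("DB_AWS_ENDPOINT"); found {
			o.BaseEndpoint = aws.String(ep)
		}
	}), nil
}

func main() {
	client, err := newDynamoDbClient(context.Background())
	if err != nil {
		log.Fatalf("failed to build DynamoDB client: %v", err)
	}
	fmt.Printf("client ready: %T\n", client)
}
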
154 changes: 33 additions & 121 deletions cd/manager/common/aws/ecs/ecs.go
@@ -30,9 +30,16 @@ type ecsFailure struct {
arn, detail, reason string
}

const (
deployType_Service string = "service"
deployType_Task string = "task"
)

const resourceTag = "Ceramic"

func NewEcs(cfg aws.Config) manager.Deployment {
ecrUri := os.Getenv("AWS_ACCOUNT_ID") + ".dkr.ecr." + os.Getenv("AWS_REGION") + ".amazonaws.com/"
return &Ecs{ecs.NewFromConfig(cfg), ssm.NewFromConfig(cfg), manager.EnvType(os.Getenv("ENV")), ecrUri}
return &Ecs{ecs.NewFromConfig(cfg), ssm.NewFromConfig(cfg), manager.EnvType(os.Getenv(manager.EnvVar_Env)), ecrUri}
}

func (e Ecs) LaunchServiceTask(cluster, service, family, container string, overrides map[string]string) (string, error) {
@@ -106,124 +113,29 @@ func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds
return tasksFound && tasksInState, nil
}

func (e Ecs) GenerateEnvLayout(component manager.DeployComponent) (*manager.Layout, error) {
privateCluster := manager.CeramicEnvPfx()
publicCluster := manager.CeramicEnvPfx() + "-ex"
casCluster := manager.CeramicEnvPfx() + "-cas"
casV5Cluster := "app-cas-" + string(e.env)
ecrRepo, err := e.componentEcrRepo(component)
if err != nil {
log.Printf("generateEnvLayout: ecr repo error: %s, %v", component, err)
return nil, err
}
// Populate the service layout by retrieving the clusters/services from ECS
layout := &manager.Layout{Clusters: map[string]*manager.Cluster{}, Repo: ecrRepo}
casSchedulerFound := false
for _, cluster := range []string{privateCluster, publicCluster, casCluster, casV5Cluster} {
func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) {
layout := &manager.Layout{Clusters: map[string]*manager.Cluster{}}
for _, cluster := range clusters {
if clusterServices, err := e.listEcsServices(cluster); err != nil {
log.Printf("generateEnvLayout: list services error: %s, %v", cluster, err)
log.Printf("getLayout: list services error: %s, %v", cluster, err)
return nil, err
} else {
} else if len(clusterServices.ServiceArns) > 0 {
layout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}}
for _, serviceArn := range clusterServices.ServiceArns {
service := e.serviceNameFromArn(serviceArn)
if task := e.componentTask(component, cluster, service); task != nil {
if _, found := layout.Clusters[cluster]; !found {
// We found at least one matching task, so we can start populating the cluster layout.
layout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}}
}
descSvcOutput, err := e.describeEcsService(cluster, service)
if err != nil {
log.Printf("generateEnvLayout: describe service error: %s, %s, %v", cluster, service, err)
return nil, err
}
// Set the task definition to the one currently running. For most cases, this will be overwritten by
// a new definition, but for some cases, we might want to use a layout with currently running
// definitions and not updated ones, e.g. to check if an existing deployment is stable.
task.Id = *descSvcOutput.Services[0].TaskDefinition
layout.Clusters[cluster].ServiceTasks.Tasks[service] = task
casSchedulerFound = casSchedulerFound ||
((component == manager.DeployComponent_Cas) && strings.Contains(service, manager.ServiceSuffix_CasScheduler))
ecsService, err := e.describeEcsService(cluster, service)
if err != nil {
log.Printf("getLayout: describe service error: %s, %s, %v", cluster, service, err)
return nil, err
}
layout.Clusters[cluster].ServiceTasks.Tasks[service] = &manager.Task{Id: *ecsService.Services[0].TaskDefinition}
}
}
}
// If the CAS Scheduler service was present, add the CASv2 worker to the layout since it doesn't get updated through
// an ECS Service.
if casSchedulerFound {
layout.Clusters[casCluster].Tasks = &manager.TaskSet{Tasks: map[string]*manager.Task{
casCluster + "-" + manager.ServiceSuffix_CasWorker: {
Repo: "ceramic-prod-cas-runner",
Temp: true, // Anchor workers do not stay up permanently
Name: manager.ContainerName_CasWorker,
},
}}
}
return layout, nil
}

func (e Ecs) componentTask(component manager.DeployComponent, cluster, service string) *manager.Task {
// Skip any ELP services (e.g. "ceramic-elp-1-1-node")
serviceNameParts := strings.Split(service, "-")
if (len(serviceNameParts) >= 2) && (serviceNameParts[1] == manager.ServiceSuffix_Elp) {
return nil
}
switch component {
case manager.DeployComponent_Ceramic:
if strings.Contains(service, manager.ServiceSuffix_CeramicNode) {
return &manager.Task{Name: manager.ContainerName_CeramicNode}
}
case manager.DeployComponent_Ipfs:
if strings.Contains(service, manager.ServiceSuffix_IpfsNode) {
return &manager.Task{Name: manager.ContainerName_IpfsNode}
}
case manager.DeployComponent_Cas:
// All pre-CASv5 services are only present in the CAS cluster
if cluster == manager.CeramicEnvPfx()+"-cas" {
// Until all environments are moved to CASv2, the CAS Scheduler (CASv2) and CAS Worker (CASv1) ECS Services will
// exist in some environments and not others. This is ok because only if a service exists in an environment will
// we attempt to update it during a deployment.
if strings.Contains(service, manager.ServiceSuffix_CasApi) {
return &manager.Task{Name: manager.ContainerName_CasApi}
} else if strings.Contains(service, manager.ServiceSuffix_CasScheduler) {
// CASv2
return &manager.Task{Name: manager.ContainerName_CasScheduler}
} else if strings.Contains(service, manager.ServiceSuffix_CasWorker) { // CASv1
return &manager.Task{
Repo: "ceramic-prod-cas-runner",
Temp: true, // Anchor workers do not stay up permanently
Name: manager.ContainerName_CasWorker,
}
}
}
case manager.DeployComponent_CasV5:
// All CASv5 services will exist in a separate "app-cas" cluster
if cluster == "app-cas-"+string(e.env) {
if strings.Contains(service, manager.ServiceSuffix_CasScheduler) {
return &manager.Task{Name: manager.ContainerName_CasV5Scheduler}
}
}
default:
log.Printf("componentTask: unknown component: %s", component)
}
return nil
}

func (e Ecs) componentEcrRepo(component manager.DeployComponent) (string, error) {
switch component {
case manager.DeployComponent_Ceramic:
return "ceramic-prod", nil
case manager.DeployComponent_Ipfs:
return "go-ipfs-prod", nil
case manager.DeployComponent_Cas:
return "ceramic-prod-cas", nil
case manager.DeployComponent_CasV5:
return "app-cas-scheduler", nil
default:
return "", fmt.Errorf("componentTask: unknown component: %s", component)
}
}

func (e Ecs) UpdateEnv(layout *manager.Layout, commitHash string) error {
func (e Ecs) UpdateLayout(layout *manager.Layout, commitHash string) error {
for clusterName, cluster := range layout.Clusters {
clusterRepo := layout.Repo
if len(cluster.Repo) > 0 {
@@ -236,7 +148,7 @@ func (e Ecs) UpdateEnv(layout *manager.Layout, commitHash string) error {
return nil
}

func (e Ecs) CheckEnv(layout *manager.Layout) (bool, error) {
func (e Ecs) CheckLayout(layout *manager.Layout) (bool, error) {
for clusterName, cluster := range layout.Clusters {
if deployed, err := e.checkEnvCluster(cluster, clusterName); err != nil {
return false, err
@@ -294,7 +206,7 @@ func (e Ecs) runEcsTask(cluster, family, container string, networkConfig *types.
LaunchType: "FARGATE",
NetworkConfiguration: networkConfig,
StartedBy: aws.String(manager.ServiceName),
Tags: []types.Tag{{Key: aws.String(manager.ResourceTag), Value: aws.String(string(e.env))}},
Tags: []types.Tag{{Key: aws.String(resourceTag), Value: aws.String(string(e.env))}},
}
if (overrides != nil) && (len(overrides) > 0) {
overrideEnv := make([]types.KeyValuePair, 0, len(overrides))
@@ -348,7 +260,7 @@ func (e Ecs) updateEcsTaskDefinition(taskDefArn, image, containerName string) (s
RuntimePlatform: taskDef.RuntimePlatform,
TaskRoleArn: taskDef.TaskRoleArn,
Volumes: taskDef.Volumes,
Tags: []types.Tag{{Key: aws.String(manager.ResourceTag), Value: aws.String(string(e.env))}},
Tags: []types.Tag{{Key: aws.String(resourceTag), Value: aws.String(string(e.env))}},
}
if regTaskDefOutput, err := e.ecsClient.RegisterTaskDefinition(ctx, regTaskDefInput); err != nil {
log.Printf("updateEcsTaskDefinition: register task def error: %s, %s, %s, %v", taskDefArn, image, containerName, err)
@@ -508,27 +420,27 @@ func (e Ecs) listEcsTasks(cluster, family string) ([]string, error) {
}

func (e Ecs) updateEnvCluster(cluster *manager.Cluster, clusterName, clusterRepo, commitHash string) error {
if err := e.updateEnvTaskSet(cluster.ServiceTasks, manager.DeployType_Service, clusterName, clusterRepo, commitHash); err != nil {
if err := e.updateEnvTaskSet(cluster.ServiceTasks, deployType_Service, clusterName, clusterRepo, commitHash); err != nil {
return err
} else if err = e.updateEnvTaskSet(cluster.Tasks, manager.DeployType_Task, clusterName, clusterRepo, commitHash); err != nil {
} else if err = e.updateEnvTaskSet(cluster.Tasks, deployType_Task, clusterName, clusterRepo, commitHash); err != nil {
return err
}
return nil
}

func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType manager.DeployType, cluster, clusterRepo, commitHash string) error {
func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType string, cluster, clusterRepo, commitHash string) error {
if taskSet != nil {
for taskSetName, task := range taskSet.Tasks {
taskSetRepo := clusterRepo
if len(taskSet.Repo) > 0 {
taskSetRepo = taskSet.Repo
}
switch deployType {
case manager.DeployType_Service:
case deployType_Service:
if err := e.updateEnvServiceTask(task, cluster, taskSetName, taskSetRepo, commitHash); err != nil {
return err
}
case manager.DeployType_Task:
case deployType_Task:
if err := e.updateEnvTask(task, cluster, taskSetName, taskSetRepo, commitHash); err != nil {
return err
}
@@ -567,29 +479,29 @@ func (e Ecs) updateEnvTask(task *manager.Task, cluster, taskName, taskSetRepo, c
}

func (e Ecs) checkEnvCluster(cluster *manager.Cluster, clusterName string) (bool, error) {
if deployed, err := e.checkEnvTaskSet(cluster.ServiceTasks, manager.DeployType_Service, clusterName); err != nil {
if deployed, err := e.checkEnvTaskSet(cluster.ServiceTasks, deployType_Service, clusterName); err != nil {
return false, err
} else if !deployed {
return false, nil
} else if deployed, err = e.checkEnvTaskSet(cluster.Tasks, manager.DeployType_Task, clusterName); err != nil {
} else if deployed, err = e.checkEnvTaskSet(cluster.Tasks, deployType_Task, clusterName); err != nil {
return false, err
} else {
return deployed, nil
}
}

func (e Ecs) checkEnvTaskSet(taskSet *manager.TaskSet, deployType manager.DeployType, cluster string) (bool, error) {
func (e Ecs) checkEnvTaskSet(taskSet *manager.TaskSet, deployType string, cluster string) (bool, error) {
if taskSet != nil {
for _, task := range taskSet.Tasks {
switch deployType {
case manager.DeployType_Service:
case deployType_Service:
if deployed, err := e.checkEcsService(cluster, task.Id); err != nil {
return false, err
} else if !deployed {
return false, nil
}
return true, nil
case manager.DeployType_Task:
case deployType_Task:
// Only check tasks that are meant to stay up permanently
if !task.Temp {
if deployed, err := e.CheckTask(cluster, "", true, true, task.Id); err != nil {
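The net effect of the ecs.go changes above is that the ECS wrapper no longer knows which components map to which clusters, services, or ECR repos; it only snapshots a layout (GetLayout), rolls it to a new image tag (UpdateLayout), and verifies the rollout (CheckLayout), with the component-specific business logic moving into the deploy job. The sketch below shows how a caller might drive that interface, assuming manager.Deployment exposes these three methods as the diff suggests; the function name, polling interval, and import path are illustrative only.

package jobs

import (
	"time"

	// Import path assumed for illustration; use the repository's actual module path.
	"github.com/3box/pipeline-tools/cd/manager"
)

// deployToClusters sketches how a deploy job could drive the refactored
// Deployment interface: snapshot the running services, roll them to a new
// image tag, then poll until the rollout is stable.
func deployToClusters(d manager.Deployment, clusters []string, commitHash string) error {
	layout, err := d.GetLayout(clusters) // capture currently running task definitions
	if err != nil {
		return err
	}
	// (In the real deploy job, the layout's ECR repo and any extra, non-service
	// tasks would be filled in here before the update.)
	if err = d.UpdateLayout(layout, commitHash); err != nil { // register new task defs and update services
		return err
	}
	for {
		if stable, err := d.CheckLayout(layout); err != nil {
			return err
		} else if stable {
			return nil
		}
		time.Sleep(30 * time.Second) // polling interval chosen arbitrarily for this sketch
	}
}
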
30 changes: 21 additions & 9 deletions cd/manager/jobmanager/jobManager.go
@@ -34,14 +34,26 @@ type JobManager struct {
waitGroup *sync.WaitGroup
}

const (
tests_Name = "Post-Deployment Tests"
tests_Org = "3box"
tests_Repo = "ceramic-tests"
tests_Ref = "main"
tests_Workflow = "run-durable.yml"
tests_Selector = "fast"
)

const defaultCasMaxAnchorWorkers = 1
const defaultCasMinAnchorWorkers = 0

func NewJobManager(cache manager.Cache, db manager.Database, d manager.Deployment, apiGw manager.ApiGw, repo manager.Repository, notifs manager.Notifs) (manager.Manager, error) {
maxAnchorJobs := manager.DefaultCasMaxAnchorWorkers
maxAnchorJobs := defaultCasMaxAnchorWorkers
if configMaxAnchorWorkers, found := os.LookupEnv("CAS_MAX_ANCHOR_WORKERS"); found {
if parsedMaxAnchorWorkers, err := strconv.Atoi(configMaxAnchorWorkers); err == nil {
maxAnchorJobs = parsedMaxAnchorWorkers
}
}
minAnchorJobs := manager.DefaultCasMinAnchorWorkers
minAnchorJobs := defaultCasMinAnchorWorkers
if configMinAnchorWorkers, found := os.LookupEnv("CAS_MIN_ANCHOR_WORKERS"); found {
if parsedMinAnchorWorkers, err := strconv.Atoi(configMinAnchorWorkers); err == nil {
minAnchorJobs = parsedMinAnchorWorkers
@@ -51,7 +63,7 @@ func NewJobManager(cache manager.Cache, db manager.Database, d manager.Deploymen
return nil, fmt.Errorf("newJobManager: invalid anchor worker config: %d, %d", minAnchorJobs, maxAnchorJobs)
}
paused, _ := strconv.ParseBool(os.Getenv("PAUSED"))
return &JobManager{cache, db, d, apiGw, repo, notifs, maxAnchorJobs, minAnchorJobs, paused, manager.EnvType(os.Getenv("ENV")), new(sync.WaitGroup)}, nil
return &JobManager{cache, db, d, apiGw, repo, notifs, maxAnchorJobs, minAnchorJobs, paused, manager.EnvType(os.Getenv(manager.EnvVar_Env)), new(sync.WaitGroup)}, nil
}

func (m *JobManager) NewJob(jobState job.JobState) (job.JobState, error) {
@@ -474,14 +486,14 @@ func (m *JobManager) postProcessJob(jobState job.JobState) {
Type: job.JobType_Workflow,
Params: map[string]interface{}{
job.JobParam_Source: manager.ServiceName,
job.WorkflowJobParam_Name: manager.Tests_Name,
job.WorkflowJobParam_Org: manager.Tests_Org,
job.WorkflowJobParam_Repo: manager.Tests_Repo,
job.WorkflowJobParam_Ref: manager.Tests_Ref,
job.WorkflowJobParam_Workflow: manager.Tests_Workflow,
job.WorkflowJobParam_Name: tests_Name,
job.WorkflowJobParam_Org: tests_Org,
job.WorkflowJobParam_Repo: tests_Repo,
job.WorkflowJobParam_Ref: tests_Ref,
job.WorkflowJobParam_Workflow: tests_Workflow,
job.WorkflowJobParam_Inputs: map[string]interface{}{
job.WorkflowJobParam_Environment: m.env,
job.WorkflowJobParam_TestSelector: manager.Tests_Selector,
job.WorkflowJobParam_TestSelector: tests_Selector,
},
},
}); err != nil {
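NewJobManager above reads the anchor-worker pool bounds from CAS_MAX_ANCHOR_WORKERS and CAS_MIN_ANCHOR_WORKERS, falling back to the new local defaults (1 and 0) when the variables are unset or unparsable, and rejects configurations where the minimum exceeds the maximum. A small standalone sketch of the same pattern follows; the helper name is illustrative, since the repository parses these inline.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// intFromEnv returns the integer value of the named environment variable,
// falling back to def when the variable is unset or not a valid integer.
func intFromEnv(name string, def int) int {
	if raw, found := os.LookupEnv(name); found {
		if parsed, err := strconv.Atoi(raw); err == nil {
			return parsed
		}
	}
	return def
}

func main() {
	maxAnchorJobs := intFromEnv("CAS_MAX_ANCHOR_WORKERS", 1)
	minAnchorJobs := intFromEnv("CAS_MIN_ANCHOR_WORKERS", 0)
	if minAnchorJobs > maxAnchorJobs {
		fmt.Printf("invalid anchor worker config: %d, %d\n", minAnchorJobs, maxAnchorJobs)
		return
	}
	fmt.Printf("anchor workers: min=%d max=%d\n", minAnchorJobs, maxAnchorJobs)
}
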
2 changes: 1 addition & 1 deletion cd/manager/jobs/anchor.go
@@ -21,7 +21,7 @@ type anchorJob struct {
}

func AnchorJob(jobState job.JobState, db manager.Database, notifs manager.Notifs, d manager.Deployment) manager.JobSm {
return &anchorJob{baseJob{jobState, db, notifs}, os.Getenv("ENV"), d}
return &anchorJob{baseJob{jobState, db, notifs}, os.Getenv(manager.EnvVar_Env), d}
}

func (a anchorJob) Advance() (job.JobState, error) {