Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development: Add endpoints for build agent information in local continuous integration #7785

Merged
merged 17 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package de.tum.in.www1.artemis.service.connectors.localci;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -11,6 +9,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import com.hazelcast.collection.IQueue;
Expand All @@ -26,6 +25,7 @@
import de.tum.in.www1.artemis.repository.ParticipationRepository;
import de.tum.in.www1.artemis.repository.ProgrammingExerciseRepository;
import de.tum.in.www1.artemis.security.SecurityUtils;
import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildAgentInformation;
import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildJobQueueItem;
import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildResult;
import de.tum.in.www1.artemis.service.programming.ProgrammingExerciseGradingService;
Expand Down Expand Up @@ -59,6 +59,8 @@ public class LocalCISharedBuildJobQueueService {
*/
private final IMap<String, LocalCIBuildJobQueueItem> processingJobs;

private final IMap<String, LocalCIBuildAgentInformation> buildAgentInformation;

private AtomicInteger localProcessingJobs = new AtomicInteger(0);

/**
Expand All @@ -83,6 +85,7 @@ public LocalCISharedBuildJobQueueService(HazelcastInstance hazelcastInstance, Ex
this.programmingExerciseGradingService = programmingExerciseGradingService;
this.programmingMessagingService = programmingMessagingService;
this.programmingExerciseRepository = programmingExerciseRepository;
this.buildAgentInformation = this.hazelcastInstance.getMap("buildAgentInformation");
this.processingJobs = this.hazelcastInstance.getMap("processingJobs");
this.sharedLock = this.hazelcastInstance.getCPSubsystem().getLock("buildJobQueueLock");
this.queue = this.hazelcastInstance.getQueue("buildJobQueue");
Expand Down Expand Up @@ -121,6 +124,12 @@ public List<LocalCIBuildJobQueueItem> getProcessingJobsForCourse(long courseId)
return processingJobs.values().stream().filter(job -> job.getCourseId() == courseId).toList();
}

public List<LocalCIBuildAgentInformation> getBuildAgentInformation() {
// Remove build agent information of offline nodes
removeOfflineNodes();
return buildAgentInformation.values().stream().toList();
}

/**
* Remove all queued build jobs for a participation from the shared build job queue.
*
Expand All @@ -137,35 +146,18 @@ public void removeQueuedJobsForParticipation(long participationId) {
}

/**
* Retrieve participation from database with retries.
* This is necessary because the participation might not be persisted in the database yet.
*
* @param participationId id of the participation
* Wait 3 minutes after startup and then every 1 minute update the build agent information of the local hazelcast member.
* This is necessary because the build agent information is not updated automatically when a node joins the cluster.
*/
private ProgrammingExerciseParticipation retrieveParticipationWithRetry(Long participationId) {
int maxRetries = 5;
int retries = 0;
ProgrammingExerciseParticipation participation;
Optional<Participation> tempParticipation;
while (retries < maxRetries) {
tempParticipation = participationRepository.findById(participationId);
if (tempParticipation.isPresent()) {
participation = (ProgrammingExerciseParticipation) tempParticipation.get();
return participation;
}
else {
log.debug("Could not retrieve participation with id {} from database", participationId);
log.info("Retrying to retrieve participation with id {} from database", participationId);
retries++;
try {
Thread.sleep(1000);
}
catch (InterruptedException e1) {
log.error("Error while waiting for participation with id {} to be persisted in database", participationId, e1);
}
}
@Scheduled(initialDelay = 60000, fixedRate = 60000) // 1 minute initial delay, 1 minute fixed rate
public void updateBuildAgentInformation() {
// Remove build agent information of offline nodes
removeOfflineNodes();

// Add build agent information of local hazelcast member to map if not already present
if (!buildAgentInformation.containsKey(hazelcastInstance.getCluster().getLocalMember().getAddress().toString())) {
updateLocalBuildAgentInformation();
}
throw new IllegalStateException("Could not retrieve participation with id " + participationId + " from database after " + maxRetries + " retries.");
}

/**
Expand All @@ -175,6 +167,11 @@ private ProgrammingExerciseParticipation retrieveParticipationWithRetry(Long par
private void checkAvailabilityAndProcessNextBuild() {
// Check conditions before acquiring the lock to avoid unnecessary locking
if (!nodeIsAvailable()) {
// Add build agent information of local hazelcast member to map if not already present
if (!buildAgentInformation.containsKey(hazelcastInstance.getCluster().getLocalMember().getAddress().toString())) {
updateLocalBuildAgentInformation();
}

log.info("Node has no available threads currently");
return;
}
Expand Down Expand Up @@ -211,20 +208,39 @@ private void checkAvailabilityAndProcessNextBuild() {
private LocalCIBuildJobQueueItem addToProcessingJobs() {
LocalCIBuildJobQueueItem buildJob = queue.poll();
if (buildJob != null) {
String hazelcastMemberAddress = hazelcastInstance.getCluster().getLocalMember().getAddress().toString();
buildJob.setBuildAgentAddress(hazelcastMemberAddress);
buildJob.setBuildStartDate(System.currentTimeMillis());
buildJob.setExpirationTime(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(180));
processingJobs.put(buildJob.getId(), buildJob);
localProcessingJobs.incrementAndGet();

updateLocalBuildAgentInformation();
}
return buildJob;
}

/**
* Checks whether the node has at least one thread available for a new build job.
*/
private boolean nodeIsAvailable() {
log.info("Current active threads: {}", localCIBuildExecutorService.getActiveCount());
return localProcessingJobs.get() < localCIBuildExecutorService.getMaximumPoolSize();
private void updateLocalBuildAgentInformation() {
// Add/update
String memberAddress = hazelcastInstance.getCluster().getLocalMember().getAddress().toString();
List<LocalCIBuildJobQueueItem> processingJobsOfMember = getProcessingJobsOfNode(memberAddress);
int numberOfCurrentBuildJobs = processingJobsOfMember.size();
int maxNumberOfConcurrentBuilds = localCIBuildExecutorService.getMaximumPoolSize();
LocalCIBuildAgentInformation info = new LocalCIBuildAgentInformation(memberAddress, maxNumberOfConcurrentBuilds, numberOfCurrentBuildJobs, processingJobsOfMember);
buildAgentInformation.put(memberAddress, info);
}

private List<LocalCIBuildJobQueueItem> getProcessingJobsOfNode(String memberAddress) {
return processingJobs.values().stream().filter(job -> Objects.equals(job.getBuildAgentAddress(), memberAddress)).toList();
}

private void removeOfflineNodes() {
List<String> memberAddresses = hazelcastInstance.getCluster().getMembers().stream().map(member -> member.getAddress().toString()).toList();
for (String key : buildAgentInformation.keySet()) {
if (!memberAddresses.contains(key)) {
buildAgentInformation.remove(key);
}
}
}

/**
Expand All @@ -251,6 +267,15 @@ private void processBuild(LocalCIBuildJobQueueItem buildJob) {
log.error("Cannot process build job for participation with id {} because it could not be retrieved from the database.", buildJob.getParticipationId());
processingJobs.remove(buildJob.getId());
localProcessingJobs.decrementAndGet();
updateLocalBuildAgentInformation();
checkAvailabilityAndProcessNextBuild();
return;
}
catch (Exception e) {
log.error("Cannot process build job for participation with id {} because of an unexpected error.", buildJob.getParticipationId(), e);
processingJobs.remove(buildJob.getId());
localProcessingJobs.decrementAndGet();
updateLocalBuildAgentInformation();
checkAvailabilityAndProcessNextBuild();
return;
}
Expand Down Expand Up @@ -286,6 +311,7 @@ private void processBuild(LocalCIBuildJobQueueItem buildJob) {
// after processing a build job, remove it from the processing jobs
processingJobs.remove(buildJob.getId());
localProcessingJobs.decrementAndGet();
updateLocalBuildAgentInformation();

// process next build job if node is available
checkAvailabilityAndProcessNextBuild();
Expand All @@ -294,6 +320,7 @@ private void processBuild(LocalCIBuildJobQueueItem buildJob) {

processingJobs.remove(buildJob.getId());
localProcessingJobs.decrementAndGet();
updateLocalBuildAgentInformation();

if (buildJob.getRetryCount() > 0) {
log.error("Build job failed for the second time: {}", buildJob);
Expand All @@ -314,6 +341,46 @@ private void processBuild(LocalCIBuildJobQueueItem buildJob) {
});
}

/**
* Checks whether the node has at least one thread available for a new build job.
*/
private boolean nodeIsAvailable() {
log.info("Current active threads: {}", localCIBuildExecutorService.getActiveCount());
return localProcessingJobs.get() < localCIBuildExecutorService.getMaximumPoolSize();
}

/**
* Retrieve participation from database with retries.
* This is necessary because the participation might not be persisted in the database yet.
*
* @param participationId id of the participation
*/
private ProgrammingExerciseParticipation retrieveParticipationWithRetry(Long participationId) {
int maxRetries = 5;
int retries = 0;
ProgrammingExerciseParticipation participation;
Optional<Participation> tempParticipation;
while (retries < maxRetries) {
tempParticipation = participationRepository.findById(participationId);
if (tempParticipation.isPresent()) {
participation = (ProgrammingExerciseParticipation) tempParticipation.get();
return participation;
}
else {
log.debug("Could not retrieve participation with id {} from database", participationId);
log.info("Retrying to retrieve participation with id {} from database", participationId);
retries++;
try {
Thread.sleep(1000);
}
catch (InterruptedException e1) {
log.error("Error while waiting for participation with id {} to be persisted in database", participationId, e1);
}
}
}
throw new IllegalStateException("Could not retrieve participation with id " + participationId + " from database after " + maxRetries + " retries.");
}

private class BuildJobItemListener implements ItemListener<LocalCIBuildJobQueueItem> {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package de.tum.in.www1.artemis.service.connectors.localci.dto;

import java.io.Serial;
import java.io.Serializable;
import java.util.List;

public class LocalCIBuildAgentInformation implements Serializable {

@Serial
private static final long serialVersionUID = 1L;

private String name;

private int maxNumberOfConcurrentBuildJobs;

private int numberOfCurrentBuildJobs;

private List<LocalCIBuildJobQueueItem> runningBuildJobs;

public LocalCIBuildAgentInformation(String name, int maxNumberOfConcurrentBuildJobs, int numberOfCurrentBuildJobs, List<LocalCIBuildJobQueueItem> runningBuildJobs) {
this.name = name;
this.maxNumberOfConcurrentBuildJobs = maxNumberOfConcurrentBuildJobs;
this.numberOfCurrentBuildJobs = numberOfCurrentBuildJobs;
this.runningBuildJobs = runningBuildJobs;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getMaxNumberOfConcurrentBuildJobs() {
return maxNumberOfConcurrentBuildJobs;
}

public void setMaxNumberOfConcurrentBuildJobs(int maxNumberOfConcurrentBuildJobs) {
this.maxNumberOfConcurrentBuildJobs = maxNumberOfConcurrentBuildJobs;
}

public int getNumberOfCurrentBuildJobs() {
return numberOfCurrentBuildJobs;
}

public void setNumberOfCurrentBuildJobs(int numberOfCurrentBuildJobs) {
this.numberOfCurrentBuildJobs = numberOfCurrentBuildJobs;
}

public List<LocalCIBuildJobQueueItem> getRunningBuildJobs() {
return runningBuildJobs;
}

public void setRunningBuildJobs(List<LocalCIBuildJobQueueItem> runningBuildJobs) {
this.runningBuildJobs = runningBuildJobs;
}

@Override
public String toString() {
return "LocalCIBuildAgentInformation{" + "name='" + name + '\'' + ", maxNumberOfConcurrentBuildJobs=" + maxNumberOfConcurrentBuildJobs + ", numberOfCurrentBuildJobs="
+ numberOfCurrentBuildJobs + ", runningBuildJobs=" + runningBuildJobs + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class LocalCIBuildJobQueueItem implements Serializable {

private String name;

private String buildAgentAddress;

private long participationId;

private String commitHash;
Expand Down Expand Up @@ -58,6 +60,14 @@ public void setName(String name) {
this.name = name;
}

public String getBuildAgentAddress() {
return buildAgentAddress;
}

public void setBuildAgentAddress(String buildAgentAddress) {
this.buildAgentAddress = buildAgentAddress;
}

public long getParticipationId() {
return participationId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import de.tum.in.www1.artemis.security.annotations.EnforceAdmin;
import de.tum.in.www1.artemis.service.connectors.localci.LocalCISharedBuildJobQueueService;
import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildAgentInformation;
import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildJobQueueItem;

@Profile("localci")
Expand Down Expand Up @@ -52,4 +53,17 @@ public ResponseEntity<List<LocalCIBuildJobQueueItem>> getRunningBuildJobs() {
List<LocalCIBuildJobQueueItem> runningBuildJobs = localCIBuildJobQueueService.getProcessingJobs();
return ResponseEntity.ok(runningBuildJobs);
}

/**
* Returns information on available build agents
*
* @return list of build agents information
*/
@GetMapping("/build-job-queue/build-agents")
@EnforceAdmin
public ResponseEntity<List<LocalCIBuildAgentInformation>> getBuildAgentInformation() {
log.debug("REST request to get information on available build agents");
List<LocalCIBuildAgentInformation> buildAgentInfo = localCIBuildJobQueueService.getBuildAgentInformation();
return ResponseEntity.ok(buildAgentInfo);
}
}
Loading