Skip to content

Commit

Permalink
testing with waitUntil
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
  • Loading branch information
amitgalitz committed Jan 12, 2024
1 parent 11087de commit 9e3abdc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE;
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL;
Expand Down Expand Up @@ -490,6 +491,25 @@ protected void getAndAssertWorkflowStatus(String workflowId, State stateStatus,

}

public void waitForProvisioningStatus(String workflowId, ProvisioningProgress provisioningProgress) throws InterruptedException {
AtomicBoolean provisioningStatusDone = new AtomicBoolean(false);
waitUntil(() -> {
try {
Response response = getWorkflowStatus(workflowId, true);
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String state = (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD);
if (provisioningProgress.name().equals(state)) {
provisioningStatusDone.set(true);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return provisioningStatusDone.get();
}, 90, TimeUnit.SECONDS);
assertTrue(provisioningStatusDone.get());
}

protected void getAndAssertWorkflowStep() throws Exception {
Response response = getWorkflowStep();
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
Expand All @@ -505,11 +525,12 @@ protected void getAndAssertWorkflowStep() throws Exception {
protected List<ResourceCreated> getResourcesCreated(String workflowId, int timeout) throws Exception {

// wait and ensure state is completed/done
assertBusy(
() -> { getAndAssertWorkflowStatus(workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
timeout,
TimeUnit.SECONDS
);
// assertBusy(
// () -> { getAndAssertWorkflowStatus(workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
// timeout,
// TimeUnit.SECONDS
// );
waitForProvisioningStatus(workflowId, ProvisioningProgress.DONE);

Response response = getWorkflowStatus(workflowId, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
Expand Down Expand Up @@ -188,11 +187,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
response = provisionWorkflow(workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
Thread.sleep(1000);
assertBusy(
() -> { getAndAssertWorkflowStatus(workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); },
30,
TimeUnit.SECONDS
);

// waitForProvisioningStatus(workflowId, ProvisioningProgress.DONE);

// assertBusy(
// () -> { getAndAssertWorkflowStatus(workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); },
// 30,
// TimeUnit.SECONDS
// );

// Wait until provisioning has completed successfully before attempting to retrieve created resources
List<ResourceCreated> resourcesCreated = getResourcesCreated(workflowId, 90);
Expand All @@ -207,7 +209,7 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(2).resourceId());

// // Deprovision the workflow to avoid opening circut breaker when running additional tests
// Response deprovisionResponse = deprovisionWorkflow(workflowId);
Response deprovisionResponse = deprovisionWorkflow(workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
Expand All @@ -222,8 +224,8 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);
// wait and ensure state is completed/done
assertBusy(() -> { getAndAssertWorkflowStatus(workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, 60, TimeUnit.SECONDS);

// assertBusy(() -> { getAndAssertWorkflowStatus(workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, 60, TimeUnit.SECONDS);
waitForProvisioningStatus(workflowId, ProvisioningProgress.DONE);
// Hit Search State API with the workflow id created above
String query = "{\"query\":{\"ids\":{\"values\":[\"" + workflowId + "\"]}}}";
SearchResponse searchResponse = searchWorkflowState(query);
Expand All @@ -246,12 +248,12 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {

// Hit Deprovision API
Response deprovisionResponse = deprovisionWorkflow(workflowId);
assertBusy(
() -> { getAndAssertWorkflowStatus(workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); },
60,
TimeUnit.SECONDS
);

// assertBusy(
// () -> { getAndAssertWorkflowStatus(workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); },
// 60,
// TimeUnit.SECONDS
// );
waitForProvisioningStatus(workflowId, ProvisioningProgress.NOT_STARTED);
// Hit Delete API
Response deleteResponse = deleteWorkflow(workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));
Expand Down

0 comments on commit 9e3abdc

Please sign in to comment.