From 804393969cfdf9a948171ff7992dcb75f2321ed4 Mon Sep 17 00:00:00 2001 From: Michiel Westerbeek Date: Fri, 16 May 2025 14:25:16 +0200 Subject: [PATCH 1/5] Added test with a signal channel and continue-as-new. This results in a stuck workflow. --- .../workflow_tests/continue_as_new.rs | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index 262612c50..0382994c2 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -1,6 +1,8 @@ use std::time::Duration; -use temporal_client::WorkflowOptions; +use temporal_client::{SignalWithStartOptions, WorkflowOptions}; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; +use temporal_sdk_core::WorkflowClientTrait; +use temporal_sdk_core_protos::coresdk::IntoPayloadsExt; use temporal_sdk_core_protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution; use temporal_sdk_core_test_utils::CoreWfStarter; @@ -63,3 +65,52 @@ async fn continue_as_new_multiple_concurrent() { } worker.run_until_done().await.unwrap(); } + +const SIGNAME: &str = "signame"; + +async fn continue_as_new_wf_with_sigchan(ctx: WfContext) -> WorkflowResult<()> { + let continued_from_execution_run_id = + &ctx.workflow_initial_info().continued_from_execution_run_id; + + if continued_from_execution_run_id != "" { + Ok(WfExitValue::Normal(())) + } else { + let _sigchan = ctx.make_signal_channel(SIGNAME); + // Even if we drain the channel, the above line makes the workflow stuck. + // sigchan.drain_all(); + + return Ok(WfExitValue::continue_as_new( + ContinueAsNewWorkflowExecution { + arguments: vec![[2].into()].into(), + ..Default::default() + }, + )); + } +} + +#[tokio::test] +async fn continue_as_new_with_sigchan() { + let wf_name = "continue_as_new_with_sigchan"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + worker.register_wf(wf_name.to_string(), continue_as_new_wf_with_sigchan); + + let client = starter.get_client().await; + let options = SignalWithStartOptions::builder() + .task_queue(worker.inner_mut().task_queue()) + .workflow_id(wf_name) + .workflow_type(wf_name) + .input(vec![[1].into()].into_payloads().unwrap()) + .signal_name(SIGNAME) + .signal_input(vec![b"tada".into()].into_payloads()) + .build() + .unwrap(); + let res = client + .signal_with_start_workflow_execution(options, WorkflowOptions::default()) + .await + .expect("request succeeds.qed"); + + worker.expect_workflow_completion(wf_name, Some(res.run_id)); + worker.run_until_done().await.unwrap(); +} From fbd64371fe2239883184186aae882dde67789cca Mon Sep 17 00:00:00 2001 From: Michiel Westerbeek Date: Fri, 16 May 2025 14:57:21 +0200 Subject: [PATCH 2/5] show that .next() properly drains --- .../integ_tests/workflow_tests/continue_as_new.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index 0382994c2..bafb07fd0 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -1,3 +1,4 @@ +use futures_util::StreamExt; use std::time::Duration; use temporal_client::{SignalWithStartOptions, WorkflowOptions}; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; @@ -5,6 +6,7 @@ use temporal_sdk_core::WorkflowClientTrait; use temporal_sdk_core_protos::coresdk::IntoPayloadsExt; use temporal_sdk_core_protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution; use temporal_sdk_core_test_utils::CoreWfStarter; +use tracing::info; async fn continue_as_new_wf(ctx: WfContext) -> WorkflowResult<()> { let run_ct = ctx.get_args()[0].data[0]; @@ -75,9 +77,15 @@ async fn continue_as_new_wf_with_sigchan(ctx: WfContext) -> WorkflowResult<()> { if continued_from_execution_run_id != "" { Ok(WfExitValue::Normal(())) } else { - let _sigchan = ctx.make_signal_channel(SIGNAME); - // Even if we drain the channel, the above line makes the workflow stuck. - // sigchan.drain_all(); + let mut sigchan = ctx.make_signal_channel(SIGNAME); + + // If we drain the channel with drain_all, we get no signals and the workflow stays stuck. + let signals = sigchan.drain_all(); + info!("Signals received: {:?}", signals); + + // If we drain it with .next(), we get a signal and the workflow gets unstuck. + // let signal = sigchan.next().await.unwrap(); + // info!("Signal received: {:?}", signal); return Ok(WfExitValue::continue_as_new( ContinueAsNewWorkflowExecution { From 851ddd9cc16e3902652120fbd8b89276cedb7645 Mon Sep 17 00:00:00 2001 From: Michiel Westerbeek Date: Fri, 16 May 2025 15:28:28 +0200 Subject: [PATCH 3/5] moved the test into signals tests, it's not related to continue_as_new --- .../workflow_tests/continue_as_new.rs | 55 ------------------- tests/integ_tests/workflow_tests/signals.rs | 44 ++++++++++++++- 2 files changed, 43 insertions(+), 56 deletions(-) diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index bafb07fd0..f68e88ec5 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -67,58 +67,3 @@ async fn continue_as_new_multiple_concurrent() { } worker.run_until_done().await.unwrap(); } - -const SIGNAME: &str = "signame"; - -async fn continue_as_new_wf_with_sigchan(ctx: WfContext) -> WorkflowResult<()> { - let continued_from_execution_run_id = - &ctx.workflow_initial_info().continued_from_execution_run_id; - - if continued_from_execution_run_id != "" { - Ok(WfExitValue::Normal(())) - } else { - let mut sigchan = ctx.make_signal_channel(SIGNAME); - - // If we drain the channel with drain_all, we get no signals and the workflow stays stuck. - let signals = sigchan.drain_all(); - info!("Signals received: {:?}", signals); - - // If we drain it with .next(), we get a signal and the workflow gets unstuck. - // let signal = sigchan.next().await.unwrap(); - // info!("Signal received: {:?}", signal); - - return Ok(WfExitValue::continue_as_new( - ContinueAsNewWorkflowExecution { - arguments: vec![[2].into()].into(), - ..Default::default() - }, - )); - } -} - -#[tokio::test] -async fn continue_as_new_with_sigchan() { - let wf_name = "continue_as_new_with_sigchan"; - let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); - let mut worker = starter.worker().await; - worker.register_wf(wf_name.to_string(), continue_as_new_wf_with_sigchan); - - let client = starter.get_client().await; - let options = SignalWithStartOptions::builder() - .task_queue(worker.inner_mut().task_queue()) - .workflow_id(wf_name) - .workflow_type(wf_name) - .input(vec![[1].into()].into_payloads().unwrap()) - .signal_name(SIGNAME) - .signal_input(vec![b"tada".into()].into_payloads()) - .build() - .unwrap(); - let res = client - .signal_with_start_workflow_execution(options, WorkflowOptions::default()) - .await - .expect("request succeeds.qed"); - - worker.expect_workflow_completion(wf_name, Some(res.run_id)); - worker.run_until_done().await.unwrap(); -} diff --git a/tests/integ_tests/workflow_tests/signals.rs b/tests/integ_tests/workflow_tests/signals.rs index 541be1ecf..09addc684 100644 --- a/tests/integ_tests/workflow_tests/signals.rs +++ b/tests/integ_tests/workflow_tests/signals.rs @@ -3,10 +3,11 @@ use std::collections::HashMap; use futures_util::StreamExt; use temporal_client::{SignalWithStartOptions, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ - ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WorkflowResult, + ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WfExitValue, WorkflowResult, }; use temporal_sdk_core_protos::{coresdk::IntoPayloadsExt, temporal::api::common::v1::Payload}; use temporal_sdk_core_test_utils::CoreWfStarter; +use tracing::info; use uuid::Uuid; const SIGNAME: &str = "signame"; @@ -162,3 +163,44 @@ async fn sends_signal_to_child() { .unwrap(); worker.run_until_done().await.unwrap(); } + +async fn drain_sigchan_wf(ctx: WfContext) -> WorkflowResult<()> { + let mut sigchan = ctx.make_signal_channel(SIGNAME); + + // If we drain the channel with drain_all, we get no signals and the workflow stays stuck. + let signals = sigchan.drain_all(); + info!("Signals received: {:?}", signals); + + // If we drain it with .next(), we get a signal and the workflow gets unstuck. + // let signal = sigchan.next().await.unwrap(); + // info!("Signal received: {:?}", signal); + + Ok(WfExitValue::Normal(())) +} + +#[tokio::test] +async fn signal_with_start_drain() { + let wf_name = "drain_sigchan"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + worker.register_wf(wf_name.to_string(), drain_sigchan_wf); + + let client = starter.get_client().await; + let options = SignalWithStartOptions::builder() + .task_queue(worker.inner_mut().task_queue()) + .workflow_id(wf_name) + .workflow_type(wf_name) + .input(vec![[1].into()].into_payloads().unwrap()) + .signal_name(SIGNAME) + .signal_input(vec![b"tada".into()].into_payloads()) + .build() + .unwrap(); + let res = client + .signal_with_start_workflow_execution(options, WorkflowOptions::default()) + .await + .expect("request succeeds.qed"); + + worker.expect_workflow_completion(wf_name, Some(res.run_id)); + worker.run_until_done().await.unwrap(); +} From 18357fa73a8d217e5854aaebab951e53cbfa76cf Mon Sep 17 00:00:00 2001 From: Michiel Westerbeek Date: Fri, 16 May 2025 16:16:03 +0200 Subject: [PATCH 4/5] added a 0s timer to the sigchan drain workflow, which fixes the problem --- .../workflow_tests/continue_as_new.rs | 4 +--- tests/integ_tests/workflow_tests/signals.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index f68e88ec5..b195812f3 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -1,12 +1,10 @@ -use futures_util::StreamExt; use std::time::Duration; -use temporal_client::{SignalWithStartOptions, WorkflowOptions}; +use temporal_client::WorkflowOptions; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; use temporal_sdk_core::WorkflowClientTrait; use temporal_sdk_core_protos::coresdk::IntoPayloadsExt; use temporal_sdk_core_protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution; use temporal_sdk_core_test_utils::CoreWfStarter; -use tracing::info; async fn continue_as_new_wf(ctx: WfContext) -> WorkflowResult<()> { let run_ct = ctx.get_args()[0].data[0]; diff --git a/tests/integ_tests/workflow_tests/signals.rs b/tests/integ_tests/workflow_tests/signals.rs index 09addc684..2421fc448 100644 --- a/tests/integ_tests/workflow_tests/signals.rs +++ b/tests/integ_tests/workflow_tests/signals.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; - use futures_util::StreamExt; +use std::collections::HashMap; +use std::time::Duration; use temporal_client::{SignalWithStartOptions, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WfExitValue, WorkflowResult, @@ -165,16 +165,19 @@ async fn sends_signal_to_child() { } async fn drain_sigchan_wf(ctx: WfContext) -> WorkflowResult<()> { - let mut sigchan = ctx.make_signal_channel(SIGNAME); + let sigchan = ctx.make_signal_channel(SIGNAME); - // If we drain the channel with drain_all, we get no signals and the workflow stays stuck. + // If we drain the channel with drain_all here, we get no signals and the workflow stays stuck. + // let signals = sigchan.drain_all(); + // info!("Signals received: {:?}", signals); + + // If we add this timer, the workflow gets unstuck (even without drain_all). + ctx.timer(Duration::from_secs(0)).await; + + // If we drain the channel with drain_all here, *after* the timer, we get signals and the workflow is not stuck. let signals = sigchan.drain_all(); info!("Signals received: {:?}", signals); - // If we drain it with .next(), we get a signal and the workflow gets unstuck. - // let signal = sigchan.next().await.unwrap(); - // info!("Signal received: {:?}", signal); - Ok(WfExitValue::Normal(())) } From 207ada492b009a7141a2e2d7809d8f485a03599e Mon Sep 17 00:00:00 2001 From: Michiel Westerbeek Date: Fri, 16 May 2025 16:16:51 +0200 Subject: [PATCH 5/5] removed unused imports --- tests/integ_tests/workflow_tests/continue_as_new.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index b195812f3..262612c50 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -1,8 +1,6 @@ use std::time::Duration; use temporal_client::WorkflowOptions; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; -use temporal_sdk_core::WorkflowClientTrait; -use temporal_sdk_core_protos::coresdk::IntoPayloadsExt; use temporal_sdk_core_protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution; use temporal_sdk_core_test_utils::CoreWfStarter;