Workflow basics - Rust SDK
How to develop a Workflow
Workflows are the fundamental unit of a Temporal Application and it all starts with the development of a Workflow Definition.
In the Temporal Rust SDK programming model, a Workflow Definition is made of a Workflow struct and associated methods decorated with macros.
A Workflow is defined by:
- A struct that holds the Workflow state
- A
#[run]method that contains the main Workflow logic - An optional
#[init]method that initializes the Workflow - Optional
#[signal],#[query], and#[update]methods for external interaction
use temporalio_macros::{workflow, workflow_methods};
use temporalio_sdk::{WorkflowResult, workflow::WorkflowContextView};
#[workflow]
pub struct GreetingWorkflow {
name: String,
}
#[workflow_methods]
impl GreetingWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView, name: String) -> Self {
Self { name }
}
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
let name = ctx.state(|s| s.name.clone());
Ok(format!("Hello, {}!", name))
}
}
The #[workflow] macro marks the struct as a Workflow. The #[workflow_methods] macro is applied to the impl block containing the Workflow methods.
Workflow struct
The Workflow struct holds the state of your Workflow Execution. This state is persisted and recovered during replays. All fields in a Workflow struct should be serializable.
Workflow initialization
The #[init] method is optional and is called when the Workflow first starts. It receives the initial Workflow input parameters and initializes the Workflow struct:
#[init]
fn new(_ctx: &WorkflowContextView, name: String, age: u32) -> Self {
Self {
name,
age,
started_at: Instant::now(),
}
}
The #[init] method receives a WorkflowContextView, which provides read-only access to Workflow execution information.
Run method
The #[run] method is required and contains the main Workflow logic. It:
- Must be
async - Receives a mutable
WorkflowContext<Self> - Returns
WorkflowResult<T>where T is the Workflow return type - Executes exactly once per Workflow execution
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Execute activities, timers, child workflows, etc.
let result = ctx.activity(
MyActivities::process,
input,
ActivityOptions::default(),
).await?;
Ok(result)
}
Define Workflow parameters
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
A method annotated with #[init] can have any number of parameters. We recommend passing a single struct that contains all the input fields:
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
pub struct ProcessingInput {
pub data: Vec<String>,
pub timeout_seconds: u32,
}
#[workflow]
pub struct ProcessingWorkflow {
data: Vec<String>,
timeout_seconds: u32,
}
#[workflow_methods]
impl ProcessingWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView, input: ProcessingInput) -> Self {
Self {
data: input.data,
timeout_seconds: input.timeout_seconds,
}
}
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Use the initialized state
Ok("Processing complete".to_string())
}
}
All Workflow input should be serializable by serde.
Define Workflow return parameters
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
The return type of a Workflow is WorkflowResult<T> where T implements Serialize. Success is represented by Ok(value) and failure by Err(...):
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<ProcessingResult> {
// Can return a complex result type
let result = ProcessingResult {
status: "completed".to_string(),
records_processed: 100,
};
Ok(result)
}
Customize your Workflow Type
Workflows have a Type that is referred to as the Workflow name. By default, the Workflow type is the name of the Workflow struct. You can customize it by providing a name parameter to the #[workflow] macro:
#[workflow(name = "my-custom-workflow")]
pub struct GreetingWorkflow {
name: String,
}
The Workflow Type defaults to the struct name if not specified. For example, this Workflow would have the type GreetingWorkflow:
#[workflow]
pub struct GreetingWorkflow {
// ...
}
Workflow logic requirements
Workflow logic is constrained by deterministic execution requirements. For non-deterministic operations like API calls, LLM invocations, and database queries, use Activities.
Workflow code must be deterministic because the Temporal Server may replay your Workflow to reconstruct its state. This means:
Don't use nondeterministic functions
- No direct system time access - use
ctx.workflow_time()instead ofSystemTime::now() - No random number generation - use
ctx.random_seed()instead - No external I/O (network, filesystem, etc.) - perform these in Activities instead
- No UUID generation via random means - the SDK doesn't have a direct UUID function, but you can use Activities for non-deterministic operations
- Do not use
tokioorfuturesconcurrency primitives directly in Workflow code. Many of them, liketokio::select!,tokio::spawn,futures::select!, introduce non-deterministic behavior that will break Workflow replay.
Instead, use the deterministic wrappers provided in temporalio_sdk::workflows:
select!— deterministic select (polls in declaration order)join!— deterministic join for a fixed number of futuresjoin_all— deterministic join for a dynamic collection of futures
Use Workflow-safe primitives
The Rust SDK provides:
ctx.timer()- Wait for a durationctx.wait_condition(closure)- Wait until a condition is trueworkflows::select!- Deterministic select statementctx.start_activity()- Execute Activitiesctx.start_local_activity()- Execute local Activitiesctx.child_workflow()- Execute child Workflowsctx.cancelled()- Check if Workflow is cancelled
use std::time::Duration;
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Good - deterministic timer
ctx.timer(Duration::from_secs(10)).await;
// Good - deterministic wait for condition
ctx.wait_condition(|s| s.values.len() >= 3).await;
// Bad - nondeterministic sleep
// tokio::time::sleep(Duration::from_secs(10)).await;
// Bad - nondeterministic time
// SystemTime::now()
Ok("Done".to_string())
}
Access Workflow State
Use ctx.state() for read-only access and ctx.state_mut() for mutable access to your Workflow state:
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Read-only access
let name = ctx.state(|s| s.name.clone());
// Mutable access (for signal handlers or update handlers)
// Available in sync methods
Ok(name)
}
In synchronous Signal and Update handlers, you can mutate state directly via &mut self.
Workflow return types
The #[run] method must return WorkflowResult<T>. This is a type alias for Result<T, WorkflowExecution Error>.
For errors, use a WorkflowExecutionError:
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
if some_validation_fails {
return Err(WorkflowExecutionError::new("validation_failed", "Input is invalid"));
}
Ok("Success".to_string())
}
Workflow errors will cause the Workflow Execution to fail and the error details will be available to clients.