Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ For better observability, you can name individual branches (matching the JS SDK
```csharp
// Named branches for easier debugging and testing
var results = await context.ParallelAsync(
new NamedBranch<object>[]
new DurableBranch<object>[]
{
new("fetch_user", async (ctx) => await ctx.StepAsync(async (step) => await FetchUserData(userId))),
new("fetch_orders", async (ctx) => await ctx.StepAsync(async (step) => await FetchOrderHistory(userId))),
Expand Down Expand Up @@ -1416,6 +1416,13 @@ public class CompletionConfig
{
public int? MinSuccessful { get; set; }
public int? ToleratedFailureCount { get; set; }
/// <summary>
/// Maximum tolerated failure ratio, expressed as a value in the range
/// <c>0.0</c> to <c>1.0</c> (inclusive). For example, <c>0.25</c> means
/// "tolerate up to 25% failures; fail when the failure ratio strictly
/// exceeds 25%". <c>null</c> = no ratio-based threshold. Validated by the
/// setter; out-of-range values throw <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
public double? ToleratedFailurePercentage { get; set; }

public static CompletionConfig AllSuccessful() => new() { ToleratedFailureCount = 0 };
Expand Down
30 changes: 30 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/BatchItemStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Status of an individual item in a <see cref="IBatchResult{T}"/>.
/// </summary>
/// <remarks>
/// Mirrors the wire-state of the per-branch checkpoint at the moment the batch
/// resolved. Items that finished produce <see cref="Succeeded"/> or
/// <see cref="Failed"/>; items still in flight when the batch's
/// <see cref="CompletionConfig"/> short-circuits remain in <see cref="Started"/>.
/// </remarks>
public enum BatchItemStatus
{
/// <summary>
/// The branch ran to completion and produced a result.
/// </summary>
Succeeded,

/// <summary>
/// The branch ran to completion and threw.
/// </summary>
Failed,

/// <summary>
/// The branch was still in flight when the batch's <see cref="CompletionConfig"/>
/// resolved (e.g., <see cref="CompletionConfig.FirstSuccessful"/> returned
/// before this branch finished).
/// </summary>
Started
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Defines completion criteria for parallel/map operations.
/// </summary>
/// <remarks>
/// Construct via the static factories (<see cref="AllSuccessful"/>,
/// <see cref="AllCompleted"/>, <see cref="FirstSuccessful"/>) or set the
/// individual properties directly. Multiple criteria combine: the operation
/// resolves as soon as any criterion is met (success short-circuit) or violated
/// (failure short-circuit).
/// </remarks>
public sealed class CompletionConfig
{
private double? _toleratedFailurePercentage;

/// <summary>
/// Minimum number of <see cref="BatchItemStatus.Succeeded"/> items required
/// before the operation resolves successfully. <c>null</c> = no minimum.
/// </summary>
public int? MinSuccessful { get; set; }

/// <summary>
/// Maximum tolerated <see cref="BatchItemStatus.Failed"/> count. When the
/// failure count <i>strictly exceeds</i> this value, the operation resolves
/// with <see cref="CompletionReason.FailureToleranceExceeded"/>.
/// <c>null</c> = no count-based failure threshold.
/// </summary>
public int? ToleratedFailureCount { get; set; }

/// <summary>
/// Maximum tolerated failure ratio, expressed as a value in the range
/// <c>0.0</c> to <c>1.0</c> (inclusive). For example, <c>0.25</c> means
/// "tolerate up to 25% failures; fail when the failure ratio strictly
/// exceeds 25%". <c>null</c> = no ratio-based failure threshold.
/// </summary>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Thrown by the setter if the value is outside <c>[0.0, 1.0]</c>.
/// </exception>
public double? ToleratedFailurePercentage
{
get => _toleratedFailurePercentage;
set
{
if (value is { } v && (v < 0.0 || v > 1.0))
{
throw new ArgumentOutOfRangeException(nameof(value), v,
"ToleratedFailurePercentage must be a ratio in [0.0, 1.0].");
}
_toleratedFailurePercentage = value;
}
}

/// <summary>
/// All items must succeed. Equivalent to
/// <see cref="ToleratedFailureCount"/> = 0. The default for
/// <see cref="ParallelConfig.CompletionConfig"/>.
/// </summary>
public static CompletionConfig AllSuccessful() => new() { ToleratedFailureCount = 0 };

/// <summary>
/// Run every branch regardless of failures; surface failures per-item via
/// <see cref="IBatchResult{T}.Failed"/>. Resolution does not auto-throw —
/// the caller can inspect the result and call
/// <see cref="IBatchResult{T}.ThrowIfError"/> if they want strict-success
/// behavior.
/// </summary>
public static CompletionConfig AllCompleted() => new();

/// <summary>
/// Resolve as soon as one branch succeeds. Remaining in-flight branches are
/// reported as <see cref="BatchItemStatus.Started"/>.
/// </summary>
public static CompletionConfig FirstSuccessful() => new() { MinSuccessful = 1 };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Why a batch operation (<see cref="IDurableContext.ParallelAsync{T}(IReadOnlyList{System.Func{IDurableContext, System.Threading.Tasks.Task{T}}}, string?, ParallelConfig?, System.Threading.CancellationToken)"/>
/// or future Map) resolved.
/// </summary>
public enum CompletionReason
{
/// <summary>
/// Every branch finished — no <see cref="CompletionConfig"/> short-circuit
/// was triggered. Branches may be a mix of <see cref="BatchItemStatus.Succeeded"/>
/// and <see cref="BatchItemStatus.Failed"/>.
/// </summary>
AllCompleted,

/// <summary>
/// <see cref="CompletionConfig.MinSuccessful"/> branches succeeded; remaining
/// branches were left in <see cref="BatchItemStatus.Started"/>.
/// </summary>
MinSuccessfulReached,

/// <summary>
/// <see cref="CompletionConfig.ToleratedFailureCount"/> or
/// <see cref="CompletionConfig.ToleratedFailurePercentage"/> was exceeded.
/// The batch is considered failed and surfaces a
/// <see cref="ParallelException"/> when awaited.
/// </summary>
FailureToleranceExceeded
}
13 changes: 13 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableBranch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// A named branch for
/// <see cref="IDurableContext.ParallelAsync{T}(IReadOnlyList{DurableBranch{T}}, string?, ParallelConfig?, System.Threading.CancellationToken)"/>.
/// Names appear in execution traces and on the wire <c>OperationUpdate.Name</c>
/// field, and surface on <see cref="IBatchItem{T}.Name"/>.
/// </summary>
/// <typeparam name="T">The branch's result type.</typeparam>
/// <param name="Name">Human-readable branch name. Required.</param>
/// <param name="Func">The user function executed inside the branch's
/// child context.</param>
public sealed record DurableBranch<T>(string Name, Func<IDurableContext, Task<T>> Func);
94 changes: 85 additions & 9 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,95 @@ private Task<T> RunChildContext<T>(

var operationId = _idGenerator.NextId();

// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);

var op = new ChildContextOperation<T>(
operationId, name, func, config, serializer, ChildFactory,
operationId, name, func, config, serializer, MakeChildFactory(),
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task<IBatchResult<T>> ParallelAsync<T>(
IReadOnlyList<Func<IDurableContext, Task<T>>> branches,
string? name = null,
ParallelConfig? config = null,
CancellationToken cancellationToken = default)
=> RunParallel(WrapToDurableBranches(branches), name, config, cancellationToken);

public Task<IBatchResult<T>> ParallelAsync<T>(
IReadOnlyList<DurableBranch<T>> branches,
string? name = null,
ParallelConfig? config = null,
CancellationToken cancellationToken = default)
=> RunParallel(branches, name, config, cancellationToken);

private static IReadOnlyList<DurableBranch<T>> WrapToDurableBranches<T>(
IReadOnlyList<Func<IDurableContext, Task<T>>> branches)
{
if (branches == null) throw new ArgumentNullException(nameof(branches));

var result = new DurableBranch<T>[branches.Count];
for (var i = 0; i < branches.Count; i++)
{
var func = branches[i];
if (func == null)
throw new ArgumentException($"Branch at index {i} is null.", nameof(branches));
// Default name is the index — surfaces in execution traces and on
// IBatchItem<T>.Name. Users wanting custom names use the
// DurableBranch<T> overload.
result[i] = new DurableBranch<T>(i.ToString(System.Globalization.CultureInfo.InvariantCulture), func);
}
return result;
}

private Task<IBatchResult<T>> RunParallel<T>(
IReadOnlyList<DurableBranch<T>> branches,
string? name,
ParallelConfig? config,
CancellationToken cancellationToken)
{
if (branches == null) throw new ArgumentNullException(nameof(branches));
for (var i = 0; i < branches.Count; i++)
{
if (branches[i] == null)
throw new ArgumentException($"Branch at index {i} is null.", nameof(branches));
if (branches[i].Func == null)
throw new ArgumentException($"Branch at index {i} has a null Func.", nameof(branches));
}

var effectiveConfig = config ?? new ParallelConfig();
if (effectiveConfig.NestingType == NestingType.Flat)
{
throw new NotSupportedException(
"NestingType.Flat is not yet supported in the .NET Durable Execution SDK. " +
"Use NestingType.Nested (the default) for now.");
}

var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
"(or in tests, set TestLambdaContext.Serializer).");

var operationId = _idGenerator.NextId();
var op = new Internal.ParallelOperation<T>(
operationId, name, branches, effectiveConfig, serializer, MakeChildFactory(),
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

/// <summary>
/// Builds the factory used by <see cref="ChildContextOperation{T}"/> (and
/// each <see cref="Internal.ParallelOperation{T}"/> branch) to construct
/// the inner <see cref="IDurableContext"/>. The child shares state,
/// termination, batcher, ARN, and Lambda context — but uses a child
/// <see cref="OperationIdGenerator"/> so its operation IDs are
/// deterministically namespaced under the parent op ID.
/// </summary>
private Func<string, IDurableContext> MakeChildFactory()
{
return parentOpId => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);
}
}

internal sealed class DurableExecutionContext : IExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,36 @@ public ChildContextException(string message) : base(message) { }
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a parallel operation resolves with
/// <see cref="CompletionReason.FailureToleranceExceeded"/>. The aggregate
/// <see cref="IBatchResult"/> is preserved on <see cref="Result"/> so callers
/// can inspect per-branch outcomes.
/// </summary>
/// <remarks>
/// This is the base type for parallel failures. Subclasses may be added in
/// future releases (for example, a dedicated
/// <c>ParallelFailureToleranceExceededException</c>); catching
/// <see cref="ParallelException"/> remains forward-compatible.
/// </remarks>
public class ParallelException : DurableExecutionException
{
/// <summary>
/// The aggregate result of the parallel operation. Type-erased — cast to
/// <c>IBatchResult&lt;T&gt;</c> if the per-branch result type is known.
/// </summary>
public IBatchResult? Result { get; init; }

/// <summary>
/// Why the parallel operation resolved.
/// </summary>
public CompletionReason CompletionReason { get; init; }

/// <summary>Creates an empty <see cref="ParallelException"/>.</summary>
public ParallelException() { }
/// <summary>Creates a <see cref="ParallelException"/> with the given message.</summary>
public ParallelException(string message) : base(message) { }
/// <summary>Creates a <see cref="ParallelException"/> wrapping an inner exception.</summary>
public ParallelException(string message, Exception innerException) : base(message, innerException) { }
}
38 changes: 38 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IBatchItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// One item inside an <see cref="IBatchResult{T}"/> — the outcome of a single
/// branch (parallel) or item (map).
/// </summary>
/// <typeparam name="T">The branch/item result type.</typeparam>
public interface IBatchItem<T>
{
/// <summary>
/// Zero-based position in the original branches/items list. Stable across
/// replays.
/// </summary>
int Index { get; }

/// <summary>
/// Optional human-readable name for this branch/item.
/// Surfaces on the wire <c>OperationUpdate.Name</c> field for observability.
/// </summary>
string? Name { get; }

/// <summary>
/// Status of this item at the moment the batch resolved.
/// </summary>
BatchItemStatus Status { get; }

/// <summary>
/// The branch/item result. Populated only when <see cref="Status"/> is
/// <see cref="BatchItemStatus.Succeeded"/>.
/// </summary>
T? Result { get; }

/// <summary>
/// The branch/item failure. Populated only when <see cref="Status"/> is
/// <see cref="BatchItemStatus.Failed"/>.
/// </summary>
DurableExecutionException? Error { get; }
}
Loading
Loading