diff --git a/pyproject.toml b/pyproject.toml index 7a2df7ea8..b140ab067 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"] pydantic = ["pydantic>=2.0.0,<3"] openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"] google-adk = ["google-adk>=1.27.0,<2"] +workdir = ["fsspec>=2024.1.0"] [project.urls] Homepage = "https://github.com/temporalio/sdk-python" diff --git a/temporalio/contrib/workdir/README.md b/temporalio/contrib/workdir/README.md new file mode 100644 index 000000000..87dc48f8e --- /dev/null +++ b/temporalio/contrib/workdir/README.md @@ -0,0 +1,88 @@ +# Workspace Sync for Temporal Activities + +Sync a local directory with remote storage before and after a Temporal activity. Enables file-based activities to work across distributed workers where disk is not shared. + +## Problem + +Temporal activities that read/write files on local disk break when you scale to multiple worker instances — each worker has its own disk. This module solves that by syncing a remote storage location to a local temp directory before the activity runs, and pushing changes back after. 
+pip install 'temporalio[workdir]'
+
+# With a specific cloud backend:
+pip install 'temporalio[workdir]' gcsfs # Google Cloud Storage
+pip install 'temporalio[workdir]' s3fs # Amazon S3
+pip install 'temporalio[workdir]' adlfs # Azure Blob Storage
+ +## Storage Backends + +Any backend supported by [fsspec](https://filesystem-spec.readthedocs.io/): + +| Scheme | Backend | Extra package | +|--------|---------|--------------| +| `gs://` | Google Cloud Storage | `gcsfs` | +| `s3://` | Amazon S3 | `s3fs` | +| `az://` | Azure Blob Storage | `adlfs` | +| `file://` | Local filesystem | (none) | +| `memory://` | In-memory (testing) | (none) | + +Pass backend-specific options as keyword arguments: + +```python +Workspace("gs://bucket/key", project="my-gcp-project", token="cloud") +``` diff --git a/temporalio/contrib/workdir/__init__.py b/temporalio/contrib/workdir/__init__.py new file mode 100644 index 000000000..e7d308a8e --- /dev/null +++ b/temporalio/contrib/workdir/__init__.py @@ -0,0 +1,20 @@ +"""Remote workspace sync for Temporal activities. + +This package provides a :class:`Workspace` that syncs a local directory with +remote storage (GCS, S3, Azure, local, etc.) before and after a Temporal +activity executes. This enables file-based activities to work correctly across +distributed workers where disk state is not shared. + +The storage backend is auto-detected from the URL scheme via `fsspec`_. + +.. _fsspec: https://filesystem-spec.readthedocs.io/ +""" + +from temporalio.contrib.workdir._temporal import get_workspace_path, workspace +from temporalio.contrib.workdir._workspace import Workspace + +__all__ = [ + "Workspace", + "get_workspace_path", + "workspace", +] diff --git a/temporalio/contrib/workdir/_archive.py b/temporalio/contrib/workdir/_archive.py new file mode 100644 index 000000000..20903934b --- /dev/null +++ b/temporalio/contrib/workdir/_archive.py @@ -0,0 +1,43 @@ +"""Archive utilities for packing/unpacking workspace directories.""" + +import io +import tarfile +from pathlib import Path + + +def pack(directory: Path) -> bytes: + """Pack a directory into a gzipped tar archive. + + Args: + directory: Local directory to archive. Must exist. + + Returns: + The tar.gz archive as bytes. 
+ """ + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for entry in sorted(directory.rglob("*")): + if entry.is_file(): + arcname = str(entry.relative_to(directory)) + tar.add(str(entry), arcname=arcname) + return buf.getvalue() + + +def unpack(data: bytes, directory: Path) -> None: + """Unpack a gzipped tar archive into a directory. + + Args: + data: The tar.gz archive bytes. + directory: Target directory. Created if it doesn't exist. + """ + directory.mkdir(parents=True, exist_ok=True) + buf = io.BytesIO(data) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + # Security: prevent path traversal + for member in tar.getmembers(): + member_path = Path(directory / member.name).resolve() + if not str(member_path).startswith(str(directory.resolve())): + raise ValueError( + f"Archive member {member.name!r} would escape target directory" + ) + tar.extractall(path=str(directory), filter="data") diff --git a/temporalio/contrib/workdir/_temporal.py b/temporalio/contrib/workdir/_temporal.py new file mode 100644 index 000000000..b48f4b2da --- /dev/null +++ b/temporalio/contrib/workdir/_temporal.py @@ -0,0 +1,114 @@ +"""Temporal-specific integration for Workspace.""" + +from __future__ import annotations + +import contextvars +import functools +from collections.abc import Callable +from pathlib import Path +from typing import Any, TypeVar + +import temporalio.activity + +from temporalio.contrib.workdir._workspace import Workspace + +F = TypeVar("F", bound=Callable[..., Any]) + +_current_workspace_path: contextvars.ContextVar[Path | None] = contextvars.ContextVar( + "_current_workspace_path", default=None +) + + +def workspace( + remote_url_template: str, + key_fn: Callable[..., dict[str, str]] | None = None, + **workspace_kwargs: Any, +) -> Callable[[F], F]: + """Decorator that provides a :class:`Workspace` to a Temporal activity. + + The workspace path is available via :func:`get_workspace_path` inside + the activity body. 
The workspace is pulled before execution and pushed + after successful completion. + + Template variables in ``remote_url_template`` are resolved from + :func:`temporalio.activity.info` and, optionally, from ``key_fn``. + + Built-in template variables (from ``activity.info()``): + + - ``{workflow_id}`` + - ``{workflow_run_id}`` + - ``{activity_id}`` + - ``{activity_type}`` + - ``{task_queue}`` + + Example:: + + @workspace("gs://bucket/{workflow_id}/{activity_type}") + @activity.defn + async def process(input: ProcessInput) -> Output: + ws = get_workspace_path() + data = json.loads((ws / "config.json").read_text()) + ... + + # With key_fn for custom template vars: + @workspace( + "gs://bucket/{workflow_id}/{component}", + key_fn=lambda input: {"component": input.component_name}, + ) + @activity.defn + async def process(input: ProcessInput) -> Output: + ... + + Args: + remote_url_template: URL template with ``{var}`` placeholders. + key_fn: Optional function that receives the activity's positional + arguments and returns a dict of additional template variables. + **workspace_kwargs: Extra keyword arguments forwarded to + :class:`Workspace` (e.g., ``cleanup="keep"``). 
+ """ + + def decorator(fn: F) -> F: + @functools.wraps(fn) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + info = temporalio.activity.info() + template_vars: dict[str, str] = { + "workflow_id": info.workflow_id, + "workflow_run_id": info.workflow_run_id, + "activity_id": info.activity_id, + "activity_type": info.activity_type, + "task_queue": info.task_queue, + } + if key_fn is not None: + template_vars.update(key_fn(*args)) + + remote_url = remote_url_template.format(**template_vars) + + async with Workspace(remote_url, **workspace_kwargs) as ws: + token = _current_workspace_path.set(ws.path) + try: + return await fn(*args, **kwargs) + finally: + _current_workspace_path.reset(token) + + return wrapper # type: ignore[return-value] + + return decorator + + +def get_workspace_path() -> Path: + """Get the workspace path for the currently executing activity. + + Call this from inside an activity decorated with :func:`workspace`. + + Returns: + The local workspace :class:`~pathlib.Path`. + + Raises: + RuntimeError: If called outside a workspace-decorated activity. + """ + path = _current_workspace_path.get(None) + if path is None: + raise RuntimeError( + "get_workspace_path() called outside a workspace-decorated activity" + ) + return path diff --git a/temporalio/contrib/workdir/_workspace.py b/temporalio/contrib/workdir/_workspace.py new file mode 100644 index 000000000..16962467f --- /dev/null +++ b/temporalio/contrib/workdir/_workspace.py @@ -0,0 +1,130 @@ +"""Core Workspace class for syncing file trees with remote storage.""" + +from __future__ import annotations + +import shutil +import tempfile +from pathlib import Path +from typing import Literal +from urllib.parse import urlparse + +import fsspec + +from temporalio.contrib.workdir._archive import pack, unpack + + +class Workspace: + """Sync a local directory with a remote storage location. + + A Workspace maps a remote URL (the "key") to a local directory. 
+        remote_url: Remote storage URL. The scheme determines the fsspec
+            backend (``gs://`` for GCS, ``s3://`` for S3, ``file://`` for
+            local, etc.). A ``.tar.gz`` suffix is appended automatically
+            for the archive file.
+ """ + + def __init__( + self, + remote_url: str, + local_path: Path | None = None, + cleanup: Literal["auto", "keep"] = "auto", + **storage_options: object, + ) -> None: + self._remote_url = remote_url.rstrip("/") + self._archive_url = self._remote_url + ".tar.gz" + self._cleanup = cleanup + self._storage_options = storage_options + + parsed = urlparse(self._archive_url) + self._protocol = parsed.scheme or "file" + # fsspec expects path without scheme for most backends + self._remote_path = ( + parsed.netloc + parsed.path if parsed.netloc else parsed.path + ) + + self._fs = fsspec.filesystem(self._protocol, **storage_options) + + if local_path is not None: + self._local_path = local_path + self._owns_tempdir = False + else: + self._local_path = Path(tempfile.mkdtemp(prefix="temporal-workdir-")) + self._owns_tempdir = True + + @property + def path(self) -> Path: + """The local working directory. + + Read and write files here freely. Changes are pushed to remote storage + when the context manager exits cleanly. + """ + return self._local_path + + async def pull(self) -> None: + """Download and unpack the remote archive to the local directory. + + If no archive exists at the remote URL, the local directory is left + empty (first run). Existing local files are removed before unpacking. + """ + if not self._fs.exists(self._remote_path): + self._local_path.mkdir(parents=True, exist_ok=True) + return + + data = self._fs.cat_file(self._remote_path) + # Clear local dir before unpacking to avoid stale files + if self._local_path.exists(): + shutil.rmtree(self._local_path) + unpack(data, self._local_path) + + async def push(self) -> None: + """Pack the local directory and upload to remote storage. + + If the local directory is empty, the remote archive is deleted + (if it exists) to keep storage clean. 
+ """ + files = list(self._local_path.rglob("*")) + if not any(f.is_file() for f in files): + # Empty workspace — remove remote archive if it exists + if self._fs.exists(self._remote_path): + self._fs.rm(self._remote_path) + return + + data = pack(self._local_path) + self._fs.pipe_file(self._remote_path, data) + + async def __aenter__(self) -> Workspace: + """Pull remote state and return the workspace.""" + await self.pull() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: object, + ) -> None: + """Push local state on clean exit, then optionally clean up.""" + if exc_type is None: + await self.push() + if self._cleanup == "auto" and self._owns_tempdir: + shutil.rmtree(self._local_path, ignore_errors=True) diff --git a/tests/contrib/workdir/__init__.py b/tests/contrib/workdir/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/workdir/test_temporal.py b/tests/contrib/workdir/test_temporal.py new file mode 100644 index 000000000..fd3852c4c --- /dev/null +++ b/tests/contrib/workdir/test_temporal.py @@ -0,0 +1,96 @@ +"""Tests for the @workspace Temporal decorator.""" + +import json +import uuid +from dataclasses import dataclass +from datetime import timedelta + +import pytest + +from temporalio import activity, workflow +from temporalio.contrib.workdir import get_workspace_path, workspace +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + + +@dataclass +class ComponentInput: + """Input for test activities.""" + + component_name: str + data: str + + +@workspace( + "memory://temporal-test/{workflow_id}/{component}", + key_fn=lambda input: {"component": input.component_name}, +) +@activity.defn +async def write_component(input: ComponentInput) -> str: + """Activity that writes data to a workspace.""" + ws = get_workspace_path() + (ws / "component.json").write_text( + json.dumps({"name": input.component_name, "data": 
+    @pytest.fixture
+    async def env(self):
+        """Yield a local Temporal test environment; shut it down afterwards."""
+        env = await WorkflowEnvironment.start_local(); yield env; await env.shutdown()
pathlib import Path + +import pytest + +from temporalio.contrib.workdir import Workspace + + +@pytest.fixture +def memory_url() -> str: + """Return a unique memory:// URL for each test.""" + import uuid + + return f"memory://workdir-test/{uuid.uuid4()}" + + +class TestWorkspace: + """Tests for Workspace pull/push lifecycle.""" + + async def test_empty_remote_starts_empty_local(self, memory_url: str) -> None: + """First run with no remote archive creates an empty local dir.""" + async with Workspace(memory_url) as ws: + assert ws.path.exists() + assert list(ws.path.iterdir()) == [] + + async def test_roundtrip_single_file(self, memory_url: str) -> None: + """Write a file, push, then pull into a new workspace.""" + # Write + async with Workspace(memory_url) as ws: + (ws.path / "hello.txt").write_text("world") + + # Read back + async with Workspace(memory_url) as ws: + assert (ws.path / "hello.txt").read_text() == "world" + + async def test_roundtrip_nested_directories(self, memory_url: str) -> None: + """Nested directory structures survive the archive round-trip.""" + async with Workspace(memory_url) as ws: + (ws.path / "a" / "b").mkdir(parents=True) + (ws.path / "a" / "b" / "deep.json").write_text('{"nested": true}') + (ws.path / "top.txt").write_text("top") + + async with Workspace(memory_url) as ws: + assert json.loads((ws.path / "a" / "b" / "deep.json").read_text()) == { + "nested": True + } + assert (ws.path / "top.txt").read_text() == "top" + + async def test_overwrite_replaces_previous_state(self, memory_url: str) -> None: + """Second push replaces the first — no stale files from run 1.""" + # Run 1: write file_a + async with Workspace(memory_url) as ws: + (ws.path / "file_a.txt").write_text("a") + + # Run 2: write file_b only + async with Workspace(memory_url) as ws: + # file_a was pulled from run 1 + assert (ws.path / "file_a.txt").exists() + # Delete file_a, write file_b + (ws.path / "file_a.txt").unlink() + (ws.path / "file_b.txt").write_text("b") + + # 
Run 3: only file_b should exist + async with Workspace(memory_url) as ws: + assert not (ws.path / "file_a.txt").exists() + assert (ws.path / "file_b.txt").read_text() == "b" + + async def test_exception_skips_push(self, memory_url: str) -> None: + """If the activity raises, remote state is not updated.""" + # Write initial state + async with Workspace(memory_url) as ws: + (ws.path / "original.txt").write_text("safe") + + # Fail mid-activity — should not push + with pytest.raises(RuntimeError, match="boom"): + async with Workspace(memory_url) as ws: + (ws.path / "original.txt").write_text("corrupted") + (ws.path / "new_file.txt").write_text("should not persist") + raise RuntimeError("boom") + + # Original state preserved + async with Workspace(memory_url) as ws: + assert (ws.path / "original.txt").read_text() == "safe" + assert not (ws.path / "new_file.txt").exists() + + async def test_cleanup_auto_removes_tempdir(self, memory_url: str) -> None: + """Auto cleanup removes the temp directory after exit.""" + async with Workspace(memory_url, cleanup="auto") as ws: + tmpdir = ws.path + (ws.path / "file.txt").write_text("data") + + assert not tmpdir.exists() + + async def test_cleanup_keep_preserves_dir(self, memory_url: str) -> None: + """Keep cleanup leaves the local directory in place.""" + async with Workspace(memory_url, cleanup="keep") as ws: + tmpdir = ws.path + (ws.path / "file.txt").write_text("data") + + assert tmpdir.exists() + assert (tmpdir / "file.txt").read_text() == "data" + # Manual cleanup + import shutil + + shutil.rmtree(tmpdir) + + async def test_explicit_local_path( + self, memory_url: str, tmp_path: Path + ) -> None: + """User-specified local_path is used instead of a temp directory.""" + local = tmp_path / "my_workspace" + async with Workspace(memory_url, local_path=local) as ws: + assert ws.path == local + (ws.path / "data.txt").write_text("hello") + + # With explicit local_path + auto cleanup, dir should still exist + # (we only auto-clean 
tempdirs we created) + assert local.exists() + + async def test_empty_push_removes_remote_archive(self, memory_url: str) -> None: + """Pushing an empty directory removes the remote archive.""" + import fsspec + + fs = fsspec.filesystem("memory") + archive_path = memory_url.replace("memory://", "") + ".tar.gz" + + # Create initial state + async with Workspace(memory_url) as ws: + (ws.path / "data.txt").write_text("hello") + + assert fs.exists(archive_path) + + # Push empty + async with Workspace(memory_url) as ws: + (ws.path / "data.txt").unlink() + + assert not fs.exists(archive_path) + + async def test_binary_files(self, memory_url: str) -> None: + """Binary files survive the archive round-trip.""" + binary_data = bytes(range(256)) + + async with Workspace(memory_url) as ws: + (ws.path / "data.bin").write_bytes(binary_data) + + async with Workspace(memory_url) as ws: + assert (ws.path / "data.bin").read_bytes() == binary_data + + +class TestWorkspaceExplicitPullPush: + """Tests for using pull/push directly without context manager.""" + + async def test_manual_pull_push(self, memory_url: str) -> None: + """Pull and push can be called explicitly.""" + ws = Workspace(memory_url) + await ws.pull() + (ws.path / "manual.txt").write_text("works") + await ws.push() + + ws2 = Workspace(memory_url) + await ws2.pull() + assert (ws2.path / "manual.txt").read_text() == "works"