diff --git a/.gitignore b/.gitignore
index f25225c3..9889b0dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ __pycache__/
 .env
 eo_api.egg-info/
 data/downloads
+data/derived
 data/artifacts
 data/pygeoapi
 docs/internal/
diff --git a/data/datasets/chirps3.yaml b/data/datasets/chirps3.yaml
index 6f6a2064..e97a0a4a 100644
--- a/data/datasets/chirps3.yaml
+++ b/data/datasets/chirps3.yaml
@@ -13,3 +13,32 @@
   resolution: 5 km x 5 km
   source: CHIRPS v3
   source_url: https://www.chc.ucsb.edu/data/chirps3
+
+- id: chirps3_precipitation_weekly
+  name: Total precipitation weekly (CHIRPS3)
+  short_name: Total precipitation weekly
+  variable: precip
+  period_type: weekly
+  sync_kind: derived
+  resample:
+    source_dataset_id: chirps3_precipitation_daily
+    method: sum
+    week_start: monday
+  units: mm
+  resolution: 5 km x 5 km
+  source: CHIRPS v3
+  source_url: https://www.chc.ucsb.edu/data/chirps3
+
+- id: chirps3_precipitation_monthly
+  name: Total precipitation monthly (CHIRPS3)
+  short_name: Total precipitation monthly
+  variable: precip
+  period_type: monthly
+  sync_kind: derived
+  resample:
+    source_dataset_id: chirps3_precipitation_daily
+    method: sum
+  units: mm
+  resolution: 5 km x 5 km
+  source: CHIRPS v3
+  source_url: https://www.chc.ucsb.edu/data/chirps3
diff --git a/data/datasets/era5_land.yaml b/data/datasets/era5_land.yaml
index 7c03f811..415d2408 100644
--- a/data/datasets/era5_land.yaml
+++ b/data/datasets/era5_land.yaml
@@ -18,6 +18,20 @@
   source: ERA5-Land Reanalysis
   source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land
 
+- id: era5land_temperature_daily
+  name: Daily mean 2m temperature (ERA5-Land)
+  short_name: Daily mean 2m temperature
+  variable: t2m
+  period_type: daily
+  sync_kind: derived
+  resample:
+    source_dataset_id: era5land_temperature_hourly
+    method: mean
+  units: degC
+  resolution: 9 km x 9 km
+  source: ERA5-Land Reanalysis
+  source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land
+
 - id: era5land_precipitation_hourly
   name: Total precipitation (ERA5-Land)
   short_name: Total precipitation
@@ -38,3 +52,17 @@
   resolution: 9 km x 9 km
   source: ERA5-Land Reanalysis
   source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land
+
+- id: era5land_precipitation_daily
+  name: Daily total precipitation (ERA5-Land)
+  short_name: Daily total precipitation
+  variable: tp
+  period_type: daily
+  sync_kind: derived
+  resample:
+    source_dataset_id: era5land_precipitation_hourly
+    method: sum
+  units: mm
+  resolution: 9 km x 9 km
+  source: ERA5-Land Reanalysis
+  source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land
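[Review note] The derived templates above only declare what to aggregate; the aggregation itself lives in src/climate_api/processing/resample.py further down. A minimal sketch of the intended daily-to-weekly semantics, using synthetic data rather than the real CHIRPS store:

    import numpy as np
    import xarray as xr

    time = np.array("2026-01-05", dtype="datetime64[D]") + np.arange(14)  # two Monday-aligned weeks
    daily = xr.DataArray(np.ones(14), dims="time", coords={"time": time}, name="precip")
    # week_start: monday corresponds to the anchored frequency "W-MON";
    # label="left"/closed="left" stamp each bin with its Monday start.
    weekly = daily.resample(time="W-MON", label="left", closed="left").sum()
    print(weekly.values)  # [7. 7.]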
diff --git a/src/climate_api/data_accessor/services/accessor.py b/src/climate_api/data_accessor/services/accessor.py
index 43a82f3b..67aa8932 100644
--- a/src/climate_api/data_accessor/services/accessor.py
+++ b/src/climate_api/data_accessor/services/accessor.py
@@ -5,6 +5,7 @@ import tempfile
 from typing import Any
 
+import numpy as np
 import xarray as xr
 
 from ...data_manager.services.downloader import get_cache_files, get_zarr_path
@@ -135,8 +136,8 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
     time_dim = get_time_dim(ds)
     lon_dim, lat_dim = get_lon_lat_dims(ds)
 
-    start = numpy_datetime_to_period_string(ds[time_dim].min(), period_type)  # type: ignore[arg-type]
-    end = numpy_datetime_to_period_string(ds[time_dim].max(), period_type)  # type: ignore[arg-type]
+    start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type))  # type: ignore[arg-type]
+    end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type))  # type: ignore[arg-type]
     xmin, xmax = ds[lon_dim].min().item(), ds[lon_dim].max().item()
     ymin, ymax = ds[lat_dim].min().item(), ds[lat_dim].max().item()
@@ -150,6 +151,13 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
     }
 
 
+def _period_string_scalar(value: Any) -> str:
+    """Normalize a numpy scalar or 0-d array period string to plain Python str."""
+    if isinstance(value, np.ndarray):
+        return str(value.item())
+    return str(value)
+
+
 def xarray_to_temporary_netcdf(ds: xr.Dataset) -> str:
     """Write a dataset to a temporary NetCDF file and return the path."""
     fd = tempfile.NamedTemporaryFile(suffix=".nc", delete=False)
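[Review note] _period_string_scalar guards against numpy_datetime_to_period_string handing back a 0-d numpy array instead of a plain str, which would otherwise leak into JSON-serialized coverage metadata. A small repro of the behavior being guarded against (assumed, based on how numpy preserves 0-d shapes):

    import numpy as np

    period = np.datetime_as_string(np.array(np.datetime64("2026-01-05T12:34:56")), unit="s").astype("U10")
    print(isinstance(period, np.ndarray), period.ndim)  # True 0 -- a 0-d array, not a str
    print(str(period.item()))                           # '2026-01-05'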
diff --git a/src/climate_api/data_registry/services/datasets.py b/src/climate_api/data_registry/services/datasets.py
index 9f394681..258775ff 100644
--- a/src/climate_api/data_registry/services/datasets.py
+++ b/src/climate_api/data_registry/services/datasets.py
@@ -10,8 +10,10 @@
 SCRIPT_DIR = Path(__file__).parent.resolve()
 CONFIGS_DIR = SCRIPT_DIR.parent.parent.parent.parent / "data" / "datasets"
 
-SUPPORTED_SYNC_KINDS = {"temporal", "release", "static"}
+SUPPORTED_SYNC_KINDS = {"temporal", "release", "static", "derived"}
 SUPPORTED_SYNC_EXECUTIONS = {"append", "rematerialize"}
+SUPPORTED_RESAMPLE_METHODS = {"mean", "sum", "min", "max", "first", "last"}
+SUPPORTED_WEEK_STARTS = {"monday", "sunday"}
 
 
 def list_datasets() -> list[dict[str, Any]]:
@@ -71,6 +73,20 @@ def _validate_dataset_template(dataset: object, *, file_path: Path) -> None:
             f"'{sync_execution}'. Supported values: {supported}"
         )
 
+    resample = dataset.get("resample")
+    if sync_kind == "derived":
+        if resample is None:
+            raise ValueError(
+                f"Dataset template '{dataset_id}' in {file_path.name} must define resample when sync_kind is derived"
+            )
+    elif resample is not None:
+        raise ValueError(
+            f"Dataset template '{dataset_id}' in {file_path.name} may only define resample when sync_kind is derived"
+        )
+
+    if resample is not None:
+        _validate_resample(resample, dataset=dataset, dataset_id=dataset_id, file_path=file_path)
+
     sync_availability = dataset.get("sync_availability")
     if sync_availability is not None:
         _validate_sync_availability(sync_availability, dataset_id=dataset_id, file_path=file_path)
@@ -89,3 +105,46 @@ def _validate_sync_availability(sync_availability: object, *, dataset_id: str, f
             f"Dataset template '{dataset_id}' in {file_path.name} has invalid "
             "sync_availability.latest_available_function"
         )
+
+
+def _validate_resample(
+    resample: object,
+    *,
+    dataset: dict[str, Any],
+    dataset_id: str,
+    file_path: Path,
+) -> None:
+    """Validate optional derived resampling metadata."""
+    if not isinstance(resample, dict):
+        raise ValueError(f"Dataset template '{dataset_id}' in {file_path.name} has invalid resample")
+
+    source_dataset_id = resample.get("source_dataset_id")
+    if not isinstance(source_dataset_id, str) or not source_dataset_id:
+        raise ValueError(f"Dataset template '{dataset_id}' in {file_path.name} has invalid resample.source_dataset_id")
+
+    method = resample.get("method")
+    if not isinstance(method, str) or not method:
+        raise ValueError(f"Dataset template '{dataset_id}' in {file_path.name} has invalid resample.method")
+    if method not in SUPPORTED_RESAMPLE_METHODS:
+        supported = ", ".join(sorted(SUPPORTED_RESAMPLE_METHODS))
+        raise ValueError(
+            f"Dataset template '{dataset_id}' in {file_path.name} has unsupported resample.method "
+            f"'{method}'. Supported values: {supported}"
+        )
+
+    week_start = resample.get("week_start")
+    period_type = dataset.get("period_type")
+    if week_start is not None:
+        if period_type != "weekly":
+            raise ValueError(
+                f"Dataset template '{dataset_id}' in {file_path.name} may only define "
+                "resample.week_start when period_type is weekly"
+            )
+        if not isinstance(week_start, str) or not week_start:
+            raise ValueError(f"Dataset template '{dataset_id}' in {file_path.name} has invalid resample.week_start")
+        if week_start not in SUPPORTED_WEEK_STARTS:
+            supported = ", ".join(sorted(SUPPORTED_WEEK_STARTS))
+            raise ValueError(
+                f"Dataset template '{dataset_id}' in {file_path.name} has unsupported resample.week_start "
+                f"'{week_start}'. Supported values: {supported}"
+            )
diff --git a/src/climate_api/ingestions/schemas.py b/src/climate_api/ingestions/schemas.py
index 9c34b032..8e04227a 100644
--- a/src/climate_api/ingestions/schemas.py
+++ b/src/climate_api/ingestions/schemas.py
@@ -26,6 +26,7 @@ class SyncKind(StrEnum):
     TEMPORAL = "temporal"
     RELEASE = "release"
     STATIC = "static"
+    DERIVED = "derived"
 
 
 class SyncAction(StrEnum):
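[Review note] A self-contained repro of the template contract enforced above; the dict stands in for a parsed YAML entry and the error text mirrors the validator's:

    SUPPORTED_RESAMPLE_METHODS = {"mean", "sum", "min", "max", "first", "last"}

    resample = {"source_dataset_id": "source_daily", "method": "median"}
    method = resample["method"]
    if method not in SUPPORTED_RESAMPLE_METHODS:
        supported = ", ".join(sorted(SUPPORTED_RESAMPLE_METHODS))
        raise ValueError(f"unsupported resample.method '{method}'. Supported values: {supported}")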
diff --git a/src/climate_api/ingestions/services.py b/src/climate_api/ingestions/services.py
index 6b41b48f..cb82f60e 100644
--- a/src/climate_api/ingestions/services.py
+++ b/src/climate_api/ingestions/services.py
@@ -321,6 +321,56 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
     return _mutate_records(mutate)
 
 
+def store_materialized_zarr_artifact(
+    *,
+    dataset: dict[str, object],
+    start: str,
+    end: str | None,
+    extent_id: str | None,
+    bbox: list[float] | None,
+    zarr_path: Path,
+    overwrite: bool,
+    publish: bool,
+) -> ArtifactRecord:
+    """Store metadata for a locally materialized Zarr artifact."""
+    period_type = str(dataset["period_type"])
+    start = _normalize_request_period(start, period_type=period_type, field_name="start")
+    end = _normalize_optional_request_period(end, period_type=period_type, field_name="end")
+    request_scope = ArtifactRequestScope(
+        start=start,
+        end=end,
+        extent_id=extent_id,
+        bbox=(bbox[0], bbox[1], bbox[2], bbox[3]) if bbox is not None else None,
+    )
+    coverage_data = get_data_coverage_for_paths(dataset, zarr_path=str(zarr_path.resolve()))
+    if not coverage_data.get("has_data", True):
+        raise HTTPException(status_code=409, detail="Materialized artifact contains no data for the requested scope")
+    coverage = ArtifactCoverage(
+        temporal=CoverageTemporal(**coverage_data["coverage"]["temporal"]),
+        spatial=CoverageSpatial(**coverage_data["coverage"]["spatial"]),
+    )
+    request_scope = request_scope.model_copy(update={"end": coverage.temporal.end})
+
+    record = ArtifactRecord(
+        artifact_id=str(uuid4()),
+        dataset_id=str(dataset["id"]),
+        dataset_name=str(dataset["name"]),
+        variable=str(dataset["variable"]),
+        format=ArtifactFormat.ZARR,
+        path=str(zarr_path.resolve()),
+        asset_paths=[str(zarr_path.resolve())],
+        variables=[str(dataset["variable"])],
+        request_scope=request_scope,
+        coverage=coverage,
+        created_at=datetime.now(UTC),
+        publication=ArtifactPublication(),
+    )
+    stored_record = _upsert_artifact_record(record, prefer_zarr=True, publish=publish, overwrite=overwrite)
+    if publish and stored_record.publication.status != PublicationStatus.PUBLISHED:
+        return publish_artifact_record(stored_record.artifact_id)
+    return stored_record
+
+
 def sync_dataset(
     *,
     dataset_id: str,
@@ -455,6 +505,44 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
     return _mutate_records(mutate)
 
 
+def _upsert_artifact_record(
+    record: ArtifactRecord,
+    *,
+    prefer_zarr: bool,
+    publish: bool,
+    overwrite: bool,
+) -> ArtifactRecord:
+    """Persist a new or replacement artifact record for the same logical request scope."""
+    if not overwrite:
+        return _store_artifact_record(record, prefer_zarr=prefer_zarr, publish=publish)
+
+    def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
+        existing = _find_existing_artifact_in_records(
+            records=records,
+            dataset_id=record.dataset_id,
+            request_scope=record.request_scope,
+            prefer_zarr=prefer_zarr,
+        )
+        if existing is None:
+            records.append(record)
+            return record
+
+        replacement = record.model_copy(
+            update={
+                "artifact_id": existing.artifact_id,
+                "publication": existing.publication,
+            }
+        )
+        for index, current in enumerate(records):
+            if current.artifact_id != existing.artifact_id:
+                continue
+            records[index] = replacement
+            return replacement
+        raise HTTPException(status_code=404, detail=f"Artifact '{existing.artifact_id}' not found")
+
+    return _mutate_records(mutate)
+
+
 def _mutate_records(mutation: Callable[[list[ArtifactRecord]], ArtifactRecord]) -> ArtifactRecord:
     """Apply a read-modify-write mutation under an exclusive file lock."""
     ensure_store()
@@ -558,6 +646,8 @@ def _default_request_end(period_type: str) -> str:
         return datetime_to_period_string(utc_now(), period_type)
     if period_type == "daily":
         return utc_today().isoformat()
+    if period_type == "weekly":
+        return datetime_to_period_string(utc_now(), period_type)
    if period_type == "monthly":
         today = utc_today()
         return f"{today.year:04d}-{today.month:02d}"
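[Review note] The overwrite path deliberately keeps the existing artifact_id and publication so collection ids stay stable across rematerializations. An in-memory model of that upsert, with no file locking (illustrative only):

    records = [{"artifact_id": "a1", "scope": ("2026-W02", "sle"), "path": "old.zarr"}]

    candidate = {"artifact_id": "uuid-new", "scope": ("2026-W02", "sle"), "path": "new.zarr"}
    existing = next((r for r in records if r["scope"] == candidate["scope"]), None)
    if existing is None:
        records.append(candidate)
    else:
        candidate["artifact_id"] = existing["artifact_id"]  # identity survives the overwrite
        records[records.index(existing)] = candidate
    assert records[0] == {"artifact_id": "a1", "scope": ("2026-W02", "sle"), "path": "new.zarr"}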
diff --git a/src/climate_api/ingestions/sync_engine.py b/src/climate_api/ingestions/sync_engine.py
index e0870fd5..be14f51b 100644
--- a/src/climate_api/ingestions/sync_engine.py
+++ b/src/climate_api/ingestions/sync_engine.py
@@ -15,7 +15,7 @@
 import inspect
 import logging
 from collections.abc import Callable
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta
 from typing import Any
 
 from climate_api.ingestions.schemas import ArtifactRecord, SyncAction, SyncDetail, SyncKind, SyncResponse
@@ -25,6 +25,7 @@
     datetime_to_period_string,
     normalize_period_string,
     parse_hourly_period_string,
+    parse_period_string_to_datetime,
     utc_now,
     utc_today,
 )
@@ -75,6 +76,19 @@ def plan_sync(
             target_end=current_end,
             target_end_source="current_coverage",
         )
+    if sync_kind == SyncKind.DERIVED:
+        return SyncDetail(
+            source_dataset_id=latest_artifact.dataset_id,
+            extent_id=latest_artifact.request_scope.extent_id,
+            sync_kind=sync_kind,
+            action=SyncAction.NOT_SYNCABLE,
+            reason="derived_sync_not_implemented",
+            message="This derived dataset does not support sync execution yet.",
+            current_start=current_start,
+            current_end=current_end,
+            target_end=current_end,
+            target_end_source="current_coverage",
+        )
 
     period_type = str(source_dataset["period_type"])
     normalized_requested_end = requested_end.strip() if isinstance(requested_end, str) else None
@@ -304,6 +318,10 @@ def _next_period_start(latest_period_end: str, *, period_type: str) -> str:
     if period_type == "daily":
         current = date.fromisoformat(latest_period_end)
         return (current + timedelta(days=1)).isoformat()
+    if period_type == "weekly":
+        current = parse_period_string_to_datetime(latest_period_end).date()
+        next_week = datetime.combine(current + timedelta(days=7), datetime.min.time())
+        return datetime_to_period_string(next_week, period_type)
     if period_type == "monthly":
         current = date.fromisoformat(f"{latest_period_end}-01")
         month = current.month + 1
@@ -322,6 +340,8 @@ def _default_target_end(*, period_type: str) -> str:
         return datetime_to_period_string(utc_now(), period_type)
     if period_type == "daily":
         return today.isoformat()
+    if period_type == "weekly":
+        return datetime_to_period_string(utc_now(), period_type)
     if period_type == "monthly":
         return f"{today.year:04d}-{today.month:02d}"
     if period_type == "yearly":
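[Review note] The weekly branch of _next_period_start leans on ISO-week arithmetic, which is what makes the 2020-W53 -> 2021-W01 rollover in the tests work. Quick check in plain Python:

    from datetime import date, timedelta

    monday = date.fromisocalendar(2020, 53, 1)  # 2020-12-28, start of ISO week 2020-W53
    next_monday = monday + timedelta(days=7)
    print(next_monday.isocalendar()[:2])        # (2021, 1) -> "2021-W01"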
diff --git a/src/climate_api/main.py b/src/climate_api/main.py
index 9aa3c573..e18ab460 100644
--- a/src/climate_api/main.py
+++ b/src/climate_api/main.py
@@ -11,6 +11,7 @@
 from climate_api.data_registry import routes as dataset_template_routes
 from climate_api.extents import routes as extent_routes
 from climate_api.ingestions import routes as ingestion_routes
+from climate_api.processing import routes as processing_routes
 from climate_api.pygeoapi_app import mount_pygeoapi
 from climate_api.stac import routes as stac_routes
 from climate_api.system import routes as system_routes
@@ -79,6 +80,7 @@ async def add_zarr_browser_access_headers(
 app.include_router(stac_routes.router, prefix="/stac", tags=["STAC"])
 app.include_router(extent_routes.router, prefix="/extents", tags=["Extents"])
 app.include_router(dataset_template_routes.router, prefix="/dataset-templates", tags=["Dataset templates"])
+app.include_router(processing_routes.router, prefix="/processes", tags=["Processes"])
 app.include_router(ingestion_routes.datasets_router, prefix="/datasets", tags=["Datasets"])
 app.include_router(ingestion_routes.ingestions_router, prefix="/ingestions", tags=["Ingestions"])
 app.include_router(ingestion_routes.zarr_router, prefix="/zarr", tags=["Zarr"])
diff --git a/src/climate_api/processing/__init__.py b/src/climate_api/processing/__init__.py
new file mode 100644
index 00000000..2b4e80f2
--- /dev/null
+++ b/src/climate_api/processing/__init__.py
@@ -0,0 +1 @@
+"""Derived data processing services."""
diff --git a/src/climate_api/processing/resample.py b/src/climate_api/processing/resample.py
new file mode 100644
index 00000000..bcd81cb4
--- /dev/null
+++ b/src/climate_api/processing/resample.py
@@ -0,0 +1,310 @@
+"""Temporal resampling for derived managed datasets."""
+
+from __future__ import annotations
+
+import logging
+import shutil
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+from typing import Any, cast
+
+import numpy as np
+import xarray as xr
+from fastapi import HTTPException
+
+from climate_api.data_accessor.services.accessor import open_zarr_dataset
+from climate_api.data_manager.services.utils import get_time_dim
+from climate_api.data_registry.services import datasets as registry_datasets
+from climate_api.ingestions import services as ingestion_services
+from climate_api.ingestions import sync_engine
+from climate_api.ingestions.schemas import ArtifactFormat, ArtifactRecord, ArtifactRequestScope, PublicationStatus
+from climate_api.publications.services import managed_dataset_id_for_scope
+from climate_api.shared.time import datetime_to_period_string, parse_period_string_to_datetime
+
+logger = logging.getLogger(__name__)
+
+DERIVED_DATA_DIR = Path(__file__).resolve().parent.parent.parent.parent / "data" / "derived"
+_PERIOD_ORDER = {"hourly": 0, "daily": 1, "weekly": 2, "monthly": 3, "yearly": 4}
+
+
+def materialize_resampled_artifact(
+    *,
+    target_dataset: dict[str, Any],
+    start: str,
+    end: str | None,
+    extent_id: str | None,
+    bbox: list[float] | None,
+    overwrite: bool,
+    publish: bool,
+) -> ArtifactRecord:
+    """Materialize a derived dataset by resampling an existing managed source dataset."""
+    resample = _require_resample_config(target_dataset)
+    resolved_end = end or ingestion_services._default_request_end(str(target_dataset["period_type"]))
+    normalized_start = ingestion_services._normalize_request_period(
+        start,
+        period_type=str(target_dataset["period_type"]),
+        field_name="start",
+    )
+    normalized_end = ingestion_services._normalize_request_period(
+        resolved_end,
+        period_type=str(target_dataset["period_type"]),
+        field_name="end",
+    )
+
+    existing = ingestion_services._find_existing_artifact(
+        dataset_id=str(target_dataset["id"]),
+        request_scope=ArtifactRequestScope(
+            start=normalized_start,
+            end=normalized_end,
+            extent_id=extent_id,
+            bbox=(bbox[0], bbox[1], bbox[2], bbox[3]) if bbox is not None and extent_id is None else None,
+        ),
+        prefer_zarr=True,
+    )
+    if existing is not None and not overwrite:
+        if publish and existing.publication.status != PublicationStatus.PUBLISHED:
+            return ingestion_services.publish_artifact_record(existing.artifact_id)
+        return existing
+
+    source_dataset_id = str(resample["source_dataset_id"])
+    source_dataset = registry_datasets.get_dataset(source_dataset_id)
+    if source_dataset is None:
+        raise HTTPException(status_code=404, detail=f"Source dataset template '{source_dataset_id}' not found")
+
+    _validate_period_hierarchy(source_period_type=str(source_dataset["period_type"]), target_dataset=target_dataset)
+    source_artifact = _resolve_source_artifact(
+        source_dataset_id=source_dataset_id,
+        extent_id=extent_id,
+        bbox=bbox,
+    )
+    if source_artifact.format != ArtifactFormat.ZARR:
+        raise HTTPException(status_code=409, detail="Resampling currently requires a Zarr-backed source artifact")
+
+    source_bbox = bbox
+    if source_bbox is None and source_artifact.request_scope.bbox is not None:
+        source_bbox = list(source_artifact.request_scope.bbox)
+    target_managed_dataset_id = managed_dataset_id_for_scope(
+        str(target_dataset["id"]),
+        extent_id=extent_id,
+        bbox=source_bbox if extent_id is None else None,
+    )
+    zarr_path = DERIVED_DATA_DIR / f"{target_managed_dataset_id}.zarr"
+
+    source_ds = open_zarr_dataset(source_artifact.path or source_artifact.asset_paths[0])
+    try:
+        resampled = _resample_dataset(
+            source_ds=source_ds,
+            source_period_type=str(source_dataset["period_type"]),
+            resample_config=resample,
+            target_period_type=str(target_dataset["period_type"]),
+            start=normalized_start,
+            end=normalized_end,
+        )
+        if resampled.sizes.get(get_time_dim(resampled), 0) == 0:
+            raise HTTPException(status_code=409, detail="Source artifact does not contain any complete target periods")
+        existing_realized = _find_existing_resampled_artifact(
+            target_dataset=target_dataset,
+            extent_id=extent_id,
+            bbox=source_bbox if extent_id is None else None,
+            start=normalized_start,
+            resampled=resampled,
+        )
+        if existing_realized is not None and not overwrite:
+            if publish and existing_realized.publication.status != PublicationStatus.PUBLISHED:
+                return ingestion_services.publish_artifact_record(existing_realized.artifact_id)
+            return existing_realized
+        _write_resampled_zarr(resampled, zarr_path)
+    finally:
+        source_ds.close()
+
+    return ingestion_services.store_materialized_zarr_artifact(
+        dataset=target_dataset,
+        start=normalized_start,
+        end=normalized_end,
+        extent_id=extent_id,
+        bbox=source_bbox if extent_id is None else None,
+        zarr_path=zarr_path,
+        overwrite=overwrite,
+        publish=publish,
+    )
+
+
+def _require_resample_config(target_dataset: dict[str, Any]) -> dict[str, Any]:
+    resample = target_dataset.get("resample")
+    if not isinstance(resample, dict):
+        raise HTTPException(
+            status_code=400,
+            detail=f"Dataset '{target_dataset.get('id')}' is not configured for resampling",
+        )
+    return resample
+
+
+def _validate_period_hierarchy(*, source_period_type: str, target_dataset: dict[str, Any]) -> None:
+    target_period_type = str(target_dataset["period_type"])
+    if source_period_type not in _PERIOD_ORDER or target_period_type not in _PERIOD_ORDER:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Unsupported source/target period types: {source_period_type} -> {target_period_type}",
+        )
+    if _PERIOD_ORDER[source_period_type] >= _PERIOD_ORDER[target_period_type]:
+        raise HTTPException(
+            status_code=400,
+            detail=(
+                f"Resampling requires a coarser target period than source: {source_period_type} -> {target_period_type}"
+            ),
+        )
+
+
+def _resolve_source_artifact(
+    *,
+    source_dataset_id: str,
+    extent_id: str | None,
+    bbox: list[float] | None,
+) -> ArtifactRecord:
+    managed_dataset_id = managed_dataset_id_for_scope(source_dataset_id, extent_id=extent_id, bbox=bbox)
+    return ingestion_services.get_latest_artifact_for_dataset_or_404(managed_dataset_id)
+
+
+def _resample_dataset(
+    *,
+    source_ds: xr.Dataset,
+    source_period_type: str,
+    resample_config: dict[str, Any],
+    target_period_type: str,
+    start: str,
+    end: str,
+) -> xr.Dataset:
+    time_dim = get_time_dim(source_ds)
+    target_end_exclusive = parse_period_string_to_datetime(
+        sync_engine._next_period_start(end, period_type=target_period_type)
+    ).replace(tzinfo=None)
+    target_start = parse_period_string_to_datetime(start).replace(tzinfo=None)
+    subset = source_ds.where(source_ds[time_dim] >= np.datetime64(target_start), drop=True)
+    subset = subset.where(subset[time_dim] < np.datetime64(target_end_exclusive), drop=True)
+    if subset.sizes.get(time_dim, 0) == 0:
+        raise HTTPException(status_code=409, detail="Source artifact contains no data for the requested resample range")
+    source_start = _coord_to_datetime(subset[time_dim].values[0])
+    source_end = _coord_to_datetime(subset[time_dim].values[-1])
+
+    with xr.set_options(keep_attrs=True):
+        resampler = subset.resample(
+            {
+                time_dim: _resample_frequency(
+                    target_period_type=target_period_type,
+                    week_start=str(resample_config.get("week_start", "monday")),
+                )
+            },
+            label="left",
+            closed="left",
+        )
+        result = cast(xr.Dataset, getattr(resampler, str(resample_config["method"]))())
+    result = _drop_incomplete_edge_periods(
+        result=result,
+        source_start=source_start,
+        source_end=source_end,
+        source_period_type=source_period_type,
+        target_period_type=target_period_type,
+    )
+    return result
+
+
+def _resample_frequency(*, target_period_type: str, week_start: str) -> str:
+    if target_period_type == "daily":
+        return "1D"
+    if target_period_type == "weekly":
+        return "W-MON" if week_start == "monday" else "W-SUN"
+    if target_period_type == "monthly":
+        return "MS"
+    if target_period_type == "yearly":
+        return "YS"
+    raise HTTPException(status_code=400, detail=f"Unsupported target period_type '{target_period_type}' for resampling")
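[Review note] _resample_frequency maps period types onto pandas anchored offset aliases; combined with label="left"/closed="left", each output bin is stamped with its period start. The weekly case, checked directly against pandas:

    import pandas as pd

    idx = pd.date_range("2026-01-05", periods=14, freq="D")  # Monday onwards
    series = pd.Series(1.0, index=idx)
    print(series.resample("W-MON", label="left", closed="left").sum())
    # 2026-01-05    7.0
    # 2026-01-12    7.0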
+
+
+def _drop_incomplete_edge_periods(
+    *,
+    result: xr.Dataset,
+    source_start: datetime,
+    source_end: datetime,
+    source_period_type: str,
+    target_period_type: str,
+) -> xr.Dataset:
+    time_dim = get_time_dim(result)
+    if result.sizes.get(time_dim, 0) == 0:
+        return result
+
+    first_output_start = _coord_to_datetime(result[time_dim].values[0])
+    if source_start > first_output_start:
+        result = result.isel({time_dim: slice(1, None)})
+    if result.sizes.get(time_dim, 0) == 0:
+        return result
+
+    last_output_start = _coord_to_datetime(result[time_dim].values[-1])
+    next_target_start = parse_period_string_to_datetime(
+        sync_engine._next_period_start(
+            datetime_to_period_string(last_output_start.replace(tzinfo=UTC), target_period_type),
+            period_type=target_period_type,
+        )
+    ).replace(tzinfo=None)
+    required_source_end = _previous_source_period_start(next_target_start, source_period_type=source_period_type)
+    if source_end < required_source_end:
+        return result.isel({time_dim: slice(0, -1)})
+    return result
+
+
+def _find_existing_resampled_artifact(
+    *,
+    target_dataset: dict[str, Any],
+    extent_id: str | None,
+    bbox: list[float] | None,
+    start: str,
+    resampled: xr.Dataset,
+) -> ArtifactRecord | None:
+    time_dim = get_time_dim(resampled)
+    realized_end = datetime_to_period_string(
+        _coord_to_datetime(resampled[time_dim].values[-1]).replace(tzinfo=UTC),
+        str(target_dataset["period_type"]),
+    )
+    return ingestion_services._find_existing_artifact(
+        dataset_id=str(target_dataset["id"]),
+        request_scope=ArtifactRequestScope(
+            start=start,
+            end=realized_end,
+            extent_id=extent_id,
+            bbox=(bbox[0], bbox[1], bbox[2], bbox[3]) if bbox is not None else None,
+        ),
+        prefer_zarr=True,
+    )
+
+
+def _previous_source_period_start(boundary: datetime, *, source_period_type: str) -> datetime:
+    if source_period_type == "hourly":
+        return boundary - timedelta(hours=1)
+    if source_period_type == "daily":
+        return boundary - timedelta(days=1)
+    if source_period_type == "weekly":
+        return boundary - timedelta(days=7)
+    if source_period_type == "monthly":
+        previous_month_last_day = boundary.replace(day=1) - timedelta(days=1)
+        return previous_month_last_day.replace(day=1)
+    if source_period_type == "yearly":
+        return boundary.replace(year=boundary.year - 1, month=1, day=1)
+    raise HTTPException(status_code=400, detail=f"Unsupported source period_type '{source_period_type}' for resampling")
+
+
+def _write_resampled_zarr(ds: xr.Dataset, zarr_path: Path) -> None:
+    DERIVED_DATA_DIR.mkdir(parents=True, exist_ok=True)
+    if zarr_path.exists():
+        shutil.rmtree(zarr_path)
+    ds_chunked = ds.chunk("auto").unify_chunks()
+    try:
+        ds_chunked.to_zarr(zarr_path, mode="w", consolidated=True)
+    finally:
+        ds_chunked.close()
+        ds.close()
+
+
+def _coord_to_datetime(value: object) -> datetime:
+    if isinstance(value, datetime):
+        return value
+    np_value = np.datetime64(cast(Any, value))
+    return datetime.fromisoformat(np.datetime_as_string(np_value, unit="s"))
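[Review note] The edge-trimming rule in _drop_incomplete_edge_periods, worked through by hand for daily -> weekly with Monday weeks (the same scenario as the leading-partial test below). Daily coverage 2026-01-07..2026-01-18 overlaps weeks W02 (Jan 5-11) and W03 (Jan 12-18); W02 starts before the first source day, so its bin is dropped, while W03's last required day is covered, so it is kept:

    from datetime import date, timedelta

    source_start, source_end = date(2026, 1, 7), date(2026, 1, 18)
    w02_start, w03_start = date(2026, 1, 5), date(2026, 1, 12)
    assert source_start > w02_start                     # leading week incomplete -> dropped
    assert source_end >= w03_start + timedelta(days=6)  # trailing week complete -> kept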
diff --git a/src/climate_api/processing/routes.py b/src/climate_api/processing/routes.py
new file mode 100644
index 00000000..75f3c2fe
--- /dev/null
+++ b/src/climate_api/processing/routes.py
@@ -0,0 +1,24 @@
+"""Routes for derived processing operations."""
+
+from fastapi import APIRouter
+
+from climate_api.data_registry.routes import _get_dataset_or_404
+from climate_api.processing import services
+from climate_api.processing.schemas import ResampleProcessRequest, ResampleProcessResponse
+
+router = APIRouter()
+
+
+@router.post("/resample", response_model=ResampleProcessResponse)
+def run_resample_process(request: ResampleProcessRequest) -> ResampleProcessResponse:
+    """Materialize a derived dataset by resampling an existing managed source dataset."""
+    dataset = _get_dataset_or_404(request.dataset_id)
+    artifact_id, dataset_summary = services.run_resample_process(
+        dataset=dataset,
+        start=request.start,
+        end=request.end,
+        extent_id=request.extent_id,
+        overwrite=request.overwrite,
+        publish=request.publish,
+    )
+    return ResampleProcessResponse(artifact_id=artifact_id, status="completed", dataset=dataset_summary)
diff --git a/src/climate_api/processing/schemas.py b/src/climate_api/processing/schemas.py
new file mode 100644
index 00000000..e4d479d1
--- /dev/null
+++ b/src/climate_api/processing/schemas.py
@@ -0,0 +1,33 @@
+"""Pydantic schemas for derived data processing routes."""
+
+from pydantic import BaseModel, Field
+
+from climate_api.ingestions.schemas import DatasetRecord
+
+
+class ResampleProcessRequest(BaseModel):
+    """Request payload for materializing a derived resampled dataset."""
+
+    dataset_id: str = Field(description="Target derived dataset template id from the Climate API registry.")
+    start: str = Field(description="Start period to materialize in the target dataset period format.")
+    end: str | None = Field(default=None, description="Optional end period to materialize.")
+    extent_id: str | None = Field(
+        default=None,
+        description="Configured Climate API extent identifier used to resolve spatial scope for the source dataset.",
+    )
+    overwrite: bool = Field(
+        default=False,
+        description="Whether to force rematerialization of an existing matching derived artifact.",
+    )
+    publish: bool = Field(
+        default=True,
+        description="Whether to publish the resulting derived dataset through pygeoapi.",
+    )
+
+
+class ResampleProcessResponse(BaseModel):
+    """Synchronous response returned after materializing a derived resampled dataset."""
+
+    artifact_id: str = Field(description="Identifier of the materialized derived artifact.")
+    status: str = Field(description="Execution status of the resample request.")
+    dataset: DatasetRecord = Field(description="Managed dataset summary produced or resolved by the resample request.")
diff --git a/src/climate_api/processing/services.py b/src/climate_api/processing/services.py
new file mode 100644
index 00000000..5721c31a
--- /dev/null
+++ b/src/climate_api/processing/services.py
@@ -0,0 +1,29 @@
+"""Services for derived processing workflows."""
+
+from __future__ import annotations
+
+from climate_api.ingestions import services as ingestion_services
+from climate_api.ingestions.schemas import DatasetRecord
+from climate_api.processing.resample import materialize_resampled_artifact
+
+
+def run_resample_process(
+    *,
+    dataset: dict[str, object],
+    start: str,
+    end: str | None,
+    extent_id: str | None,
+    overwrite: bool,
+    publish: bool,
+) -> tuple[str, DatasetRecord]:
+    """Materialize one derived resampled dataset and return its artifact id plus dataset summary."""
+    artifact = materialize_resampled_artifact(
+        target_dataset=dataset,
+        start=start,
+        end=end,
+        extent_id=extent_id,
+        bbox=None,
+        overwrite=overwrite,
+        publish=publish,
+    )
+    return artifact.artifact_id, ingestion_services.get_dataset_summary_for_artifact_or_404(artifact.artifact_id)
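[Review note] End to end, the new route is a synchronous POST; a hypothetical client call (host and extent id are placeholders, fields match ResampleProcessRequest above):

    import httpx

    response = httpx.post(
        "http://localhost:8000/processes/resample",
        json={
            "dataset_id": "chirps3_precipitation_weekly",
            "start": "2026-W02",
            "end": "2026-W03",
            "extent_id": "sle",
            "overwrite": False,
            "publish": True,
        },
        timeout=None,  # materialization may take a while for large extents
    )
    print(response.json()["status"])  # "completed" on success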
diff --git a/src/climate_api/publications/services.py b/src/climate_api/publications/services.py
index e7c28a14..ef3ca34f 100644
--- a/src/climate_api/publications/services.py
+++ b/src/climate_api/publications/services.py
@@ -163,17 +163,31 @@ def _provider_axes(record: ArtifactRecord) -> tuple[str, str, str]:
         ds.close()
 
 
-def _collection_id_for(record: ArtifactRecord) -> str:
-    """Build a stable collection identifier for a logical dataset scope."""
-    if record.request_scope.extent_id:
-        scope_key = record.request_scope.extent_id
-    elif record.request_scope.bbox:
-        bbox = ",".join(f"{value:.6f}" for value in record.request_scope.bbox)
-        scope_hash = f"{adler32(bbox.encode('utf-8')):08x}"
+def managed_dataset_id_for_scope(
+    dataset_id: str,
+    *,
+    extent_id: str | None,
+    bbox: tuple[float, float, float, float] | list[float] | None,
+) -> str:
+    """Build a stable managed dataset identifier from dataset scope inputs."""
+    if extent_id:
+        scope_key = extent_id
+    elif bbox:
+        bbox_str = ",".join(f"{value:.6f}" for value in bbox)
+        scope_hash = f"{adler32(bbox_str.encode('utf-8')):08x}"
         scope_key = f"bbox_{scope_hash}"
     else:
         scope_key = "global"
-    return f"{record.dataset_id}_{scope_key}"
+    return f"{dataset_id}_{scope_key}"
+
+
+def _collection_id_for(record: ArtifactRecord) -> str:
+    """Build a stable collection identifier for a logical dataset scope."""
+    return managed_dataset_id_for_scope(
+        record.dataset_id,
+        extent_id=record.request_scope.extent_id,
+        bbox=record.request_scope.bbox,
+    )
 
 
 def managed_dataset_id_for(record: ArtifactRecord) -> str:
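[Review note] Extracting managed_dataset_id_for_scope is what lets resample.py compute the target Zarr path before any artifact record exists. The bbox branch, shown standalone (the resulting hash depends on the formatted coordinates):

    from zlib import adler32

    bbox = (1.0, 2.0, 3.0, 4.0)
    bbox_str = ",".join(f"{value:.6f}" for value in bbox)
    scope_key = f"bbox_{adler32(bbox_str.encode('utf-8')):08x}"
    print(f"chirps3_precipitation_weekly_{scope_key}")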
diff --git a/src/climate_api/shared/time.py b/src/climate_api/shared/time.py
index 07ce02cb..66410148 100644
--- a/src/climate_api/shared/time.py
+++ b/src/climate_api/shared/time.py
@@ -1,10 +1,13 @@
 """Time helpers shared across Climate API modules."""
 
+import re
 from datetime import UTC, date, datetime
-from typing import Any
+from typing import Any, cast
 
 import numpy as np
 
+_WEEKLY_PERIOD_PATTERN = re.compile(r"^(?P<year>\d{4})-W(?P<week>\d{2})$")
+
 
 def _normalize_datetime_for_period(value: datetime) -> datetime:
     """Convert aware datetimes to UTC before deriving dataset-native periods."""
@@ -13,6 +16,14 @@ def _normalize_datetime_for_period(value: datetime) -> datetime:
     return value
 
 
+def _coerce_numpy_datetime(value: object) -> datetime:
+    """Convert a numpy or Python datetime-like scalar to a datetime."""
+    if isinstance(value, datetime):
+        return value
+    np_value = np.datetime64(cast(Any, value))
+    return datetime.fromisoformat(np.datetime_as_string(np_value, unit="s"))
+
+
 def datetime_to_period_string(value: datetime, period_type: str) -> str:
     """Convert a datetime to the dataset-native period string format."""
     value = _normalize_datetime_for_period(value)
@@ -20,11 +31,14 @@ def datetime_to_period_string(value: datetime, period_type: str) -> str:
         return value.replace(minute=0, second=0, microsecond=0).strftime("%Y-%m-%dT%H")
     if period_type == "daily":
         return value.date().isoformat()
+    if period_type == "weekly":
+        iso_year, iso_week, _ = value.isocalendar()
+        return f"{iso_year:04d}-W{iso_week:02d}"
     if period_type == "monthly":
         return f"{value.year:04d}-{value.month:02d}"
     if period_type == "yearly":
         return str(value.year)
-    return value.isoformat()
+    raise ValueError(f"Unsupported period_type '{period_type}'")
 
 
 def utc_now() -> datetime:
@@ -44,6 +58,16 @@ def parse_hourly_period_string(value: str) -> datetime:
     return datetime.fromisoformat(value)
 
 
+def parse_weekly_period_string(value: str) -> datetime:
+    """Parse a dataset-native weekly period string or full ISO datetime."""
+    match = _WEEKLY_PERIOD_PATTERN.fullmatch(value)
+    if match is not None:
+        iso_year = int(match.group("year"))
+        iso_week = int(match.group("week"))
+        return datetime.combine(date.fromisocalendar(iso_year, iso_week, 1), datetime.min.time())
+    return datetime.fromisoformat(value)
+
+
 def normalize_period_string(value: str, period_type: str) -> str:
     """Normalize an input period string to the dataset-native period format."""
     if period_type == "hourly":
@@ -56,6 +80,11 @@ def normalize_period_string(value: str, period_type: str) -> str:
             return datetime_to_period_string(datetime.fromisoformat(value), period_type)
         except ValueError as exc:
             raise ValueError(f"Invalid daily period '{value}'; expected YYYY-MM-DD or ISO datetime") from exc
+    if period_type == "weekly":
+        try:
+            return datetime_to_period_string(parse_weekly_period_string(value), period_type)
+        except ValueError as exc:
+            raise ValueError(f"Invalid weekly period '{value}'; expected YYYY-Www or ISO datetime") from exc
     if period_type == "monthly":
         try:
             if len(value) == 7:
@@ -78,6 +107,8 @@ def normalize_period_string(value: str, period_type: str) -> str:
 def parse_period_string_to_datetime(value: str) -> datetime:
     """Parse a dataset-native period string to a UTC datetime."""
     normalized = value.strip()
+    if _WEEKLY_PERIOD_PATTERN.fullmatch(normalized) is not None:
+        return parse_weekly_period_string(normalized).replace(tzinfo=UTC)
     if "T" not in normalized:
         if len(normalized) == 4:
             normalized = f"{normalized}-01-01T00:00:00"
@@ -94,9 +125,14 @@ def parse_period_string_to_datetime(value: str) -> datetime:
 
 def numpy_datetime_to_period_string(datetimes: np.ndarray[Any, Any], period_type: str) -> np.ndarray[Any, Any]:
     """Convert an array of numpy datetimes to truncated period strings."""
-    # TODO: this and numpy_period_string should be merged
-    s = np.datetime_as_string(datetimes, unit="s")
-
-    # Map periods to string lengths: YYYY-MM-DDTHH (13), YYYY-MM-DD (10), etc.
-    lengths = {"hourly": 13, "daily": 10, "monthly": 7, "yearly": 4}
-    return s.astype(f"U{lengths[period_type]}")
+    if period_type != "weekly":
+        lengths = {"hourly": 13, "daily": 10, "monthly": 7, "yearly": 4}
+        return np.datetime_as_string(datetimes, unit="s").astype(f"U{lengths[period_type]}")
+
+    arr = np.asarray(datetimes, dtype="datetime64[s]")
+    to_period_string = np.vectorize(
+        lambda value: datetime_to_period_string(_coerce_numpy_datetime(value), period_type),
+        otypes=[str],
+    )
+    result = to_period_string(arr)
+    return cast(np.ndarray[Any, Any], result.astype("U8"))
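[Review note] Round trip of the weekly period format added above, using the same stdlib pieces the helpers rely on (isocalendar/fromisocalendar); 2026-04-21 falls in ISO week 17, matching the sync-engine test:

    from datetime import date, datetime

    value = datetime(2026, 4, 21, 13, 47)               # a Tuesday
    iso_year, iso_week, _ = value.isocalendar()
    print(f"{iso_year:04d}-W{iso_week:02d}")            # 2026-W17
    print(date.fromisocalendar(iso_year, iso_week, 1))  # 2026-04-20, that week's Monday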
diff --git a/tests/test_dataset_registry.py b/tests/test_dataset_registry.py
index cca324d1..31d4b9ca 100644
--- a/tests/test_dataset_registry.py
+++ b/tests/test_dataset_registry.py
@@ -63,6 +63,51 @@ def test_dataset_registry_accepts_supported_sync_kind(
     assert datasets.list_datasets()[0]["id"] == "valid_temporal"
 
 
+def test_dataset_registry_accepts_derived_sync_kind(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    registry_file = tmp_path / "valid_derived.yaml"
+    registry_file.write_text(
+        """
+- id: derived_weekly
+  name: Derived weekly
+  variable: value
+  period_type: weekly
+  sync_kind: derived
+  resample:
+    source_dataset_id: source_daily
+    method: sum
+    week_start: monday
+""",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(datasets, "CONFIGS_DIR", tmp_path)
+
+    assert datasets.list_datasets()[0]["sync_kind"] == "derived"
+
+
+def test_dataset_registry_rejects_derived_sync_kind_without_resample(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    registry_file = tmp_path / "derived_missing_resample.yaml"
+    registry_file.write_text(
+        """
+- id: derived_missing_resample
+  name: Derived missing resample
+  variable: value
+  period_type: weekly
+  sync_kind: derived
+""",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(datasets, "CONFIGS_DIR", tmp_path)
+
+    with pytest.raises(ValueError, match="must define resample when sync_kind is derived"):
+        datasets.list_datasets()
+
+
 def test_dataset_registry_rejects_unsupported_sync_execution(
     monkeypatch: pytest.MonkeyPatch,
     tmp_path: Path,
@@ -174,3 +219,76 @@ def test_dataset_registry_accepts_sync_availability_function(
     assert datasets.list_datasets()[0]["sync_availability"]["latest_available_function"].endswith(
         "lagged_latest_available"
     )
+
+
+def test_dataset_registry_rejects_invalid_resample_method(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    registry_file = tmp_path / "invalid_resample_method.yaml"
+    registry_file.write_text(
+        """
+- id: derived_invalid_method
+  name: Derived invalid method
+  variable: value
+  period_type: weekly
+  sync_kind: derived
+  resample:
+    source_dataset_id: source_daily
+    method: median
+""",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(datasets, "CONFIGS_DIR", tmp_path)
+
+    with pytest.raises(ValueError, match="unsupported resample.method 'median'"):
+        datasets.list_datasets()
+
+
+def test_dataset_registry_rejects_resample_for_non_derived_sync_kind(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    registry_file = tmp_path / "temporal_with_resample.yaml"
+    registry_file.write_text(
+        """
+- id: temporal_with_resample
+  name: Temporal with resample
+  variable: value
+  period_type: weekly
+  sync_kind: temporal
+  resample:
+    source_dataset_id: source_daily
+    method: sum
+""",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(datasets, "CONFIGS_DIR", tmp_path)
+
+    with pytest.raises(ValueError, match="may only define resample when sync_kind is derived"):
+        datasets.list_datasets()
+
+
+def test_dataset_registry_rejects_week_start_for_non_weekly_target(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    registry_file = tmp_path / "invalid_resample_week_start.yaml"
+    registry_file.write_text(
+        """
+- id: derived_monthly_with_week_start
+  name: Derived monthly with week start
+  variable: value
+  period_type: monthly
+  sync_kind: derived
+  resample:
+    source_dataset_id: source_daily
+    method: sum
+    week_start: monday
+""",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(datasets, "CONFIGS_DIR", tmp_path)
+
+    with pytest.raises(ValueError, match="may only define resample.week_start when period_type is weekly"):
+        datasets.list_datasets()
diff --git a/tests/test_datasets_sync.py b/tests/test_datasets_sync.py
index 9711ec3c..a85827b1 100644
--- a/tests/test_datasets_sync.py
+++ b/tests/test_datasets_sync.py
@@ -352,8 +352,8 @@ def now(cls, tz: tzinfo | None = None) -> "FixedDateTime":
 
 
 def test_default_target_end_rejects_unsupported_period_type() -> None:
-    with pytest.raises(ValueError, match="Unsupported period_type 'weekly' for sync"):
-        sync_engine._default_target_end(period_type="weekly")
+    with pytest.raises(ValueError, match="Unsupported period_type 'fortnightly' for sync"):
+        sync_engine._default_target_end(period_type="fortnightly")
 
 
 def test_next_period_start_preserves_hourly_period_format() -> None:
@@ -362,6 +362,26 @@ def test_next_period_start_preserves_hourly_period_format() -> None:
     result = sync_engine._next_period_start("2026-04-21T13", period_type="hourly")
 
     assert result == "2026-04-21T14"
 
 
+def test_default_weekly_target_end_uses_iso_week_format(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr(sync_engine, "utc_now", lambda: datetime(2026, 4, 21, 13, 47, 31, tzinfo=UTC))
+
+    result = sync_engine._default_target_end(period_type="weekly")
+
+    assert result == "2026-W17"
+
+
+def test_next_period_start_preserves_weekly_period_format() -> None:
+    result = sync_engine._next_period_start("2026-W17", period_type="weekly")
+
+    assert result == "2026-W18"
+
+
+def test_next_period_start_rolls_weekly_period_across_iso_year_boundary() -> None:
+    result = sync_engine._next_period_start("2020-W53", period_type="weekly")
+
+    assert result == "2021-W01"
+
+
 def test_sync_dataset_static_policy_returns_not_syncable_without_period_arithmetic(
     monkeypatch: pytest.MonkeyPatch,
 ) -> None:
@@ -420,6 +440,25 @@ def test_plan_sync_static_policy_ignores_period_normalization() -> None:
     assert result.reason == "static_dataset"
 
 
+def test_plan_sync_derived_policy_returns_not_syncable_without_download_path() -> None:
+    latest = _artifact(
+        artifact_id="a1",
+        source_dataset_id="chirps3_precipitation_weekly",
+        managed_dataset_id="chirps3_precipitation_weekly_sle",
+        end="2026-W17",
+    )
+
+    result = sync_engine.plan_sync(
+        source_dataset={"id": "chirps3_precipitation_weekly", "period_type": "weekly", "sync_kind": "derived"},
+        latest_artifact=latest,
+        requested_end=None,
+    )
+
+    assert result.sync_kind == SyncKind.DERIVED
+    assert result.action == SyncAction.NOT_SYNCABLE
+    assert result.reason == "derived_sync_not_implemented"
+
+
 def test_plan_sync_dataset_returns_plan_without_creating_artifact(monkeypatch: pytest.MonkeyPatch) -> None:
     dataset_id = "chirps3_precipitation_daily_sle"
     latest = _artifact(artifact_id="a1", managed_dataset_id=dataset_id, end="2026-01-31")
diff --git a/tests/test_processing_resample.py b/tests/test_processing_resample.py
new file mode 100644
index 00000000..7351c22f
--- /dev/null
+++ b/tests/test_processing_resample.py
@@ -0,0 +1,740 @@
+from datetime import UTC, datetime
+from pathlib import Path
+
+import numpy as np
+import pytest
+import xarray as xr
+
+from climate_api.ingestions import services as ingestion_services
+from climate_api.ingestions.schemas import (
+    ArtifactCoverage,
+    ArtifactFormat,
+    ArtifactPublication,
+    ArtifactRecord,
+    ArtifactRequestScope,
+    CoverageSpatial,
+    CoverageTemporal,
+    PublicationStatus,
+)
+from climate_api.processing import resample
+
+
+@pytest.fixture(autouse=True)
+def isolated_artifact_store(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+    artifacts_dir = tmp_path / "artifacts"
+    monkeypatch.setattr(ingestion_services, "ARTIFACTS_DIR", artifacts_dir)
+    monkeypatch.setattr(ingestion_services, "ARTIFACTS_INDEX_PATH", artifacts_dir / "records.json")
+    monkeypatch.setattr(resample, "DERIVED_DATA_DIR", tmp_path / "derived")
+
+
+def _artifact(
+    *,
+    artifact_id: str,
+    dataset_id: str,
+    managed_dataset_id: str,
+    path: Path,
+    start: str,
+    end: str,
+) -> ArtifactRecord:
+    return ArtifactRecord(
+        artifact_id=artifact_id,
+        dataset_id=dataset_id,
+        dataset_name=dataset_id,
+        variable="value",
+        format=ArtifactFormat.ZARR,
+        path=str(path),
+        asset_paths=[str(path)],
+        variables=["value"],
+        request_scope=ArtifactRequestScope(
+            start=start,
+            end=end,
+            extent_id="sle",
+            bbox=(1.0, 2.0, 3.0, 4.0),
+        ),
+        coverage=ArtifactCoverage(
+            temporal=CoverageTemporal(start=start, end=end),
+            spatial=CoverageSpatial(xmin=1.0, ymin=2.0, xmax=3.0, ymax=4.0),
+        ),
+        created_at=datetime(2026, 1, 10, tzinfo=UTC),
+        publication=ArtifactPublication(
+            status=PublicationStatus.PUBLISHED,
+            collection_id=managed_dataset_id,
+            pygeoapi_path=f"/ogcapi/collections/{managed_dataset_id}",
+        ),
+    )
+
+
+def test_materialize_resampled_artifact_builds_daily_dataset_from_hourly_source(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_hourly.zarr"
+    time = np.array("2026-01-01T00", dtype="datetime64[h]") + np.arange(48)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.arange(48, dtype=float).reshape(48, 1, 1))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-hourly",
+        dataset_id="era5land_temperature_hourly",
+        managed_dataset_id="era5land_temperature_hourly_sle",
+        path=source_path,
+        start="2026-01-01T00",
+        end="2026-01-02T23",
+    )
+    target_dataset = {
+        "id": "era5land_temperature_daily",
+        "name": "ERA5-Land daily temperature",
+        "variable": "value",
+        "period_type": "daily",
+        "resample": {"source_dataset_id": "era5land_temperature_hourly", "method": "mean"},
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "hourly"} if dataset_id == "era5land_temperature_hourly" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    artifact = resample.materialize_resampled_artifact(
+        target_dataset=target_dataset,
+        start="2026-01-01",
+        end="2026-01-02",
+        extent_id="sle",
+        bbox=None,
+        overwrite=False,
+        publish=False,
+    )
+
+    assert artifact.dataset_id == "era5land_temperature_daily"
+    assert artifact.coverage.temporal.start == "2026-01-01"
+    assert artifact.coverage.temporal.end == "2026-01-02"
+    result = xr.open_zarr(artifact.path, consolidated=True)
+    try:
+        assert result["value"].shape == (2, 1, 1)
+        assert result["value"].values[:, 0, 0].tolist() == [11.5, 35.5]
+    finally:
+        result.close()
+
+
+def test_materialize_resampled_artifact_rejects_invalid_period_hierarchy(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_daily_invalid_hierarchy.zarr"
+    time = np.array("2026-01-01", dtype="datetime64[D]") + np.arange(2)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.ones((2, 1, 1), dtype=float))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-daily-invalid",
+        dataset_id="chirps3_precipitation_daily",
+        managed_dataset_id="chirps3_precipitation_daily_sle",
+        path=source_path,
+        start="2026-01-01",
+        end="2026-01-02",
+    )
+    target_dataset = {
+        "id": "chirps3_precipitation_hourly",
+        "name": "CHIRPS hourly precipitation",
+        "variable": "value",
+        "period_type": "hourly",
+        "resample": {"source_dataset_id": "chirps3_precipitation_daily", "method": "sum"},
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    with pytest.raises(resample.HTTPException, match="Resampling requires a coarser target period than source"):
+        resample.materialize_resampled_artifact(
+            target_dataset=target_dataset,
+            start="2026-01-01T00",
+            end="2026-01-01T23",
+            extent_id="sle",
+            bbox=None,
+            overwrite=False,
+            publish=False,
+        )
+
+
+def test_materialize_resampled_artifact_returns_404_when_source_dataset_template_is_missing() -> None:
+    target_dataset = {
+        "id": "chirps3_precipitation_weekly",
+        "name": "CHIRPS weekly precipitation",
+        "variable": "value",
+        "period_type": "weekly",
+        "resample": {"source_dataset_id": "missing_daily", "method": "sum", "week_start": "monday"},
+    }
+
+    with pytest.raises(resample.HTTPException, match="Source dataset template 'missing_daily' not found"):
+        resample.materialize_resampled_artifact(
+            target_dataset=target_dataset,
+            start="2026-W02",
end="2026-W03", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + +def test_materialize_resampled_artifact_drops_incomplete_trailing_week( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_path = tmp_path / "source_daily.zarr" + time = np.array("2026-01-05", dtype="datetime64[D]") + np.arange(10) + ds = xr.Dataset( + {"value": (("time", "lat", "lon"), np.ones((10, 1, 1), dtype=float))}, + coords={"time": time, "lat": [2.0], "lon": [1.0]}, + ) + ds.to_zarr(source_path, mode="w", consolidated=True) + + source_artifact = _artifact( + artifact_id="source-daily", + dataset_id="chirps3_precipitation_daily", + managed_dataset_id="chirps3_precipitation_daily_sle", + path=source_path, + start="2026-01-05", + end="2026-01-14", + ) + target_dataset = { + "id": "chirps3_precipitation_weekly", + "name": "CHIRPS weekly precipitation", + "variable": "value", + "period_type": "weekly", + "resample": { + "source_dataset_id": "chirps3_precipitation_daily", + "method": "sum", + "week_start": "monday", + }, + } + + monkeypatch.setattr( + resample.registry_datasets, + "get_dataset", + lambda dataset_id: ( + {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None + ), + ) + monkeypatch.setattr( + resample.ingestion_services, + "get_latest_artifact_for_dataset_or_404", + lambda _: source_artifact, + ) + + artifact = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-W02", + end="2026-W03", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + assert artifact.coverage.temporal.start == "2026-W02" + assert artifact.coverage.temporal.end == "2026-W02" + result = xr.open_zarr(artifact.path, consolidated=True) + try: + assert result["value"].values[:, 0, 0].tolist() == [7.0] + finally: + result.close() + + +def test_materialize_resampled_artifact_drops_incomplete_leading_week( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_path = tmp_path / "source_daily_leading_partial.zarr" + time = np.array("2026-01-07", dtype="datetime64[D]") + np.arange(12) + ds = xr.Dataset( + {"value": (("time", "lat", "lon"), np.ones((12, 1, 1), dtype=float))}, + coords={"time": time, "lat": [2.0], "lon": [1.0]}, + ) + ds.to_zarr(source_path, mode="w", consolidated=True) + + source_artifact = _artifact( + artifact_id="source-daily-leading-partial", + dataset_id="chirps3_precipitation_daily", + managed_dataset_id="chirps3_precipitation_daily_sle", + path=source_path, + start="2026-01-07", + end="2026-01-18", + ) + target_dataset = { + "id": "chirps3_precipitation_weekly", + "name": "CHIRPS weekly precipitation", + "variable": "value", + "period_type": "weekly", + "resample": { + "source_dataset_id": "chirps3_precipitation_daily", + "method": "sum", + "week_start": "monday", + }, + } + + monkeypatch.setattr( + resample.registry_datasets, + "get_dataset", + lambda dataset_id: ( + {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None + ), + ) + monkeypatch.setattr( + resample.ingestion_services, + "get_latest_artifact_for_dataset_or_404", + lambda _: source_artifact, + ) + + artifact = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-W02", + end="2026-W03", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + assert artifact.coverage.temporal.start == "2026-W03" + assert artifact.coverage.temporal.end == "2026-W03" + result = xr.open_zarr(artifact.path, 
+    result = xr.open_zarr(artifact.path, consolidated=True)
+    try:
+        assert result["value"].values[:, 0, 0].tolist() == [7.0]
+    finally:
+        result.close()
+
+
+def test_materialize_resampled_artifact_returns_409_when_source_has_no_data_in_requested_range(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_daily_empty_range.zarr"
+    time = np.array("2026-01-01", dtype="datetime64[D]") + np.arange(7)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.ones((7, 1, 1), dtype=float))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-daily-empty",
+        dataset_id="chirps3_precipitation_daily",
+        managed_dataset_id="chirps3_precipitation_daily_sle",
+        path=source_path,
+        start="2026-01-01",
+        end="2026-01-07",
+    )
+    target_dataset = {
+        "id": "chirps3_precipitation_weekly",
+        "name": "CHIRPS weekly precipitation",
+        "variable": "value",
+        "period_type": "weekly",
+        "resample": {
+            "source_dataset_id": "chirps3_precipitation_daily",
+            "method": "sum",
+            "week_start": "monday",
+        },
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    with pytest.raises(
+        resample.HTTPException,
+        match="Source artifact contains no data for the requested resample range",
+    ):
+        resample.materialize_resampled_artifact(
+            target_dataset=target_dataset,
+            start="2026-W10",
+            end="2026-W10",
+            extent_id="sle",
+            bbox=None,
+            overwrite=False,
+            publish=False,
+        )
+
+
+def test_materialize_resampled_artifact_builds_monthly_dataset_from_daily_source(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_daily_monthly.zarr"
+    time = np.array("2026-01-01", dtype="datetime64[D]") + np.arange(31)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.ones((31, 1, 1), dtype=float))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-daily-monthly",
+        dataset_id="chirps3_precipitation_daily",
+        managed_dataset_id="chirps3_precipitation_daily_sle",
+        path=source_path,
+        start="2026-01-01",
+        end="2026-01-31",
+    )
+    target_dataset = {
+        "id": "chirps3_precipitation_monthly",
+        "name": "CHIRPS monthly precipitation",
+        "variable": "value",
+        "period_type": "monthly",
+        "resample": {
+            "source_dataset_id": "chirps3_precipitation_daily",
+            "method": "sum",
+        },
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    artifact = resample.materialize_resampled_artifact(
+        target_dataset=target_dataset,
+        start="2026-01",
+        end="2026-01",
+        extent_id="sle",
+        bbox=None,
+        overwrite=False,
+        publish=False,
+    )
+
+    assert artifact.coverage.temporal.start == "2026-01"
+    assert artifact.coverage.temporal.end == "2026-01"
+    result = xr.open_zarr(artifact.path, consolidated=True)
+    try:
+        assert result["value"].values[:, 0, 0].tolist() == [31.0]
+    finally:
+        result.close()
+
+
+def test_materialize_resampled_artifact_reuses_existing_artifact_when_overwrite_is_false(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_hourly.zarr"
+    time = np.array("2026-01-01T00", dtype="datetime64[h]") + np.arange(24)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.arange(24, dtype=float).reshape(24, 1, 1))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-hourly",
+        dataset_id="era5land_temperature_hourly",
+        managed_dataset_id="era5land_temperature_hourly_sle",
+        path=source_path,
+        start="2026-01-01T00",
+        end="2026-01-01T23",
+    )
+    target_dataset = {
+        "id": "era5land_temperature_daily",
+        "name": "ERA5-Land daily temperature",
+        "variable": "value",
+        "period_type": "daily",
+        "resample": {"source_dataset_id": "era5land_temperature_hourly", "method": "mean"},
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "hourly"} if dataset_id == "era5land_temperature_hourly" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    first = resample.materialize_resampled_artifact(
+        target_dataset=target_dataset,
+        start="2026-01-01",
+        end="2026-01-01",
+        extent_id="sle",
+        bbox=None,
+        overwrite=False,
+        publish=False,
+    )
+
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: pytest.fail("existing derived artifact should be reused before resolving source artifact"),
+    )
+    second = resample.materialize_resampled_artifact(
+        target_dataset=target_dataset,
+        start="2026-01-01",
+        end="2026-01-01",
+        extent_id="sle",
+        bbox=None,
+        overwrite=False,
+        publish=False,
+    )
+
+    assert second.artifact_id == first.artifact_id
+
+
+def test_materialize_resampled_artifact_reuses_existing_artifact_by_realized_end(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+) -> None:
+    source_path = tmp_path / "source_daily_realized_reuse.zarr"
+    time = np.array("2026-01-05", dtype="datetime64[D]") + np.arange(10)
+    ds = xr.Dataset(
+        {"value": (("time", "lat", "lon"), np.ones((10, 1, 1), dtype=float))},
+        coords={"time": time, "lat": [2.0], "lon": [1.0]},
+    )
+    ds.to_zarr(source_path, mode="w", consolidated=True)
+
+    source_artifact = _artifact(
+        artifact_id="source-daily-realized-reuse",
+        dataset_id="chirps3_precipitation_daily",
+        managed_dataset_id="chirps3_precipitation_daily_sle",
+        path=source_path,
+        start="2026-01-05",
+        end="2026-01-14",
+    )
+    target_dataset = {
+        "id": "chirps3_precipitation_weekly",
+        "name": "CHIRPS weekly precipitation",
+        "variable": "value",
+        "period_type": "weekly",
+        "resample": {
+            "source_dataset_id": "chirps3_precipitation_daily",
+            "method": "sum",
+            "week_start": "monday",
+        },
+    }
+
+    monkeypatch.setattr(
+        resample.registry_datasets,
+        "get_dataset",
+        lambda dataset_id: (
+            {"id": dataset_id, "period_type": "daily"} if dataset_id == "chirps3_precipitation_daily" else None
+        ),
+    )
+    monkeypatch.setattr(
+        resample.ingestion_services,
+        "get_latest_artifact_for_dataset_or_404",
+        lambda _: source_artifact,
+    )
+
+    first = resample.materialize_resampled_artifact(
+        target_dataset=target_dataset,
+        start="2026-W02",
+        end="2026-W03",
+        extent_id="sle",
+        bbox=None,
+        overwrite=False,
+        publish=False,
+    )
+    second = resample.materialize_resampled_artifact(
+ target_dataset=target_dataset, + start="2026-W02", + end="2026-W03", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + assert second.artifact_id == first.artifact_id + + +def test_materialize_resampled_artifact_publishes_reused_existing_artifact_when_requested( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_path = tmp_path / "source_hourly_publish_existing.zarr" + time = np.array("2026-01-01T00", dtype="datetime64[h]") + np.arange(24) + ds = xr.Dataset( + {"value": (("time", "lat", "lon"), np.arange(24, dtype=float).reshape(24, 1, 1))}, + coords={"time": time, "lat": [2.0], "lon": [1.0]}, + ) + ds.to_zarr(source_path, mode="w", consolidated=True) + + source_artifact = _artifact( + artifact_id="source-hourly-publish-existing", + dataset_id="era5land_temperature_hourly", + managed_dataset_id="era5land_temperature_hourly_sle", + path=source_path, + start="2026-01-01T00", + end="2026-01-01T23", + ) + target_dataset = { + "id": "era5land_temperature_daily", + "name": "ERA5-Land daily temperature", + "variable": "value", + "period_type": "daily", + "resample": {"source_dataset_id": "era5land_temperature_hourly", "method": "mean"}, + } + + monkeypatch.setattr( + resample.registry_datasets, + "get_dataset", + lambda dataset_id: ( + {"id": dataset_id, "period_type": "hourly"} if dataset_id == "era5land_temperature_hourly" else None + ), + ) + monkeypatch.setattr( + resample.ingestion_services, + "get_latest_artifact_for_dataset_or_404", + lambda _: source_artifact, + ) + + existing = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-01-01", + end="2026-01-01", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + published = existing.model_copy( + update={ + "publication": existing.publication.model_copy(update={"status": PublicationStatus.PUBLISHED}), + } + ) + publish_calls: list[str] = [] + + def _publish_artifact_record(artifact_id: str) -> ArtifactRecord: + publish_calls.append(artifact_id) + return published + + monkeypatch.setattr(resample.ingestion_services, "publish_artifact_record", _publish_artifact_record) + + reused = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-01-01", + end="2026-01-01", + extent_id="sle", + bbox=None, + overwrite=False, + publish=True, + ) + + assert publish_calls == [existing.artifact_id] + assert reused.publication.status == PublicationStatus.PUBLISHED + + +def test_materialize_resampled_artifact_rematerializes_when_overwrite_is_true( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_path = tmp_path / "source_hourly_overwrite.zarr" + time = np.array("2026-01-01T00", dtype="datetime64[h]") + np.arange(24) + initial_ds = xr.Dataset( + {"value": (("time", "lat", "lon"), np.arange(24, dtype=float).reshape(24, 1, 1))}, + coords={"time": time, "lat": [2.0], "lon": [1.0]}, + ) + initial_ds.to_zarr(source_path, mode="w", consolidated=True) + + source_artifact = _artifact( + artifact_id="source-hourly-overwrite", + dataset_id="era5land_temperature_hourly", + managed_dataset_id="era5land_temperature_hourly_sle", + path=source_path, + start="2026-01-01T00", + end="2026-01-01T23", + ) + target_dataset = { + "id": "era5land_temperature_daily", + "name": "ERA5-Land daily temperature", + "variable": "value", + "period_type": "daily", + "resample": {"source_dataset_id": "era5land_temperature_hourly", "method": "mean"}, + } + + monkeypatch.setattr( + resample.registry_datasets, + "get_dataset", + 
lambda dataset_id: ( + {"id": dataset_id, "period_type": "hourly"} if dataset_id == "era5land_temperature_hourly" else None + ), + ) + monkeypatch.setattr( + resample.ingestion_services, + "get_latest_artifact_for_dataset_or_404", + lambda _: source_artifact, + ) + + first = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-01-01", + end="2026-01-01", + extent_id="sle", + bbox=None, + overwrite=False, + publish=False, + ) + + updated_ds = xr.Dataset( + {"value": (("time", "lat", "lon"), (np.arange(24, dtype=float) + 24).reshape(24, 1, 1))}, + coords={"time": time, "lat": [2.0], "lon": [1.0]}, + ) + updated_ds.to_zarr(source_path, mode="w", consolidated=True) + + second = resample.materialize_resampled_artifact( + target_dataset=target_dataset, + start="2026-01-01", + end="2026-01-01", + extent_id="sle", + bbox=None, + overwrite=True, + publish=False, + ) + + assert second.artifact_id == first.artifact_id + result = xr.open_zarr(second.path, consolidated=True) + try: + assert result["value"].values[:, 0, 0].tolist() == [35.5] + finally: + result.close() diff --git a/tests/test_processing_routes.py b/tests/test_processing_routes.py new file mode 100644 index 00000000..5beeefda --- /dev/null +++ b/tests/test_processing_routes.py @@ -0,0 +1,120 @@ +from datetime import UTC, datetime +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from climate_api.ingestions.schemas import ( + ArtifactCoverage, + CoverageSpatial, + CoverageTemporal, + DatasetPublication, + DatasetRecord, + PublicationStatus, +) +from climate_api.processing import routes as processing_routes +from climate_api.processing import services as processing_services + + +def _target_dataset() -> dict[str, object]: + return { + "id": "chirps3_precipitation_weekly", + "name": "CHIRPS weekly precipitation", + "variable": "value", + "period_type": "weekly", + "sync_kind": "derived", + "resample": { + "source_dataset_id": "chirps3_precipitation_daily", + "method": "sum", + "week_start": "monday", + }, + } + + +def _dataset_record(dataset_id: str) -> DatasetRecord: + return DatasetRecord( + dataset_id=dataset_id, + source_dataset_id="chirps3_precipitation_weekly", + dataset_name="CHIRPS weekly precipitation", + short_name="CHIRPS weekly", + variable="value", + period_type="weekly", + units="mm", + resolution="5 km x 5 km", + source="CHIRPS v3", + source_url="https://example.com/chirps", + extent=ArtifactCoverage( + temporal=CoverageTemporal(start="2026-W02", end="2026-W03"), + spatial=CoverageSpatial(xmin=1.0, ymin=2.0, xmax=3.0, ymax=4.0), + ), + last_updated=datetime(2026, 1, 21, tzinfo=UTC), + links=[], + publication=DatasetPublication( + status=PublicationStatus.PUBLISHED, + published_at=datetime(2026, 1, 21, tzinfo=UTC), + ), + ) + + +def test_post_resample_returns_completed_response( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(processing_routes, "_get_dataset_or_404", lambda _: _target_dataset()) + monkeypatch.setattr( + processing_services, + "run_resample_process", + lambda **kwargs: ("artifact-123", _dataset_record("chirps3_precipitation_weekly_sle")), + ) + + response = client.post( + "/processes/resample", + json={ + "dataset_id": "chirps3_precipitation_weekly", + "start": "2026-W02", + "end": "2026-W03", + "extent_id": "sle", + "overwrite": False, + "publish": True, + }, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["artifact_id"] == "artifact-123" + assert 
payload["status"] == "completed" + assert payload["dataset"]["dataset_id"] == "chirps3_precipitation_weekly_sle" + + +def test_post_resample_passes_target_dataset_and_extent_scope( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + captured: dict[str, Any] = {} + monkeypatch.setattr(processing_routes, "_get_dataset_or_404", lambda _: _target_dataset()) + + def fake_run_resample_process(**kwargs: object) -> tuple[str, DatasetRecord]: + captured.update(kwargs) + return "artifact-456", _dataset_record("chirps3_precipitation_weekly_sle") + + monkeypatch.setattr(processing_services, "run_resample_process", fake_run_resample_process) + + response = client.post( + "/processes/resample", + json={ + "dataset_id": "chirps3_precipitation_weekly", + "start": "2026-W02", + "end": "2026-W03", + "extent_id": "sle", + "overwrite": True, + "publish": False, + }, + ) + + assert response.status_code == 200 + assert captured["dataset"]["id"] == "chirps3_precipitation_weekly" + assert captured["start"] == "2026-W02" + assert captured["end"] == "2026-W03" + assert captured["extent_id"] == "sle" + assert captured["overwrite"] is True + assert captured["publish"] is False diff --git a/tests/test_shared_time.py b/tests/test_shared_time.py index a2d90cb4..3da9c759 100644 --- a/tests/test_shared_time.py +++ b/tests/test_shared_time.py @@ -20,6 +20,14 @@ def test_normalize_period_string_converts_aware_daily_datetime_to_utc_period() - assert normalize_period_string("2026-04-21T00:30:00+02:00", "daily") == "2026-04-20" +def test_normalize_period_string_accepts_dataset_native_weekly_period() -> None: + assert normalize_period_string("2026-W17", "weekly") == "2026-W17" + + +def test_normalize_period_string_converts_datetime_to_weekly_period() -> None: + assert normalize_period_string("2026-04-21T13:30:00+00:00", "weekly") == "2026-W17" + + def test_datetime_to_period_string_converts_aware_monthly_datetime_to_utc_period() -> None: from datetime import datetime @@ -28,12 +36,18 @@ def test_datetime_to_period_string_converts_aware_monthly_datetime_to_utc_period assert datetime_to_period_string(value, "monthly") == "2026-04" -def test_normalize_period_string_rejects_unsupported_period_type() -> None: - with pytest.raises(ValueError, match="Unsupported period_type 'weekly'"): - normalize_period_string("2026-W17", "weekly") +def test_normalize_period_string_rejects_invalid_weekly_period() -> None: + with pytest.raises(ValueError, match="Invalid weekly period '2026-W54'; expected YYYY-Www or ISO datetime"): + normalize_period_string("2026-W54", "weekly") def test_parse_period_string_to_datetime_accepts_dataset_native_hourly_period() -> None: parsed = parse_period_string_to_datetime("2026-04-21T13") assert parsed.isoformat() == "2026-04-21T13:00:00+00:00" + + +def test_parse_period_string_to_datetime_accepts_dataset_native_weekly_period() -> None: + parsed = parse_period_string_to_datetime("2026-W17") + + assert parsed.isoformat() == "2026-04-20T00:00:00+00:00"