From 6587c1deaa13af152a66722e66341706733e05ac Mon Sep 17 00:00:00 2001 From: Ollie Tooth Date: Fri, 15 May 2026 15:37:46 +0100 Subject: [PATCH 1/3] Update icechunk repo config defaults in ObjectStoreS3 for JASMIN OS. --- OceanDataStore/object_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/OceanDataStore/object_store.py b/OceanDataStore/object_store.py index b60546d9..8920b0b8 100644 --- a/OceanDataStore/object_store.py +++ b/OceanDataStore/object_store.py @@ -208,9 +208,9 @@ def get_mapper(self, def create_icechunk_repo(self, bucket: str, prefix: str, - storage_config_kwargs: dict = {}, + storage_config_kwargs: dict = {'region': 'us-east-1', 'force_path_style': True}, repository_config_kwargs: dict = {}, - storage_settings_kwargs: dict = {}, + storage_settings_kwargs: dict = {'unsafe_use_conditional_update': False, 'unsafe_use_conditional_create': False}, ) -> icechunk.Repository: """ Create a new Icechunk repository in cloud object storage. @@ -266,7 +266,7 @@ def create_icechunk_repo(self, def open_icechunk_repo(self, bucket: str, prefix: str, - storage_config_kwargs: dict = {'region': '', 'force_path_style': True}, + storage_config_kwargs: dict = {'region': 'us-east-1', 'force_path_style': True}, repository_config_kwargs: dict = {}, storage_settings_kwargs: dict = {'unsafe_use_conditional_update': False, 'unsafe_use_conditional_create': False}, ) -> icechunk.Repository: From 3adaa834dbd11843484ff0ab8552053bcd6c97f3 Mon Sep 17 00:00:00 2001 From: Ollie Tooth Date: Fri, 15 May 2026 15:40:20 +0100 Subject: [PATCH 2/3] Added `exists` and `group` kwargs to send & update functions to support modifying existing Icechunk / Zarr stores and creating hierarchical stores. Improved type hint consistency. 
--- OceanDataStore/object_store_handler.py | 459 +++++++++++++++---------- 1 file changed, 281 insertions(+), 178 deletions(-) diff --git a/OceanDataStore/object_store_handler.py b/OceanDataStore/object_store_handler.py index 171b349d..8bb188dc 100644 --- a/OceanDataStore/object_store_handler.py +++ b/OceanDataStore/object_store_handler.py @@ -125,9 +125,10 @@ async def _check_zarr_store(obj_store: ObjectStoreS3, async def _check_zarr_compatibility(data: xr.DataArray | xr.Dataset, obj_store: ObjectStoreS3, dest: str, - append_dim: str = "time_counter", + group: Optional[str] = None, + append_dim: Optional[str] = "time_counter", rechunk: Optional[dict] = None, - version: int = 3, + version: Optional[int] = 3, ) -> None: """ Check compatibility of DataArray or Dataset to update existing @@ -141,11 +142,13 @@ async def _check_zarr_compatibility(data: xr.DataArray | xr.Dataset, ObjectStoreS3 remote filesystem. dest: str Destination path in the object store. - append_dim: bool, default="time_counter" + group: Optional[str], default=None + Group in Zarr store to update. + append_dim: Optional[str], default="time_counter" Dimension to append data to existing Zarr store. rechunk: Optional[dict], default=None Mapping to rechunk dimensions. - version: int, default=3 + version: Optional[int], default=3 Zarr version to use. """ # === Initialise store using fsspec === # @@ -158,7 +161,7 @@ async def _check_zarr_compatibility(data: xr.DataArray | xr.Dataset, # 2. 
Check Zarr store compatibility: try: - ds_store = xr.open_zarr(store, zarr_format=version) + ds_store = xr.open_zarr(store, group=group, zarr_format=version) except Exception as e: await _close_session(obj_store=obj_store) raise FileNotFoundError(f"zarr version {version} is not compatible with the store: {e}") @@ -194,6 +197,7 @@ def _check_icechunk_compatibility(data: xr.DataArray | xr.Dataset, branch: str, append_dim: str, rechunk: dict, + group: Optional[str] = None, ) -> None: """ Check compatibility of DataArray or Dataset to update existing @@ -213,13 +217,15 @@ def _check_icechunk_compatibility(data: xr.DataArray | xr.Dataset, Dimension to append data to existing IcechunkStore. rechunk: dict Mapping to rechunk dimensions. + group: Optional[str], default=None + Group in IcechunkStore to update. """ # === Initialise IcechunkStore from session === # store = repo.readonly_session(branch=branch).store # 1. Check if IcechunkStore exists: try: - ds_store = xr.open_zarr(store, consolidated=False) + ds_store = xr.open_zarr(store, group=group, consolidated=False) except Exception as e: raise FileNotFoundError(f"IcechunkStore not found in repository: {e}") @@ -250,7 +256,7 @@ async def _close_session(obj_store: ObjectStoreS3) -> None: Parameters ---------- - obj_store + obj_store: ObjectStoreS3 ObjectStoreS3 remote filesystem. """ if hasattr(obj_store, '_s3creator'): @@ -260,7 +266,8 @@ async def _close_session(obj_store: ObjectStoreS3) -> None: async def _write_to_zarr(data: xr.DataArray | xr.Dataset, obj_store: ObjectStoreS3, dest: str, - version: int = 3, + group: Optional[str] = None, + version: Optional[int] = 3, ) -> None: """ Write DataArray or Dataset to Zarr store in cloud @@ -274,7 +281,9 @@ async def _write_to_zarr(data: xr.DataArray | xr.Dataset, ObjectStoreS3 remote filesystem. dest: str Path to Zarr store in the object store. - version: int, default=3 + group: Optional[str], default=None + Group in Zarr store to write data to. 
+ version: Optional[int], default=3 Zarr version to use. """ # === Verify Inputs === # @@ -284,6 +293,9 @@ async def _write_to_zarr(data: xr.DataArray | xr.Dataset, raise TypeError("obj_store must be an ObjectStoreS3 instance.") if not isinstance(dest, str): raise TypeError("dest must be a string.") + if group is not None: + if not isinstance(group, str): + raise TypeError("group must be a string.") if not isinstance(version, int): raise TypeError("version must be an integer.") @@ -299,14 +311,14 @@ async def _write_to_zarr(data: xr.DataArray | xr.Dataset, # Write Dataset to Zarr store in Object Store: if await _check_zarr_store(obj_store=obj_store, path=dest): - logging.info(f"Skipping Variable: Store already exists at {dest}") + logging.info(f"Skipping Dataset: Store already exists at {dest}") else: with timer(action='send', dest=dest, var=var): # Catch consolidated metadata warnings: with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=UserWarning) - data.to_zarr(store=store, mode="w", zarr_format=version) + data.to_zarr(store=store, mode="w", group=group, zarr_format=version) def _write_to_icechunk(data: xr.DataArray | xr.Dataset, @@ -314,6 +326,7 @@ def _write_to_icechunk(data: xr.DataArray | xr.Dataset, repo: icechunk.Repository, commit_message: str, branch: Optional[str] = "main", + group: Optional[str] = None, ) -> None: """ Write DataArray or Dataset to IcechunkStore in cloud @@ -330,8 +343,10 @@ def _write_to_icechunk(data: xr.DataArray | xr.Dataset, IcechunkStore. commit_message: str Commit message when updating the Icechunk repository. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. + group: Optional[str], default=None + Group in IcechunkStore to write data to. 
""" # === Convert DataArrays to Datasets === # if isinstance(data, xr.DataArray): @@ -343,13 +358,14 @@ def _write_to_icechunk(data: xr.DataArray | xr.Dataset, # === Write Data to IcechunkStore & Commit === # with timer(action='send', dest=dest, var=var): session = repo.writable_session(branch=branch) - icechunk_xr.to_icechunk(data, session, mode='a') + icechunk_xr.to_icechunk(data, session=session, group=group, mode='a') session.commit(message=commit_message) async def _append_to_zarr(data: xr.DataArray | xr.Dataset, obj_store: ObjectStoreS3, dest: str, + group: Optional[str] = None, append_dim: str = "time_counter", version: int = 3, ) -> None: @@ -365,9 +381,11 @@ async def _append_to_zarr(data: xr.DataArray | xr.Dataset, ObjectStoreS3 remote filesystem. dest: str Path to Zarr store in the object store. - append_dim: str, default="time_counter" + group: Optional[str], default=None + Group in Zarr store to append data to. + append_dim: Optional[str], default="time_counter" Dimension to append data to existing Zarr store. - version: int, default=3 + version: Optional[int], default=3 Zarr version to use. 
""" # === Initialise store using fsspec === # @@ -377,7 +395,7 @@ async def _append_to_zarr(data: xr.DataArray | xr.Dataset, # Catch consolidated metadata warnings: with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=UserWarning) - data.to_zarr(store=store, append_dim=append_dim, zarr_format=version) + data.to_zarr(store=store, append_dim=append_dim, group=group, zarr_format=version) def _append_to_icechunk(data: xr.DataArray | xr.Dataset, @@ -385,7 +403,8 @@ def _append_to_icechunk(data: xr.DataArray | xr.Dataset, repo: icechunk.Repository, commit_message: str, branch: Optional[str] = "main", - append_dim: str = "time_counter", + group: Optional[str] = None, + append_dim: Optional[str] = "time_counter", ) -> None: """ Append DataArray or Dataset to existing IcechunkStore in @@ -402,9 +421,11 @@ def _append_to_icechunk(data: xr.DataArray | xr.Dataset, IcechunkStore. commit_message: str Commit message when updating the Icechunk repository. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. - append_dim: str, default="time_counter" + group: Optional[str], default=None + Group in IcechunkStore to append data to. + append_dim: Optional[str], default="time_counter" Dimension to append data to existing IcechunkStore. 
""" # === Convert DataArrays to Datasets === # @@ -414,7 +435,7 @@ def _append_to_icechunk(data: xr.DataArray | xr.Dataset, # === Append Data to IcechunkStore & Commit === # with timer(action='append', dest=dest): session = repo.writable_session(branch=branch) - icechunk_xr.to_icechunk(obj=data, session=session, append_dim=append_dim) + icechunk_xr.to_icechunk(obj=data, session=session, group=group, append_dim=append_dim) session.commit(message=commit_message) @@ -422,7 +443,8 @@ async def _replace_in_zarr(data: xr.DataArray | xr.Dataset, obj_store: ObjectStoreS3, dest: str, region: dict, - version: int = 3, + group: Optional[str] = None, + version: Optional[int] = 3, ) -> None: """ Append DataArray or Dataset to existing Zarr store in @@ -438,7 +460,9 @@ async def _replace_in_zarr(data: xr.DataArray | xr.Dataset, Path to Zarr store in the object store. region: dict Region of existing Zarr store to replace data. - version: int, default=3 + group: Optional[str], default=None + Group in Zarr store to replace data in. + version: Optional[int], default=3 Zarr version to use. """ # === Initialise store using fsspec === # @@ -453,7 +477,7 @@ async def _replace_in_zarr(data: xr.DataArray | xr.Dataset, # Catch consolidated metadata warnings: with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=UserWarning) - data.to_zarr(store=store, region=region, zarr_format=version) + data.to_zarr(store=store, region=region, group=group, zarr_format=version) def _replace_in_icechunk(data: xr.DataArray | xr.Dataset, @@ -462,6 +486,7 @@ def _replace_in_icechunk(data: xr.DataArray | xr.Dataset, repo: icechunk.Repository, commit_message: str, branch: Optional[str] = "main", + group: Optional[str] = None, ) -> None: """ Replace data in existing IcechunkStore in cloud object storage. @@ -478,8 +503,10 @@ def _replace_in_icechunk(data: xr.DataArray | xr.Dataset, Icechunk repository in which to replace data in IcechunkStore. 
commit_message: str Commit message when updating the Icechunk repository. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. + group: Optional[str], default=None + Group in IcechunkStore to replace data in. """ # === Convert DataArrays to Datasets === # if isinstance(data, xr.DataArray): @@ -493,16 +520,17 @@ def _replace_in_icechunk(data: xr.DataArray | xr.Dataset, # === Write Data to IcechunkStore & Commit === # with timer(action='replace', dest=dest): session = repo.writable_session(branch=branch) - icechunk_xr.to_icechunk(obj=data, session=session, region=region) + icechunk_xr.to_icechunk(obj=data, session=session, region=region, group=group) session.commit(message=commit_message) async def _update_zarr_store(data: xr.DataArray | xr.Dataset, obj_store: ObjectStoreS3, dest: str, - append_dim: str = "time_counter", + group: Optional[str] = None, + append_dim: Optional[str] = "time_counter", rechunk: Optional[dict] = None, - version: int = 3, + version: Optional[int] = 3, ) -> None: """ Update an existing Zarr store in object storage by replacing @@ -516,11 +544,13 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, ObjectStoreS3 remote filesystem. dest: str Path to Zarr store in the object store. - append_dim: bool, default="time_counter" + group: Optional[str], default=None + Group in Zarr store to update. + append_dim: Optional[str], default="time_counter" Dimension to append data to existing Zarr store. rechunk: Optional[dict], default=None Mapping to rechunk dimensions. - version: int, default=3 + version: Optional[int], default=3 Zarr version to use. 
""" # === Verify Inputs === # @@ -530,6 +560,9 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, raise TypeError("obj_store must be an ObjectStoreS3 instance.") if not isinstance(dest, str): raise TypeError("dest must be a string.") + if group is not None: + if not isinstance(group, str): + raise TypeError("group must be a string.") if not isinstance(append_dim, str): raise TypeError("append_dim must be a string.") if rechunk is not None: @@ -553,6 +586,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, await _check_zarr_compatibility(data=ds_source, obj_store=obj_store, dest=dest, + group=group, append_dim=append_dim, rechunk=rechunk, version=version @@ -561,7 +595,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, # === Update existing variable in Zarr Store === # # Extract source & target append dimension values: - ds_target = xr.open_zarr(store, zarr_format=version) + ds_target = xr.open_zarr(store, group=group, zarr_format=version) if (var in ds_target.data_vars) or (var is None): @@ -594,6 +628,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, obj_store=obj_store, dest=dest, region={append_dim : slice(target_ind_min, target_ind_max)}, + group=group, version=version, ) @@ -603,6 +638,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, await _append_to_zarr(data=ds_source.isel({append_dim : slice(source_ind_max, source_ind_size)}), obj_store=obj_store, dest=dest, + group=group, append_dim=append_dim, version=version, ) @@ -612,6 +648,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, await _append_to_zarr(data=ds_source, obj_store=obj_store, dest=dest, + group=group, append_dim=append_dim, version=version, ) @@ -621,6 +658,7 @@ async def _update_zarr_store(data: xr.DataArray | xr.Dataset, await _write_to_zarr(data=ds_source, obj_store=obj_store, dest=dest, + group=group, version=version, ) @@ -632,7 +670,8 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, 
repo: icechunk.Repository, commit_message: str, branch: Optional[str] = "main", - append_dim: str = "time_counter", + group: Optional[str] = None, + append_dim: Optional[str] = "time_counter", rechunk: Optional[dict] = None, ) -> None: """ @@ -652,7 +691,9 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, Commit message when updating the Icechunk repository. branch: str, default="main" Branch on which to write data to IcechunkStore. - append_dim: bool, default="time_counter" + group: Optional[str], default=None + Group in IcechunkStore to update. + append_dim: Optional[str], default="time_counter" Dimension to append data to existing IcechunkStore. rechunk: Optional[dict], default=None Mapping to rechunk dimensions. @@ -667,7 +708,7 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, # Extract source & target append dimension values: store = repo.readonly_session(branch=branch).store - ds_target = xr.open_zarr(store, consolidated=False) + ds_target = xr.open_zarr(store, group=group, consolidated=False) target_append_dim = ds_target[append_dim].values source_append_dim = ds_source[append_dim].values @@ -680,6 +721,7 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, branch=branch, append_dim=append_dim, rechunk=rechunk, + group=group ) logging.info(f"Passed Compatibility Checks for IcechunkStore {dest}") @@ -713,7 +755,8 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, dest=dest, region={append_dim : slice(target_ind_min, target_ind_max)}, commit_message=rep_commit_message, - branch=branch + branch=branch, + group=group ) # 2. 
Append new values to target IcechunkStore: @@ -729,6 +772,7 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, dest=dest, commit_message=app_commit_message, branch=branch, + group=group, append_dim=append_dim ) else: @@ -744,6 +788,7 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, dest=dest, commit_message=app_commit_message, branch=branch, + group=group, append_dim=append_dim ) else: @@ -755,12 +800,13 @@ def _update_icechunk_store(data: xr.DataArray | xr.Dataset, repo=repo, commit_message=snd_commit_message, branch=branch, + group=group ) def _preprocess_dataset(file: list[str] | str | xr.Dataset, rechunk: Optional[dict] = None, - append_dim: str = "time_counter", + append_dim: Optional[str] = "time_counter", update_coords: Optional[dict] = None, grid_filepath: Optional[str] = None, attrs: Optional[dict] = None, @@ -906,13 +952,14 @@ async def _send_to_zarr( object_prefix: str, store_credentials_json: str, client : Optional[Client] = None, + group: Optional[str] = None, variables: Optional[list[str]] = None, - append_dim: str = "time_counter", + append_dim: Optional[str] = "time_counter", grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, - parallel: bool = False, + parallel: Optional[bool] = False, zarr_version: int = 3 ) -> None: """ @@ -930,26 +977,28 @@ async def _send_to_zarr( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. - client: dask.distributed.Client, optional + client: Optional[dask.distributed.Client], default=None Dask Client object. If provided, the object store session will be closed on all workers when complete. - variables: list[str], optional + group: Optional[str], default=None + Group in Zarr store to write data to. + variables: Optional[list[str]], default=None List of variables to send to Zarr stores. 
If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - parallel: bool, default=False, + parallel: Optional[bool], default=False, Whether to perform open and preprocess steps in parallel using `dask.delayed`. - zarr_version: int, default=3 + zarr_version: Optional[int], default=3 Zarr version to use. """ # === Initialise Asynchronous Object Store === # @@ -978,6 +1027,7 @@ async def _send_to_zarr( await _write_to_zarr(data=ds_filepath[variables], obj_store=obj_store, dest=dest, + group=group, version=zarr_version ) @@ -997,15 +1047,16 @@ async def send_to_zarr( bucket: str, object_prefix: str, store_credentials_json: str, + group: Optional[str] = None, variables: Optional[list[str]] = None, - append_dim: str = "time_counter", + append_dim: Optional[str] = "time_counter", grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, dask_config_kwargs: Optional[dict] = None, dask_cluster_kwargs: Optional[dict] = None, - zarr_version: int = 3 + zarr_version: Optional[int] = 3 ) -> None: """ Write data to new Zarr store in cloud object storage with @@ -1023,23 +1074,25 @@ async def send_to_zarr( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. 
- variables: list[str], optional + group: Optional[str], default=None + Group in Zarr store to write data to. + variables: Optional[list[str]], default=None List of variables to send. If None, all variables will be sent. - append_dim: str, default="time_counter" + append_dim: Optional[str], default="time_counter" Name of the append dimension, by default "time_counter". - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary, by default None. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - dask_config_kwargs: Dict[str,str], optional + dask_config_kwargs: Optional[dict], default=None Dask configuration settings passed to dask.config.set(). - dask_cluster_kwargs: dict, optional + dask_cluster_kwargs: Optional[dict], default=None Dask cluster configuration settings passed to LocalCluster(). - zarr_version: int, default=3 + zarr_version: Optional[int], default=3 Zarr version to use. 
""" if dask_cluster_kwargs is not None: @@ -1060,6 +1113,7 @@ async def send_to_zarr( object_prefix=object_prefix, store_credentials_json=store_credentials_json, client=client, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1082,6 +1136,7 @@ async def send_to_zarr( object_prefix=object_prefix, store_credentials_json=store_credentials_json, client=None, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1098,16 +1153,18 @@ def _send_to_icechunk( bucket: str, object_prefix: str, store_credentials_json: str, + exists: Optional[bool] = False, + group: Optional[str] = None, variables: Optional[list[str]] = None, append_dim: Optional[str] = 'time_counter', grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, - parallel: bool = False, - branch: str = "main", - commit_message: str = "Add new data to my Icechunk repository", - variable_commits: bool = False, + parallel: Optional[bool] = False, + branch: Optional[str] = "main", + commit_message: Optional[str] = "Add new data to my Icechunk repository", + variable_commits: Optional[bool] = False, icechunk_config: Optional[dict] = None, ) -> None: """ @@ -1125,29 +1182,33 @@ def _send_to_icechunk( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. - variables: list[str], optional + exists: Optional[bool], default=False + Whether to write to an existing Icechunk repository or create a new repository. + group: Optional[str], default=None + Group in Icechunk repository to write data to. + variables: Optional[list[str]], default=None List of variables to send. If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. 
- grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary, by default None. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - parallel: bool, default=False + parallel: Optional[bool], default=False Whether to perform open and preprocess steps in parallel using `dask.delayed`. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. - commit_message: str, default="Initial commit" + commit_message: Optional[str], default="Initial commit" Commit message when updating the Icechunk repository. - variable_commits: bool, default=False + variable_commits: Optional[bool], default=False Whether to write each variable to Icechunk repository using separate commits. - icechunk_config: dict, optional + icechunk_config: Optional[dict], default=None Icechunk repository configuration. 
""" # === Initialise Synchronous Object Store === # @@ -1158,9 +1219,9 @@ def _send_to_icechunk( ) if icechunk_config is None: - icechunk_config = {"storage_config_kwargs":{}, - "repository_config_kwargs":{}, - "storage_settings_kwargs":{} + icechunk_config = {"storage_config_kwargs": {'region': 'us-east-1', 'force_path_style': True}, + "repository_config_kwargs": {}, + "storage_settings_kwargs": {'unsafe_use_conditional_update': False, 'unsafe_use_conditional_create': False}, } # === Preprocess Data === # @@ -1182,48 +1243,62 @@ def _send_to_icechunk( source_append_dim = ds_filepath[append_dim].values # === Send Variables to Icechunk Repo === # - try: - # Create new Icechunk repo: - repo = obj_store.create_icechunk_repo(bucket=bucket, - prefix=object_prefix, - storage_config_kwargs=icechunk_config["storage_config_kwargs"], - repository_config_kwargs=icechunk_config["repository_config_kwargs"], - storage_settings_kwargs=icechunk_config["storage_settings_kwargs"], - ) - if variable_commits: - for var in variables: - logging.info(f"Sending Variable: {var}") - if append_dim in ds_filepath[var].dims: - snd_commit_message = f"{commit_message} -> Sent {var} along {append_dim} from {source_append_dim[0]} to {source_append_dim[-1]}." - else: - snd_commit_message = f"{commit_message} -> Sent {var}." - - # Write each variable using separate commits to the repo: - _write_to_icechunk(data=ds_filepath[var], - dest=f"{bucket}/{object_prefix}", - repo=repo, - commit_message=snd_commit_message, - branch=branch, - ) - else: - # Write all variables using single commit to the repo: - logging.info(f"Sending Dataset: {object_prefix}") - if append_dim in ds_filepath.dims: - snd_commit_message = f"{commit_message} -> Sent dataset along {append_dim} from {source_append_dim[0]} to {source_append_dim[-1]}." 
+ if exists: + # Open existing Icechunk repo: + try: + repo = obj_store.open_icechunk_repo(bucket=bucket, + prefix=object_prefix, + storage_config_kwargs=icechunk_config["storage_config_kwargs"], + repository_config_kwargs=icechunk_config["repository_config_kwargs"], + storage_settings_kwargs=icechunk_config["storage_settings_kwargs"], + ) + except icechunk.IcechunkError: + logging.info(f"Failed to open existing Icechunk repository at {bucket}/{object_prefix}") + + else: + try: + # Create new Icechunk repo: + repo = obj_store.create_icechunk_repo(bucket=bucket, + prefix=object_prefix, + storage_config_kwargs=icechunk_config["storage_config_kwargs"], + repository_config_kwargs=icechunk_config["repository_config_kwargs"], + storage_settings_kwargs=icechunk_config["storage_settings_kwargs"], + ) + except icechunk.IcechunkError: + logging.info(f"Failed to create new Icechunk repository at {bucket}/{object_prefix}") + + # Write data to Icechunk repository: + if variable_commits: + for var in variables: + logging.info(f"Sending Variable: {var}") + if append_dim in ds_filepath[var].dims: + snd_commit_message = f"{commit_message} -> Sent {var} along {append_dim} from {source_append_dim[0]} to {source_append_dim[-1]}." else: - snd_commit_message = f"{commit_message} -> Sent dataset." + snd_commit_message = f"{commit_message} -> Sent {var}." 
- _write_to_icechunk(data=ds_filepath[variables], - dest=f"{bucket}/{object_prefix}", - repo=repo, - commit_message=snd_commit_message, - branch=branch, - ) - except icechunk.IcechunkError: - if variable_commits: - logging.info(f"Skipping Dataset: Icechunk repository already exists at {bucket}/{object_prefix}/{var}") + # Write each variable using separate commits to the repo: + _write_to_icechunk(data=ds_filepath[var], + dest=f"{bucket}/{object_prefix}", + repo=repo, + commit_message=snd_commit_message, + branch=branch, + group=group + ) + else: + # Write all variables using single commit to the repo: + logging.info(f"Sending Dataset: {object_prefix}") + if append_dim in ds_filepath.dims: + snd_commit_message = f"{commit_message} -> Sent dataset along {append_dim} from {source_append_dim[0]} to {source_append_dim[-1]}." else: - logging.info(f"Skipping Dataset: Icechunk repository already exists at {bucket}/{object_prefix}") + snd_commit_message = f"{commit_message} -> Sent dataset." + + _write_to_icechunk(data=ds_filepath[variables], + dest=f"{bucket}/{object_prefix}", + repo=repo, + commit_message=snd_commit_message, + branch=branch, + group=group + ) # Release resources to avoid memory leaks: ds_filepath.close() @@ -1234,15 +1309,17 @@ def send_to_icechunk( bucket: str, object_prefix: str, store_credentials_json: str, + exists: Optional[bool] = False, + group: Optional[str] = None, variables: Optional[list[str]] = None, append_dim: Optional[str] = 'time_counter', grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, - branch: str = "main", - commit_message: str = "Add new data to my Icechunk repository", - variable_commits: bool = False, + branch: Optional[str] = "main", + commit_message: Optional[str] = "Add new data to my Icechunk repository", + variable_commits: Optional[bool] = False, dask_config_kwargs: Optional[dict] = None, dask_cluster_kwargs: Optional[dict] = None, 
icechunk_config: Optional[dict] = None, @@ -1263,30 +1340,34 @@ def send_to_icechunk( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. - variables: list[str], optional + exists: Optional[bool], default=False + Whether to write to an existing Icechunk repository or create a new repository. + group: Optional[str], default=None + Group in Icechunk repository to write data to. + variables: Optional[list[str]], default=None List of variables to send. If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary, by default None. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. - commit_message: str, default="Initial commit" + commit_message: Optional[str], default="Initial commit" Commit message when updating the Icechunk repository. - variable_commits: bool, default=False + variable_commits: Optional[bool], default=False Whether to write each variable to Icechunk repository using separate commits. - dask_config_kwargs: dict, optional + dask_config_kwargs: Optional[dict], default=None Dask configuration settings passed to dask.config.set(). - dask_cluster_kwargs: dict, optional + dask_cluster_kwargs: Optional[dict], default=None Dask cluster configuration settings passed to LocalCluster(). 
- icechunk_config: dict, optional + icechunk_config: Optional[dict], default=None Icechunk repository configuration. """ if dask_cluster_kwargs is not None: @@ -1306,6 +1387,8 @@ def send_to_icechunk( bucket=bucket, object_prefix=object_prefix, store_credentials_json=store_credentials_json, + exists=exists, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1330,6 +1413,8 @@ def send_to_icechunk( bucket=bucket, object_prefix=object_prefix, store_credentials_json=store_credentials_json, + exists=exists, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1350,14 +1435,15 @@ async def _update_zarr( object_prefix: str, store_credentials_json: str, client : Optional[Client] = None, + group: Optional[str] = None, variables: Optional[list[str]] = None, - append_dim: str = "time_counter", + append_dim: Optional[str] = "time_counter", grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, - parallel: bool = False, - zarr_version: int = 3 + parallel: Optional[bool] = False, + zarr_version: Optional[int] = 3 ) -> None: """ Update existing Zarr store in cloud object storage @@ -1378,23 +1464,25 @@ async def _update_zarr( client: dask.distributed.Client, optional Dask Client object. If provided, the object store session will be closed on all workers when complete. - variables: list, optional + group: Optional[str], default=None + Group in Zarr store to write data to. + variables: Optional[list[str]], default=None List of variables to send to Zarr stores. If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. 
- update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - parallel: bool, default=False + parallel: Optional[bool], default=False Whether to perform open and preprocess steps in parallel using `dask.delayed`. - zarr_version: int, default=3 + zarr_version: Optional[int], default=3 Zarr version to use. """ # === Initialise Asynchronous Object Store === # @@ -1426,6 +1514,7 @@ async def _update_zarr( await _update_zarr_store(data=ds_filepath[variables], obj_store=obj_store, dest=dest, + group=group, append_dim=append_dim, rechunk=rechunk, version=zarr_version @@ -1447,15 +1536,16 @@ async def update_zarr( bucket: str, object_prefix: str, store_credentials_json: str, + group: Optional[str] = None, variables: Optional[list[str]] = None, - append_dim: str = "time_counter", + append_dim: Optional[str] = "time_counter", grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, dask_config_kwargs: Optional[dict] = None, dask_cluster_kwargs: Optional[dict] = None, - zarr_version: int = 3 + zarr_version: Optional[int] = 3 ) -> None: """ Update data in existing Zarr store in cloud object @@ -1473,24 +1563,26 @@ async def update_zarr( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. - variables: list, optional + group: Optional[str], default=None + Group in Zarr store to write data to. + variables: Optional[list[str]], default=None List of variables to send to Zarr stores. If None, all variables will be sent. 
- append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - dask_config_kwargs: Dict[str,str], optional + dask_config_kwargs: Optional[Dict[str,str]], default=None Dask configuration settings passed to dask.config.set(). - dask_cluster_kwargs: dict, optional + dask_cluster_kwargs: Optional[dict], default=None Dask cluster configuration settings passed to LocalCluster(). - zarr_version: int, default=3 + zarr_version: Optional[int], default=3 zarr version to use. """ if dask_cluster_kwargs is not None: @@ -1511,6 +1603,7 @@ async def update_zarr( object_prefix=object_prefix, store_credentials_json=store_credentials_json, client=client, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1533,6 +1626,7 @@ async def update_zarr( object_prefix=object_prefix, store_credentials_json=store_credentials_json, client=None, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1549,6 +1643,7 @@ def _update_icechunk( bucket: str, object_prefix: str, store_credentials_json: str, + group: Optional[str] = None, variables: Optional[list[str]] = None, append_dim: Optional[str] = 'time_counter', grid_filepath: Optional[str] = None, @@ -1576,26 +1671,28 @@ def _update_icechunk( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. 
- variables: list[str], optional + group: Optional[str], default=None + Group in Icechunk repository to write data to. + variables: Optional[list[str]], default=None List of variables to send. If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. - grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary, by default None. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - parallel: bool, default=False + parallel: Optional[bool], default=False Whether to perform open and preprocess steps in parallel using `dask.delayed`. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. - commit_message: str, default="Update commit" + commit_message: Optional[str], default="Update commit" Commit message when updating the Icechunk repository. - icechunk_config: dict, optional + icechunk_config: Optional[dict], default=None Icechunk repository configuration. 
""" # === Initialise Synchronous Object Store === # @@ -1606,9 +1703,9 @@ def _update_icechunk( ) if icechunk_config is None: - icechunk_config = {"storage_config_kwargs":{}, - "repository_config_kwargs":{}, - "storage_settings_kwargs":{} + icechunk_config = {"storage_config_kwargs": {'region': 'us-east-1', 'force_path_style': True}, + "repository_config_kwargs": {}, + "storage_settings_kwargs": {'unsafe_use_conditional_update': False, 'unsafe_use_conditional_create': False}, } # === Preprocess Data === # @@ -1643,6 +1740,7 @@ def _update_icechunk( repo=repo, commit_message=commit_message, branch=branch, + group=group, append_dim=append_dim, rechunk=rechunk, ) @@ -1659,14 +1757,15 @@ def update_icechunk( bucket: str, object_prefix: str, store_credentials_json: str, - variables: list[str] | str = 'all', + group: Optional[str] = None, + variables: Optional[list[str]] = None, append_dim: Optional[str] = 'time_counter', grid_filepath: Optional[str] = None, update_coords: Optional[dict] = None, rechunk: Optional[dict] = None, attrs: Optional[dict] = None, - branch: str = "main", - commit_message: str = "Update data in my Icechunk repository", + branch: Optional[str] = "main", + commit_message: Optional[str] = "Update data in my Icechunk repository", dask_config_kwargs: Optional[dict] = None, dask_cluster_kwargs: Optional[dict] = None, icechunk_config: Optional[dict] = None, @@ -1687,27 +1786,29 @@ def update_icechunk( Prefix to be added to the object names in the object store. store_credentials_json: str Path to the JSON file containing the object store credentials. - variables: list | str, default="all" + group: Optional[str], default=None + Group in Icechunk repository to write data to. + variables: Optional[list[str]], default=None List of variables to send. If None, all variables will be sent. - append_dim: str, default='time_counter' + append_dim: Optional[str], default='time_counter' Name of the dimension to append multifile datasets. 
- grid_filepath: str, optional + grid_filepath: Optional[str], default=None Path to file containing model grid parameter. - update_coords: dict, optional + update_coords: Optional[dict], default=None Dictionary of coordinate variables to update. - rechunk: dict, optional + rechunk: Optional[dict], default=None Rechunk strategy dictionary, by default None. - attrs: dict, optional + attrs: Optional[dict], default=None Attributes to add to the dataset. - branch: str, default="main" + branch: Optional[str], default="main" Branch on which to write data to IcechunkStore. - commit_message: str, default="Initial commit" + commit_message: Optional[str], default="Initial commit" Commit message when updating the Icechunk repository. - dask_config_kwargs: dict, optional + dask_config_kwargs: Optional[dict], default=None Dask configuration settings passed to dask.config.set(). - dask_cluster_kwargs: dict, optional + dask_cluster_kwargs: Optional[dict], default=None Dask cluster configuration settings passed to LocalCluster(). - icechunk_config: dict, optional + icechunk_config: Optional[dict], default=None Icechunk repository configuration. """ # === Update Icechunk repo(s) with Dask === # @@ -1727,6 +1828,7 @@ def update_icechunk( bucket=bucket, object_prefix=object_prefix, store_credentials_json=store_credentials_json, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, @@ -1750,6 +1852,7 @@ def update_icechunk( bucket=bucket, object_prefix=object_prefix, store_credentials_json=store_credentials_json, + group=group, variables=variables, append_dim=append_dim, grid_filepath=grid_filepath, From 7ac80932116cb0d0c3d43d507f043c714d4bd904 Mon Sep 17 00:00:00 2001 From: Ollie Tooth Date: Fri, 15 May 2026 15:42:32 +0100 Subject: [PATCH 3/3] Fix all f-string formatting in LocalCluster logs.
--- OceanDataStore/object_store_handler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/OceanDataStore/object_store_handler.py b/OceanDataStore/object_store_handler.py index 8bb188dc..6960daff 100644 --- a/OceanDataStore/object_store_handler.py +++ b/OceanDataStore/object_store_handler.py @@ -1103,7 +1103,7 @@ async def send_to_zarr( # Create local dask cluster & client: with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client: - logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}") + logging.info(f"Created LocalCluster with {dask_cluster_kwargs['n_workers']} workers @ Client: {client.dashboard_link}") # Catch UserWarnings when rechunking data: client.register_worker_plugin(CaptureWarningsPlugin()) @@ -1378,7 +1378,7 @@ def send_to_icechunk( # Create local dask cluster & client: with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client: - logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}") + logging.info(f"Created LocalCluster with {dask_cluster_kwargs['n_workers']} workers @ Client: {client.dashboard_link}") # Catch UserWarnings when rechunking data: client.register_worker_plugin(CaptureWarningsPlugin()) @@ -1593,7 +1593,7 @@ async def update_zarr( # Create local dask cluster & client: with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client: - logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}") + logging.info(f"Created LocalCluster with {dask_cluster_kwargs['n_workers']} workers @ Client: {client.dashboard_link}") # Catch UserWarnings when rechunking data: client.register_worker_plugin(CaptureWarningsPlugin()) @@ -1819,7 +1819,7 @@ def update_icechunk( # Create local dask cluster & client: with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client: - 
logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}") + logging.info(f"Created LocalCluster with {dask_cluster_kwargs['n_workers']} workers @ Client: {client.dashboard_link}") # Catch UserWarnings when rechunking data: client.register_worker_plugin(CaptureWarningsPlugin())