Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,13 @@
// Test ESP markers - Shallow log file from Denmark deployment in June 2024, has large depth values in self.ds.depth.values[6500:6800]
//"args": ["-v", "1", "--log_file", "makai/missionlogs/2024/20240607_20240615/20240611T082709/202406110827_202406111026.nc4", "--update_ssds_provenance", "--clobber"]
// Test making engineering plot with a short log file
"args": ["-v", "1", "--log_file", "ahi/missionlogs/2025/20250128_20250131/20250129T145420/202501291454_202501292233.nc4"]
//"args": ["-v", "1", "--log_file", "ahi/missionlogs/2025/20250128_20250131/20250129T145420/202501291454_202501292233.nc4"]
// and for the rest of the log files of this deployment
//"args": ["-v", "1", "--auv_name", "ahi", "--start", "20250127T000000", "--end", "20250201T000000", "--update_ssds_provenance", "--clobber", "--no_cleanup"]
// Test having a better y-max that puts data instead of white space at the bottom
//"args": ["-v", "1", "--log_file", "ahi/missionlogs/2026/20260406_20260412/20260410T010521/202604100105_202604100800.nc4"]
// Test processing only new data - as we would do in production with a cron job
"args": ["-v", "1", "--last_n_days", "14"]
},
{
"name": "process_lrauv_sbd",
Expand Down
84 changes: 19 additions & 65 deletions src/data/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,20 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa:
def copy_to_M3(self, resampled_nc_file: str) -> None:
pass

def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None: # noqa: C901, PLR0912, PLR0915
def _archive_file(self, src_file: Path, dst_file: Path) -> None:
"""Copy src to dst; skip if dst already exists unless --clobber."""
if not src_file.exists():
return
if dst_file.exists():
if self.clobber:
dst_file.unlink()
else:
self.logger.info("Already archived, skipping: %s", dst_file.name)
return
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file.name, dst_file.parent)

def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None:
"Copy the intermediate and resampled netCDF file(s) to the archive LRAUV location"
src_dir = Path(BASE_LRAUV_PATH, Path(log_file).parent)
dst_dir = Path(LRAUV_VOL, Path(log_file).parent)
Expand All @@ -220,79 +233,20 @@ def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None: # noqa: C901,
self.logger.info("Is %s mounted?", self.mount_dir)
sys.exit(1)
for src_file in sorted(src_dir.glob(f"{Path(log_file).stem}_{GROUP}_*.nc")):
dst_file = Path(dst_dir, src_file.name)
if self.clobber:
if dst_file.exists():
self.logger.info("Removing %s", dst_file)
dst_file.unlink()
if src_file.exists():
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file, dst_dir)
elif src_file.exists():
self.logger.info(
"%-75s exists, but is not being archived because --clobber is not specified.",
src_file.name,
)
self._archive_file(src_file, Path(dst_dir, src_file.name))
for ftype in (f"{freq}.nc", "combined.nc4", "align.nc4"):
src_file = Path(src_dir, f"{Path(log_file).stem}_{ftype}")
dst_file = Path(dst_dir, src_file.name)
if self.clobber:
if dst_file.exists():
self.logger.info("Removing %s", dst_file)
dst_file.unlink()
if src_file.exists():
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file, dst_dir)
elif src_file.exists():
self.logger.info(
"%-36s exists, but is not being archived because --clobber is not specified.", # noqa: E501
src_file.name,
)
self._archive_file(src_file, Path(dst_dir, src_file.name))
# Copy PNG product files created by create_products.py
self.logger.info("Archiving product files")
for src_file in src_dir.glob(f"{Path(log_file).stem}_*.png"):
dst_file = Path(dst_dir, src_file.name)
if self.clobber:
if dst_file.exists():
self.logger.info("Removing %s", dst_file)
dst_file.unlink()
if src_file.exists():
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file, dst_dir)
elif src_file.exists():
self.logger.info(
"%-36s exists, but is not being archived because --clobber is not specified.",
src_file.name,
)

self._archive_file(src_file, Path(dst_dir, src_file.name))
# Copy ODV/text product files created by create_products.py (e.g., *_Sipper.txt)
for src_file in src_dir.glob(f"{Path(log_file).stem}_*.txt"):
dst_file = Path(dst_dir, src_file.name)
if self.clobber:
if dst_file.exists():
self.logger.info("Removing %s", dst_file)
dst_file.unlink()
if src_file.exists():
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file, dst_dir)
elif src_file.exists():
self.logger.info(
"%-36s exists, but is not being archived because --clobber is not specified.",
src_file.name,
)
self._archive_file(src_file, Path(dst_dir, src_file.name))
# Copy the processing.log file last so that we get everything
src_file = Path(src_dir, f"{Path(log_file).stem}_{LOG_NAME}")
dst_file = Path(dst_dir, src_file.name)
if src_file.exists():
if self.clobber:
self.logger.info("copyfile %s %s", src_file, dst_dir)
shutil.copyfile(src_file, dst_file)
self.logger.info("copyfile %s %s done.", src_file, dst_dir)
elif src_file.exists():
self.logger.info(
"%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501
src_file.name,
)
self._archive_file(src_file, Path(dst_dir, src_file.name))

def copy_sbd_to_LRAUV(self, sbd_nc_path: Path) -> None:
"""Copy SBD resampled netCDF and any product files to the LRAUV archive volume.
Expand Down
65 changes: 43 additions & 22 deletions src/data/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class data are: download_process and calibrate, while for LRAUV class data
from socket import gethostname

from align import Align_NetCDF, InvalidCalFile, InvalidCombinedFile
from archive import LOG_NAME, Archiver
from archive import LOG_NAME, LRAUV_VOL, Archiver
from calibrate import EXPECTED_SENSORS, Calibrate_NetCDF
from combine import Combine_NetCDF
from create_products import MISSIONIMAGES, CreateProducts
Expand Down Expand Up @@ -115,9 +115,8 @@ def wrapper(self, log_file: str):
raise
finally:
if hasattr(self, "log_handler"):
# Cleanup and archiving logic
if self.config.get("clobber"):
self.archive(mission=None, log_file=log_file)
# Archive whenever processing ran (freshness check happens before this decorator)
self.archive(mission=None, log_file=log_file)
if not self.config.get("no_cleanup"):
self.cleanup(log_file=log_file)
self.logger.info(
Expand Down Expand Up @@ -1301,12 +1300,48 @@ def process_log_file(self, log_file: str) -> None:
)
self.logger.info("Finished processing log file: %s", log_file)

def _process_log_file_list(self, log_files: list) -> None:
"""Apply freshness check and process each log file in the list."""
for log_file in log_files:
self.auv_name = log_file.split("/")[0].lower()
if not self.config.get("clobber") and self._is_log_output_fresh(log_file):
continue
self.logger.info("Processing log file: %s", log_file)
try:
self.process_log_file(log_file)
except (InvalidCalFile, InvalidCombinedFile) as e:
self.logger.warning("%s", e)

def _is_log_output_fresh(self, log_file: str) -> bool:
"""Return True if the archived _1S.nc is up to date vs the source nc4 on the NFS mount.

Uses the NFS-mount copy of the nc4 (always present) and the archived _1S.nc on
LRAUV_VOL (written by copy_to_LRAUV after cleanup removes local copies).
Returns False when either file is absent so processing proceeds normally.
"""
nc4_path = Path(self.vehicle_dir, log_file)
resampled_path = Path(
LRAUV_VOL,
Path(log_file).parent,
f"{Path(log_file).stem}_{FREQ}.nc",
)
if not nc4_path.exists() or not resampled_path.exists():
return False
if nc4_path.stat().st_mtime <= resampled_path.stat().st_mtime:
self.logger.info("Output up to date, skipping: %s", log_file)
return True
self.logger.info("Source nc4 is newer, reprocessing: %s", log_file)
return False

def process_log_files(self) -> None:
if self.config.get("log_file"):
# log_file is string like:
# brizo/missionlogs/2025/20250909_20250915/20250914T080941/202509140809_202509150109.nc4
self.auv_name = self.config["log_file"].split("/")[0].lower()
self.process_log_file(self.config["log_file"])
log_file = self.config["log_file"]
self.auv_name = log_file.split("/")[0].lower()
if not self.config.get("clobber") and self._is_log_output_fresh(log_file):
return
self.process_log_file(log_file)
elif self.config.get("start") and self.config.get("end"):
# Process multiple log files within datetime range
log_files = self.log_file_list(
Expand All @@ -1321,14 +1356,7 @@ def process_log_files(self) -> None:
return

self.logger.info("Processing %d log files in datetime range", len(log_files))
for log_file in log_files:
# Extract AUV name from path
self.auv_name = log_file.split("/")[0].lower()
self.logger.info("Processing log file: %s", log_file)
try:
self.process_log_file(log_file)
except (InvalidCalFile, InvalidCombinedFile) as e:
self.logger.warning("%s", e)
self._process_log_file_list(log_files)
elif self.config.get("last_n_days"):
# Process log files from the last N days
end_dt = datetime.now(tz=UTC)
Expand Down Expand Up @@ -1358,14 +1386,7 @@ def process_log_files(self) -> None:
len(log_files),
self.config["last_n_days"],
)
for log_file in log_files:
# Extract AUV name from path
self.auv_name = log_file.split("/")[0].lower()
self.logger.info("Processing log file: %s", log_file)
try:
self.process_log_file(log_file)
except (InvalidCalFile, InvalidCombinedFile) as e:
self.logger.warning("%s", e)
self._process_log_file_list(log_files)
else:
self.logger.error(
"Must provide either --log_file, both --start and --end, or --last_n_days arguments"
Expand Down
Loading