diff --git a/.vscode/launch.json b/.vscode/launch.json index 173d2ff..2369e22 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -480,11 +480,13 @@ // Test ESP markers - Shallow log file from Denmark deployment in June 2024, has large depth values in self.ds.depth.values[6500:6800] //"args": ["-v", "1", "--log_file", "makai/missionlogs/2024/20240607_20240615/20240611T082709/202406110827_202406111026.nc4", "--update_ssds_provenance", "--clobber"] // Test making engineering plot with a short log file - "args": ["-v", "1", "--log_file", "ahi/missionlogs/2025/20250128_20250131/20250129T145420/202501291454_202501292233.nc4"] + //"args": ["-v", "1", "--log_file", "ahi/missionlogs/2025/20250128_20250131/20250129T145420/202501291454_202501292233.nc4"] // and for the rest of the log files of this deployment //"args": ["-v", "1", "--auv_name", "ahi", "--start", "20250127T000000", "--end", "20250201T000000", "--update_ssds_provenance", "--clobber", "--no_cleanup"] // Test having a better y-max that puts data instead of white space at the bottom //"args": ["-v", "1", "--log_file", "ahi/missionlogs/2026/20260406_20260412/20260410T010521/202604100105_202604100800.nc4"] + // Test processing only new data - as we would do in production with a cron job + "args": ["-v", "1", "--last_n_days", "14"] }, { "name": "process_lrauv_sbd", diff --git a/src/data/archive.py b/src/data/archive.py index c31b75b..280a70b 100755 --- a/src/data/archive.py +++ b/src/data/archive.py @@ -209,7 +209,20 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: def copy_to_M3(self, resampled_nc_file: str) -> None: pass - def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None: # noqa: C901, PLR0912, PLR0915 + def _archive_file(self, src_file: Path, dst_file: Path) -> None: + """Copy src to dst; skip if dst already exists unless --clobber.""" + if not src_file.exists(): + return + if dst_file.exists(): + if self.clobber: + dst_file.unlink() + else: + self.logger.info("Already archived, skipping: %s", dst_file.name) + return + shutil.copyfile(src_file, dst_file) + self.logger.info("copyfile %s %s done.", src_file.name, dst_file.parent) + + def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None: "Copy the intermediate and resampled netCDF file(s) to the archive LRAUV location" src_dir = Path(BASE_LRAUV_PATH, Path(log_file).parent) dst_dir = Path(LRAUV_VOL, Path(log_file).parent) @@ -220,79 +233,20 @@ def copy_to_LRAUV(self, log_file: str, freq: str = FREQ) -> None: # noqa: C901, self.logger.info("Is %s mounted?", self.mount_dir) sys.exit(1) for src_file in sorted(src_dir.glob(f"{Path(log_file).stem}_{GROUP}_*.nc")): - dst_file = Path(dst_dir, src_file.name) - if self.clobber: - if dst_file.exists(): - self.logger.info("Removing %s", dst_file) - dst_file.unlink() - if src_file.exists(): - shutil.copyfile(src_file, dst_file) - self.logger.info("copyfile %s %s done.", src_file, dst_dir) - elif src_file.exists(): - self.logger.info( - "%-75s exists, but is not being archived because --clobber is not specified.", - src_file.name, - ) + self._archive_file(src_file, Path(dst_dir, src_file.name)) for ftype in (f"{freq}.nc", "combined.nc4", "align.nc4"): src_file = Path(src_dir, f"{Path(log_file).stem}_{ftype}") - dst_file = Path(dst_dir, src_file.name) - if self.clobber: - if dst_file.exists(): - self.logger.info("Removing %s", dst_file) - dst_file.unlink() - if src_file.exists(): - shutil.copyfile(src_file, dst_file) - self.logger.info("copyfile %s %s done.", src_file, dst_dir) - elif src_file.exists(): - self.logger.info( - "%-36s exists, but is not being archived because --clobber is not specified.", # noqa: E501 - src_file.name, - ) + self._archive_file(src_file, Path(dst_dir, src_file.name)) # Copy PNG product files created by create_products.py self.logger.info("Archiving product files") for src_file in src_dir.glob(f"{Path(log_file).stem}_*.png"): - dst_file = Path(dst_dir, src_file.name) - if self.clobber: - if dst_file.exists(): - self.logger.info("Removing %s", dst_file) - dst_file.unlink() - if src_file.exists(): - shutil.copyfile(src_file, dst_file) - self.logger.info("copyfile %s %s done.", src_file, dst_dir) - elif src_file.exists(): - self.logger.info( - "%-36s exists, but is not being archived because --clobber is not specified.", - src_file.name, - ) - + self._archive_file(src_file, Path(dst_dir, src_file.name)) # Copy ODV/text product files created by create_products.py (e.g., *_Sipper.txt) for src_file in src_dir.glob(f"{Path(log_file).stem}_*.txt"): - dst_file = Path(dst_dir, src_file.name) - if self.clobber: - if dst_file.exists(): - self.logger.info("Removing %s", dst_file) - dst_file.unlink() - if src_file.exists(): - shutil.copyfile(src_file, dst_file) - self.logger.info("copyfile %s %s done.", src_file, dst_dir) - elif src_file.exists(): - self.logger.info( - "%-36s exists, but is not being archived because --clobber is not specified.", - src_file.name, - ) + self._archive_file(src_file, Path(dst_dir, src_file.name)) # Copy the processing.log file last so that we get everything src_file = Path(src_dir, f"{Path(log_file).stem}_{LOG_NAME}") - dst_file = Path(dst_dir, src_file.name) - if src_file.exists(): - if self.clobber: - self.logger.info("copyfile %s %s", src_file, dst_dir) - shutil.copyfile(src_file, dst_file) - self.logger.info("copyfile %s %s done.", src_file, dst_dir) - elif src_file.exists(): - self.logger.info( - "%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501 - src_file.name, - ) + self._archive_file(src_file, Path(dst_dir, src_file.name)) def copy_sbd_to_LRAUV(self, sbd_nc_path: Path) -> None: """Copy SBD resampled netCDF and any product files to the LRAUV archive volume. diff --git a/src/data/process.py b/src/data/process.py index 52b4ae4..d859d4d 100755 --- a/src/data/process.py +++ b/src/data/process.py @@ -66,7 +66,7 @@ class data are: download_process and calibrate, while for LRAUV class data from socket import gethostname from align import Align_NetCDF, InvalidCalFile, InvalidCombinedFile -from archive import LOG_NAME, Archiver +from archive import LOG_NAME, LRAUV_VOL, Archiver from calibrate import EXPECTED_SENSORS, Calibrate_NetCDF from combine import Combine_NetCDF from create_products import MISSIONIMAGES, CreateProducts @@ -115,9 +115,8 @@ def wrapper(self, log_file: str): raise finally: if hasattr(self, "log_handler"): - # Cleanup and archiving logic - if self.config.get("clobber"): - self.archive(mission=None, log_file=log_file) + # Archive whenever processing ran (freshness check happens before this decorator) + self.archive(mission=None, log_file=log_file) if not self.config.get("no_cleanup"): self.cleanup(log_file=log_file) self.logger.info( @@ -1301,12 +1300,48 @@ def process_log_file(self, log_file: str) -> None: ) self.logger.info("Finished processing log file: %s", log_file) + def _process_log_file_list(self, log_files: list) -> None: + """Apply freshness check and process each log file in the list.""" + for log_file in log_files: + self.auv_name = log_file.split("/")[0].lower() + if not self.config.get("clobber") and self._is_log_output_fresh(log_file): + continue + self.logger.info("Processing log file: %s", log_file) + try: + self.process_log_file(log_file) + except (InvalidCalFile, InvalidCombinedFile) as e: + self.logger.warning("%s", e) + + def _is_log_output_fresh(self, log_file: str) -> bool: + """Return True if the archived _1S.nc is up to date vs the source nc4 on the NFS mount. + + Uses the NFS-mount copy of the nc4 (always present) and the archived _1S.nc on + LRAUV_VOL (written by copy_to_LRAUV after cleanup removes local copies). + Returns False when either file is absent so processing proceeds normally. + """ + nc4_path = Path(self.vehicle_dir, log_file) + resampled_path = Path( + LRAUV_VOL, + Path(log_file).parent, + f"{Path(log_file).stem}_{FREQ}.nc", + ) + if not nc4_path.exists() or not resampled_path.exists(): + return False + if nc4_path.stat().st_mtime <= resampled_path.stat().st_mtime: + self.logger.info("Output up to date, skipping: %s", log_file) + return True + self.logger.info("Source nc4 is newer, reprocessing: %s", log_file) + return False + def process_log_files(self) -> None: if self.config.get("log_file"): # log_file is string like: # brizo/missionlogs/2025/20250909_20250915/20250914T080941/202509140809_202509150109.nc4 - self.auv_name = self.config["log_file"].split("/")[0].lower() - self.process_log_file(self.config["log_file"]) + log_file = self.config["log_file"] + self.auv_name = log_file.split("/")[0].lower() + if not self.config.get("clobber") and self._is_log_output_fresh(log_file): + return + self.process_log_file(log_file) elif self.config.get("start") and self.config.get("end"): # Process multiple log files within datetime range log_files = self.log_file_list( @@ -1321,14 +1356,7 @@ def process_log_files(self) -> None: return self.logger.info("Processing %d log files in datetime range", len(log_files)) - for log_file in log_files: - # Extract AUV name from path - self.auv_name = log_file.split("/")[0].lower() - self.logger.info("Processing log file: %s", log_file) - try: - self.process_log_file(log_file) - except (InvalidCalFile, InvalidCombinedFile) as e: - self.logger.warning("%s", e) + self._process_log_file_list(log_files) elif self.config.get("last_n_days"): # Process log files from the last N days end_dt = datetime.now(tz=UTC) @@ -1358,14 +1386,7 @@ def process_log_files(self) -> None: len(log_files), self.config["last_n_days"], ) - for log_file in log_files: - # Extract AUV name from path - self.auv_name = log_file.split("/")[0].lower() - self.logger.info("Processing log file: %s", log_file) - try: - self.process_log_file(log_file) - except (InvalidCalFile, InvalidCombinedFile) as e: - self.logger.warning("%s", e) + self._process_log_file_list(log_files) else: self.logger.error( "Must provide either --log_file, both --start and --end, or --last_n_days arguments"