diff --git a/.vscode/launch.json b/.vscode/launch.json index 3c2316c..2e05de9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -302,7 +302,8 @@ //"args": ["-v", "1", "--mission", "2018.348.01", "--noinput", "--no_cleanup", "--clobber"], //"args": ["-v", "1", "--mission", "2024.324.00", "--noinput"], //"args": ["-v", "1", "--mission", "2025.030.00", "--noinput"], - "args": ["-v", "1", "--mission", "2025.296.00", "--noinput", "--no_cleanup"], + //"args": ["-v", "1", "--mission", "2025.296.00", "--noinput", "--no_cleanup"], + "args": ["-v", "1", "--mission", "2026.138.02", "--noinput", "--no_cleanup", "--clobber"], }, { "name": "process_dorado", diff --git a/src/data/calibrate.py b/src/data/calibrate.py index 627e6b7..c67eb02 100755 --- a/src/data/calibrate.py +++ b/src/data/calibrate.py @@ -1186,131 +1186,110 @@ def _cal_date_xml_files( return OrderedDict(sorted(cal_date_xml_files.items())) - def _read_oxy_coeffs( # noqa: C901, PLR0912, PLR0915 - self, - cfg_filename: Path, - portstbd: str = "", - ) -> tuple[Coeffs, str]: - """Based on the serial number found as a comment in the .cfg file find - the approriate calibration coefficients for the oxygen sensor within the - '/DMO/MDUC_CORE_CTD_200103/Calibration Files' shared drive folder. - portstbd is either "", "port" or "stbd". + def _parse_cfg_serial_numbers(self, cfg_filename: Path) -> list[int]: + """Read //OxygenSerialNumber comments from a Seabird .cfg file. + + For i2map the lines look like: + //OxygenSerialNumber = 2510; """ - # For i2map .cfg file lines look like: - # //OxygenSerialNumber = 2510; - # //note - this is the sensor in line with the C & T sensors. Goes to voltage channel 3 - # - # //OxygenSerialNumber = 3968; - # //note - this sensor is installed on the stbd side of the vehicle in line with the - # // transmissometer. Goes to voltage channel 5 - # //note - seabird has adopted a new DO calibration with a polynomial for temp correction - # //A = -3.0812e-003 - # //B = 7.8442e-005 - # //C = -9.0601e-007 - # //E = 0.036 - # SOc = 0.4466; - # BOc = 0.0000; - # Voff = -0.5070; - # TCor = -0.0000; - # PCor = 1.3500e-04; //not given in new calibration sheet - - # Read from .cfg file to get the serial numbers of the oxygen sensors self.logger.debug("Opening %s", cfg_filename) - coeffs = Coeffs() - - portstbd_order = { - "port": 0, - "stbd": 1, - } # Typical order of oxygen sensors in seabird25p.cfg file + serial_numbers = [] with cfg_filename.open() as fh: - sensor_count = 0 - serial_numbers = [] for line in fh: self.logger.debug(line) if line.startswith("//OxygenSerialNumber = "): serial_numbers.append(int(line.split()[-1].strip(";"))) - sensor_count += 1 - if len(serial_numbers) == 0: + if not serial_numbers: error_message = f"No oxygen sensor serial number found in {cfg_filename}" raise ValueError(error_message) if len(serial_numbers) > 2: # noqa: PLR2004 error_message = f"More than 2 oxygen sensor serial numbers found in {cfg_filename}" raise ValueError(error_message) + return serial_numbers + + def _select_oxy_serial_number( + self, + serial_numbers: list[int], + portstbd: str, + cfg_filename: Path, + ) -> int: + """Return the serial number for the requested port/stbd O2 sensor. + + Typical order in seabird25p.cfg: port=index 0, stbd=index 1. + """ + portstbd_order = {"port": 0, "stbd": 1} if portstbd: - serial_number = serial_numbers[portstbd_order[portstbd]] + idx = portstbd_order[portstbd] + if idx >= len(serial_numbers): + error_message = ( + f"'{portstbd}' side requires serial_numbers[{idx}], but {cfg_filename} " + f"only contains {len(serial_numbers)} OxygenSerialNumber " + f"entries: {serial_numbers}" + ) + raise ValueError(error_message) self.logger.info( "Looking for calibration file for O2 sensor serial number %s on %s side", - serial_number, + serial_numbers[idx], portstbd, ) - elif len(serial_numbers) == 1: + return serial_numbers[idx] + if len(serial_numbers) == 1: self.logger.info( "Looking for calibration file for O2 sensor serial number %s", serial_numbers[0], ) - serial_number = serial_numbers[0] - else: - error_message = ( - f"Multiple oxygen sensor serial numbers found in {cfg_filename} " - "with no port or stbd specified" - ) - raise ValueError(error_message) - - # Find the calibration file for the oxygen sensor - self.logger.debug( - "Finding calibration file for oxygen serial number = %s on mission %s", - serial_number, - self.mission, + return serial_numbers[0] + error_message = ( + f"Multiple oxygen sensor serial numbers found in {cfg_filename} " + "with no port or stbd specified" ) + raise ValueError(error_message) + def _find_sensor_cal_dir(self, serial_number: int) -> tuple[str, list[str]]: + """Locate the calibration directory and date subdirectories for an O2 serial number.""" safe_calibration_dir = Path(self.calibration_dir).resolve() if not safe_calibration_dir.is_dir(): error_message = f"Calibration directory '{self.calibration_dir}' does not exist" raise LookupError(error_message) find_cmd = f'find "{safe_calibration_dir}" -name "{serial_number}"' self.logger.info("Executing: %s ", find_cmd) - safe_find_cmd = shlex.split(find_cmd) sensor_dir = subprocess.run( # noqa: S603 - safe_find_cmd, # noqa: S603 + shlex.split(find_cmd), # noqa: S603 capture_output=True, text=True, check=True, ).stdout.strip() self.logger.debug("%s", sensor_dir) - safe_sensor_dir = Path(sensor_dir).resolve() if not safe_sensor_dir.is_dir(): error_message = f"Sensor directory '{sensor_dir}' does not exist" raise LookupError(error_message) - # Find only the direct child directories: https://stackoverflow.com/a/20103980 - # Unable to use subprocess.run() with find an "*" in the command, apparently + # Find only direct child directories: https://stackoverflow.com/a/20103980 + # Unable to use subprocess.run() with find and "*" in the command dir_find_cmd = f'find "{safe_sensor_dir}"/* -maxdepth 0 -type d' self.logger.debug("Executing: dir_find_cmd = %s", dir_find_cmd) cal_date_dirs = [x.split("/")[-1] for x in os.popen(dir_find_cmd).read().split("\n") if x] # noqa: S605 self.logger.info("Found calibration date dirs: %s", " ".join(cal_date_dirs)) - cal_dates = self._cal_date_xml_files(sensor_dir, cal_date_dirs, serial_number) + return sensor_dir, cal_date_dirs + + def _select_best_cal_date(self, cal_dates: dict) -> datetime: + """Return the most recent calibration date that precedes the mission start.""" mission_start = self.seabird25p.orig_data.cf["time"].to_numpy()[0] - cal_date_to_use = next(iter(cal_dates)) # Default to first calibration date - for cal_date in cal_dates: - # Find the most recent calibration date just before the mission start + cal_date_to_use = next(iter(cal_dates)) + for cal_date, cal_xml_file in cal_dates.items(): self.logger.debug( "Comparing cal_date=%s with mission_start=%s", cal_date, mission_start ) - self.logger.info( - "File %s has CalibrationDate %s", - cal_dates[cal_date], - cal_date, - ) + self.logger.info("File %s has CalibrationDate %s", cal_xml_file, cal_date) if np.datetime64(cal_date.replace(tzinfo=None)) > mission_start: self.logger.info( "Breaking from loop as %s is after %s with mission_start=%s", - cal_dates[cal_date], + cal_xml_file, self.mission, mission_start, ) break cal_date_to_use = cal_date - if np.datetime64(cal_date_to_use.replace(tzinfo=None)) < mission_start: self.logger.info( "File %s is just before %s with mission_start=%s", @@ -1325,55 +1304,57 @@ def _read_oxy_coeffs( # noqa: C901, PLR0912, PLR0915 self.mission, mission_start, ) + return cal_date_to_use + + def _parse_cal_xml_coeffs(self, cal_xml_path: str, serial_number: int) -> Coeffs: + """Parse equation-1 calibration coefficients from a Seabird O2 XML cal file. - # Read the calibration coefficients from the .cal file which looks like: - # INSTRUMENT_TYPE=SBE43 - # SERIALNO=2510 - # OCALDATE=09-Sep-14 - # SOC= 4.533809e-001 - # VOFFSET=-5.191352e-001 - # A=-5.251956e-003 - # B= 2.762519e-004 - # C=-4.164687e-006 - # E= 3.600000e-002 - # Tau20= 1.030000e+000 - - # parse the .xml file to get the "equation 1" calibration coefficients: - # - # - # 5.0544e-001 - # -0.5124 - # -4.8460e-003 - # 2.2670e-004 - # -3.2013e-006 - # 2.5826e+000 - # 1.92634e-004 - # -4.64803e-002 - # 3.6000e-002 - # 1.5600 - #

-3.3000e-002

- #

5.0000e+003

- #

1.4500e+003

- #
- root = ET.parse(cal_dates[cal_date_to_use]).getroot() + The file contains a element with + children like , , , , , , , , , + ,

,

,

. + """ + coeffs = Coeffs() + root = ET.parse(cal_xml_path).getroot() cal_xml_serial_number = int(root.find("SerialNumber").text) if cal_xml_serial_number != serial_number: self.logger.warning( "Serial number in %s = %s does not match %s", - cal_dates[cal_date_to_use], + cal_xml_path, cal_xml_serial_number, serial_number, ) for elem in root.findall("CalibrationCoefficients[@equation]"): if elem.attrib["equation"] == "1": - eq1 = elem - for child in eq1: - try: - setattr(coeffs, child.tag, float(child.text)) - except ValueError: - setattr(coeffs, child.tag, child.text) + for child in elem: + try: + setattr(coeffs, child.tag, float(child.text)) + except ValueError: + setattr(coeffs, child.tag, child.text) + return coeffs - return coeffs, cal_dates[cal_date_to_use] + def _read_oxy_coeffs( + self, + cfg_filename: Path, + portstbd: str = "", + ) -> tuple[Coeffs, str]: + """Find O2 calibration coefficients for the serial number in the .cfg file. + + Looks up the appropriate XML cal file from + '/DMO/MDUC_CORE_CTD_200103/Calibration Files'. + portstbd is "", "port", or "stbd". + """ + serial_numbers = self._parse_cfg_serial_numbers(cfg_filename) + serial_number = self._select_oxy_serial_number(serial_numbers, portstbd, cfg_filename) + self.logger.debug( + "Finding calibration file for oxygen serial number = %s on mission %s", + serial_number, + self.mission, + ) + sensor_dir, cal_date_dirs = self._find_sensor_cal_dir(serial_number) + cal_dates = self._cal_date_xml_files(sensor_dir, cal_date_dirs, serial_number) + cal_date = self._select_best_cal_date(cal_dates) + coeffs = self._parse_cal_xml_coeffs(cal_dates[cal_date], serial_number) + return coeffs, cal_dates[cal_date] def _read_eco_dev(self, dev_filename): """Read calibration information from the file associated with the