diff --git a/.vscode/launch.json b/.vscode/launch.json
index 3c2316c..2e05de9 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -302,7 +302,8 @@
//"args": ["-v", "1", "--mission", "2018.348.01", "--noinput", "--no_cleanup", "--clobber"],
//"args": ["-v", "1", "--mission", "2024.324.00", "--noinput"],
//"args": ["-v", "1", "--mission", "2025.030.00", "--noinput"],
- "args": ["-v", "1", "--mission", "2025.296.00", "--noinput", "--no_cleanup"],
+ //"args": ["-v", "1", "--mission", "2025.296.00", "--noinput", "--no_cleanup"],
+ "args": ["-v", "1", "--mission", "2026.138.02", "--noinput", "--no_cleanup", "--clobber"],
},
{
"name": "process_dorado",
diff --git a/src/data/calibrate.py b/src/data/calibrate.py
index 627e6b7..c67eb02 100755
--- a/src/data/calibrate.py
+++ b/src/data/calibrate.py
@@ -1186,131 +1186,110 @@ def _cal_date_xml_files(
return OrderedDict(sorted(cal_date_xml_files.items()))
- def _read_oxy_coeffs( # noqa: C901, PLR0912, PLR0915
- self,
- cfg_filename: Path,
- portstbd: str = "",
- ) -> tuple[Coeffs, str]:
- """Based on the serial number found as a comment in the .cfg file find
- the approriate calibration coefficients for the oxygen sensor within the
- '/DMO/MDUC_CORE_CTD_200103/Calibration Files' shared drive folder.
- portstbd is either "", "port" or "stbd".
+ def _parse_cfg_serial_numbers(self, cfg_filename: Path) -> list[int]:
+ """Read //OxygenSerialNumber comments from a Seabird .cfg file.
+
+ For i2map the lines look like:
+ //OxygenSerialNumber = 2510;
"""
- # For i2map .cfg file lines look like:
- # //OxygenSerialNumber = 2510;
- # //note - this is the sensor in line with the C & T sensors. Goes to voltage channel 3
- #
- # //OxygenSerialNumber = 3968;
- # //note - this sensor is installed on the stbd side of the vehicle in line with the
- # // transmissometer. Goes to voltage channel 5
- # //note - seabird has adopted a new DO calibration with a polynomial for temp correction
- # //A = -3.0812e-003
- # //B = 7.8442e-005
- # //C = -9.0601e-007
- # //E = 0.036
- # SOc = 0.4466;
- # BOc = 0.0000;
- # Voff = -0.5070;
- # TCor = -0.0000;
- # PCor = 1.3500e-04; //not given in new calibration sheet
-
- # Read from .cfg file to get the serial numbers of the oxygen sensors
self.logger.debug("Opening %s", cfg_filename)
- coeffs = Coeffs()
-
- portstbd_order = {
- "port": 0,
- "stbd": 1,
- } # Typical order of oxygen sensors in seabird25p.cfg file
+ serial_numbers = []
with cfg_filename.open() as fh:
- sensor_count = 0
- serial_numbers = []
for line in fh:
self.logger.debug(line)
if line.startswith("//OxygenSerialNumber = "):
serial_numbers.append(int(line.split()[-1].strip(";")))
- sensor_count += 1
- if len(serial_numbers) == 0:
+ if not serial_numbers:
error_message = f"No oxygen sensor serial number found in {cfg_filename}"
raise ValueError(error_message)
if len(serial_numbers) > 2: # noqa: PLR2004
error_message = f"More than 2 oxygen sensor serial numbers found in {cfg_filename}"
raise ValueError(error_message)
+ return serial_numbers
+
+ def _select_oxy_serial_number(
+ self,
+ serial_numbers: list[int],
+ portstbd: str,
+ cfg_filename: Path,
+ ) -> int:
+ """Return the serial number for the requested port/stbd O2 sensor.
+
+ Typical order in seabird25p.cfg: port=index 0, stbd=index 1.
+ """
+ portstbd_order = {"port": 0, "stbd": 1}
if portstbd:
- serial_number = serial_numbers[portstbd_order[portstbd]]
+ idx = portstbd_order[portstbd]
+ if idx >= len(serial_numbers):
+ error_message = (
+ f"'{portstbd}' side requires serial_numbers[{idx}], but {cfg_filename} "
+ f"only contains {len(serial_numbers)} OxygenSerialNumber "
+ f"entries: {serial_numbers}"
+ )
+ raise ValueError(error_message)
self.logger.info(
"Looking for calibration file for O2 sensor serial number %s on %s side",
- serial_number,
+ serial_numbers[idx],
portstbd,
)
- elif len(serial_numbers) == 1:
+ return serial_numbers[idx]
+ if len(serial_numbers) == 1:
self.logger.info(
"Looking for calibration file for O2 sensor serial number %s",
serial_numbers[0],
)
- serial_number = serial_numbers[0]
- else:
- error_message = (
- f"Multiple oxygen sensor serial numbers found in {cfg_filename} "
- "with no port or stbd specified"
- )
- raise ValueError(error_message)
-
- # Find the calibration file for the oxygen sensor
- self.logger.debug(
- "Finding calibration file for oxygen serial number = %s on mission %s",
- serial_number,
- self.mission,
+ return serial_numbers[0]
+ error_message = (
+ f"Multiple oxygen sensor serial numbers found in {cfg_filename} "
+ "with no port or stbd specified"
)
+ raise ValueError(error_message)
+ def _find_sensor_cal_dir(self, serial_number: int) -> tuple[str, list[str]]:
+ """Locate the calibration directory and date subdirectories for an O2 serial number."""
safe_calibration_dir = Path(self.calibration_dir).resolve()
if not safe_calibration_dir.is_dir():
error_message = f"Calibration directory '{self.calibration_dir}' does not exist"
raise LookupError(error_message)
find_cmd = f'find "{safe_calibration_dir}" -name "{serial_number}"'
self.logger.info("Executing: %s ", find_cmd)
- safe_find_cmd = shlex.split(find_cmd)
sensor_dir = subprocess.run( # noqa: S603
- safe_find_cmd, # noqa: S603
+ shlex.split(find_cmd), # noqa: S603
capture_output=True,
text=True,
check=True,
).stdout.strip()
self.logger.debug("%s", sensor_dir)
-
safe_sensor_dir = Path(sensor_dir).resolve()
if not safe_sensor_dir.is_dir():
error_message = f"Sensor directory '{sensor_dir}' does not exist"
raise LookupError(error_message)
- # Find only the direct child directories: https://stackoverflow.com/a/20103980
- # Unable to use subprocess.run() with find an "*" in the command, apparently
+ # Find only direct child directories: https://stackoverflow.com/a/20103980
+ # Unable to use subprocess.run() with find and "*" in the command
dir_find_cmd = f'find "{safe_sensor_dir}"/* -maxdepth 0 -type d'
self.logger.debug("Executing: dir_find_cmd = %s", dir_find_cmd)
cal_date_dirs = [x.split("/")[-1] for x in os.popen(dir_find_cmd).read().split("\n") if x] # noqa: S605
self.logger.info("Found calibration date dirs: %s", " ".join(cal_date_dirs))
- cal_dates = self._cal_date_xml_files(sensor_dir, cal_date_dirs, serial_number)
+ return sensor_dir, cal_date_dirs
+
+ def _select_best_cal_date(self, cal_dates: dict) -> datetime:
+ """Return the most recent calibration date that precedes the mission start."""
mission_start = self.seabird25p.orig_data.cf["time"].to_numpy()[0]
- cal_date_to_use = next(iter(cal_dates)) # Default to first calibration date
- for cal_date in cal_dates:
- # Find the most recent calibration date just before the mission start
+ cal_date_to_use = next(iter(cal_dates))
+ for cal_date, cal_xml_file in cal_dates.items():
self.logger.debug(
"Comparing cal_date=%s with mission_start=%s", cal_date, mission_start
)
- self.logger.info(
- "File %s has CalibrationDate %s",
- cal_dates[cal_date],
- cal_date,
- )
+ self.logger.info("File %s has CalibrationDate %s", cal_xml_file, cal_date)
if np.datetime64(cal_date.replace(tzinfo=None)) > mission_start:
self.logger.info(
"Breaking from loop as %s is after %s with mission_start=%s",
- cal_dates[cal_date],
+ cal_xml_file,
self.mission,
mission_start,
)
break
cal_date_to_use = cal_date
-
if np.datetime64(cal_date_to_use.replace(tzinfo=None)) < mission_start:
self.logger.info(
"File %s is just before %s with mission_start=%s",
@@ -1325,55 +1304,57 @@ def _read_oxy_coeffs( # noqa: C901, PLR0912, PLR0915
self.mission,
mission_start,
)
+ return cal_date_to_use
+
+ def _parse_cal_xml_coeffs(self, cal_xml_path: str, serial_number: int) -> Coeffs:
+ """Parse equation-1 calibration coefficients from a Seabird O2 XML cal file.
- # Read the calibration coefficients from the .cal file which looks like:
- # INSTRUMENT_TYPE=SBE43
- # SERIALNO=2510
- # OCALDATE=09-Sep-14
- # SOC= 4.533809e-001
- # VOFFSET=-5.191352e-001
- # A=-5.251956e-003
- # B= 2.762519e-004
- # C=-4.164687e-006
- # E= 3.600000e-002
- # Tau20= 1.030000e+000
-
- # parse the .xml file to get the "equation 1" calibration coefficients:
- #
- #
- # 5.0544e-001
- # -0.5124
- # -4.8460e-003
- # 2.2670e-004
- # -3.2013e-006
- # 2.5826e+000
- # 1.92634e-004
- # -4.64803e-002
- # 3.6000e-002
- # 1.5600
- # -3.3000e-002
- # 5.0000e+003
- # 1.4500e+003
- #
- root = ET.parse(cal_dates[cal_date_to_use]).getroot()
+ The file contains a element with
+ children like , , , , , , , , ,
+ , , , .
+ """
+ coeffs = Coeffs()
+ root = ET.parse(cal_xml_path).getroot()
cal_xml_serial_number = int(root.find("SerialNumber").text)
if cal_xml_serial_number != serial_number:
self.logger.warning(
"Serial number in %s = %s does not match %s",
- cal_dates[cal_date_to_use],
+ cal_xml_path,
cal_xml_serial_number,
serial_number,
)
for elem in root.findall("CalibrationCoefficients[@equation]"):
if elem.attrib["equation"] == "1":
- eq1 = elem
- for child in eq1:
- try:
- setattr(coeffs, child.tag, float(child.text))
- except ValueError:
- setattr(coeffs, child.tag, child.text)
+ for child in elem:
+ try:
+ setattr(coeffs, child.tag, float(child.text))
+ except ValueError:
+ setattr(coeffs, child.tag, child.text)
+ return coeffs
- return coeffs, cal_dates[cal_date_to_use]
+ def _read_oxy_coeffs(
+ self,
+ cfg_filename: Path,
+ portstbd: str = "",
+ ) -> tuple[Coeffs, str]:
+ """Find O2 calibration coefficients for the serial number in the .cfg file.
+
+ Looks up the appropriate XML cal file from
+ '/DMO/MDUC_CORE_CTD_200103/Calibration Files'.
+ portstbd is "", "port", or "stbd".
+ """
+ serial_numbers = self._parse_cfg_serial_numbers(cfg_filename)
+ serial_number = self._select_oxy_serial_number(serial_numbers, portstbd, cfg_filename)
+ self.logger.debug(
+ "Finding calibration file for oxygen serial number = %s on mission %s",
+ serial_number,
+ self.mission,
+ )
+ sensor_dir, cal_date_dirs = self._find_sensor_cal_dir(serial_number)
+ cal_dates = self._cal_date_xml_files(sensor_dir, cal_date_dirs, serial_number)
+ cal_date = self._select_best_cal_date(cal_dates)
+ coeffs = self._parse_cal_xml_coeffs(cal_dates[cal_date], serial_number)
+ return coeffs, cal_dates[cal_date]
def _read_eco_dev(self, dev_filename):
"""Read calibration information from the file associated with the