# Source code for swxsoc.util.util

"""
This module provides general utility functions.
"""

import re
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import sunpy.time
from astropy.time import Time

import swxsoc
from swxsoc.util.exceptions import warn_user

# --- Backward compatibility: moved symbols re-exported from new locations ---
from swxsoc.db.timeseries import (
    _record_dimension_timestream as _record_dimension_timestream,
    record_timeseries as record_timeseries,
)
from swxsoc.net.attr import (
    Descriptor as Descriptor,
    DevelopmentBucket as DevelopmentBucket,
    Instrument as Instrument,
    Level as Level,
    SearchTime as SearchTime,
    walker as walker,
)
from swxsoc.net.client import SWXSOCClient as SWXSOCClient
from swxsoc.util.grafana import (
    create_annotation as create_annotation,
    get_dashboard_id as get_dashboard_id,
    get_panel_id as get_panel_id,
    query_annotations as query_annotations,
    remove_annotation_by_id as remove_annotation_by_id,
)

# Names exported by ``from swxsoc.util.util import *``.
__all__ = [
    "create_science_filename",
    "parse_science_filename",
]

# strftime/strptime format used for the time field of standard filenames:
# create_science_filename renders times with it and _parse_standard_format
# passes it to _extract_time as the expected format.
TIME_FORMAT = "%Y%m%dT%H%M%S"  # YYYYMMDDTHHMMSS

# Maps a time-format identifier to the regex that locates a candidate time
# string of that format inside a filename.  Keys are either a strptime format
# string or one of the sentinels "unix_ms" / "unix_s", which
# _parse_time_string routes to _parse_unix_timestamp.
#
# Insertion order matters: _try_all_patterns walks this dict in order and
# returns the first candidate that parses, so more specific patterns are
# listed before the bare 8-digit date.
#
# Patterns wrapped in (?<!\d) ... (?!\d) only match a digit run of exactly
# that length; the patterns without those lookarounds can also match inside
# a longer run of digits.
TIME_PATTERNS = {
    "unix_ms": re.compile(r"(?<!\d)\d{13}(?!\d)"),  # unix time stamps in milliseconds
    "unix_s": re.compile(r"(?<!\d)\d{10}(?!\d)"),  # unix time stamps in seconds
    "%Y-%m-%dT%H:%M:%S": re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO 8601
    "%Y%m%d-%H%M%S": re.compile(r"\d{8}-\d{6}"),  # YYYYMMDD-HHMMSS
    "%Y%m%dT%H%M%S": re.compile(r"\d{8}T\d{6}"),  # YYYYMMDDTHHMMSS
    "%Y%m%d%H%M%S": re.compile(r"(?<!\d)\d{14}(?!\d)"),  # YYYYMMDDHHMMSS
    "%y%m%d%H%M%S": re.compile(r"(?<!\d)\d{12}(?!\d)"),  # YYMMDDHHMMSS
    "%Y%j-%H%M%S": re.compile(r"\d{7}-\d{6}"),  # YYYYJJJ-HHMMSS
    "%Y%j_%H%M%S": re.compile(r"\d{7}_\d{6}"),  # YYYYJJJ_HHMMSS
    "%Y%m%d": re.compile(r"(?<!\d)\d{8}(?!\d)"),  # YYYYMMDD
}


def create_science_filename(
    instrument: str,
    time: str,
    level: str,
    version: str,
    mode: str = "",
    descriptor: str = "",
    test: bool = False,
):
    """Return a compliant filename.

    The format is defined as

    {mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}{extension}

    where the mission name, the instrument short name and the file extension
    are read from ``swxsoc.config["mission"]``.

    This format is only appropriate for data level >= 1.

    Parameters
    ----------
    instrument : `str`
        The instrument name. Must be one of the configured mission
        instrument names (``inst_names``).
    time : `str` (in isot format) or ~astropy.time
        The time. Strings are parsed as ISOT; any other object must provide
        a ``strftime`` method.
    level : `str`
        The data level. Must be one of the configured
        ``valid_data_levels``.
    version : `str`
        The file version which must be given as X.Y.Z, where each part
        consists only of the digits 0-9.
    descriptor : `str`
        An optional file descriptor. Must not contain an underscore.
    mode : `str`
        An optional instrument mode. Must not contain an underscore.
    test : bool
        Selects whether the file is a test file. Only the literal ``True``
        marks the file as a test file.

    Returns
    -------
    filename : `str`
        A file name including the given parameters that matches the mission's
        file naming conventions

    Raises
    ------
    ValueError
        If the instrument is not recognized as one of the mission's instruments
    ValueError
        If the data level is not recognized as one of the mission's valid data levels
    ValueError
        If the data version does not match the mission's data version formatting conventions
    ValueError
        If the data product descriptor or instrument mode do not match the mission's formatting conventions
    """
    test_str = ""
    mission_config = swxsoc.config["mission"]

    if isinstance(time, str):
        time_str = Time(time, format="isot").strftime(TIME_FORMAT)
    else:
        time_str = time.strftime(TIME_FORMAT)

    if instrument not in mission_config["inst_names"]:
        raise ValueError(
            f"Instrument, {instrument}, is not recognized. Must be one of {mission_config['inst_names']}."
        )
    if level not in mission_config["valid_data_levels"]:
        raise ValueError(
            f"Level, {level}, is not recognized. Must be one of {mission_config['valid_data_levels']}."
        )

    # check that version is in the right format with three parts
    version_parts = version.split(".")
    if len(version_parts) != 3:
        raise ValueError(
            f"Version, {version}, is not formatted correctly. Should be X.Y.Z"
        )
    # check that version has integers in each part.  int() is deliberately not
    # used here: it also accepts signs, surrounding whitespace and underscore
    # digit separators ("-1", " 1", "1_0"), which would end up verbatim in the
    # filename and break parsing of the underscore-delimited fields.
    if not all(part.isascii() and part.isdigit() for part in version_parts):
        raise ValueError(f"Version, {version}, is not all integers.")

    if test is True:
        test_str = "test"

    # the parse_science_filename function depends on _ not being present elsewhere
    if ("_" in mode) or ("_" in descriptor):
        raise ValueError(
            "The underscore symbol _ is not allowed in mode or descriptor."
        )

    # Parse Filename and Instrument Name out of the config
    mission_name = mission_config["mission_name"]
    instrument_shortname = mission_config["inst_to_shortname"].get(
        instrument, instrument
    )

    # Combine Parts into Filename
    filename = f"{mission_name}_{instrument_shortname}_{mode}_{level}{test_str}_{descriptor}_{time_str}_v{version}"
    # An omitted mode or descriptor leaves an empty field behind; collapse the
    # resulting double underscore.
    filename = filename.replace("__", "_")

    return filename + mission_config["file_extension"]
def _get_instrument_mapping(config: dict) -> dict: """ Maps instrument shortnames to their full names and additional names. This is used for parsing filenames and ensuring consistency in naming. Parameters ---------- config : dict The configuration dictionary containing mission and instrument details. Returns ------- dict A dictionary mapping shortnames to full names and additional names. """ return { **{s: m for m, s in config["inst_to_shortname"].items()}, **{s: m for m, lst in config["inst_to_extra_inst_names"].items() for s in lst}, } def _parse_standard_format(filename: str, mission_config: dict) -> dict: """ Parses the standard filename format and extracts relevant fields. Handles the following format: {mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}.{extension} Parameters ---------- filename : str The filename to parse (with or without path). mission_config : dict The configuration dictionary containing mission and instrument details. Returns ------- dict A dictionary containing the parsed fields. Raises ------ ValueError If the filename does not match the expected format or contains invalid values. """ result = {} mission_name = mission_config["mission_name"] shortnames = mission_config["inst_shortnames"] # Split the filename into components filename = Path(filename).stem components = filename.split("_") # Handle mission names that contain underscores (e.g. "swxsoc_pipeline") # by joining the appropriate number of leading components mission_name_parts = mission_name.split("_") n_mission_parts = len(mission_name_parts) parsed_mission_name = "_".join(components[:n_mission_parts]) if parsed_mission_name != mission_name: warn_user( f"Not a valid mission name: {parsed_mission_name}. Expected: {mission_name}. 
Reverting to parsing with assumption of configured mission name.", ) else: # Strip mission name parts so remaining components start with instrument components = components[n_mission_parts:] if components[0] not in shortnames: raise ValueError( f"Invalid instrument shortname: {components[0]}. Expected one of {shortnames}" ) # Parse Instrument Name inst_name = components[0] mapping = _get_instrument_mapping(mission_config) result["instrument"] = mapping.get(inst_name.lower(), inst_name) result["time"] = _extract_time( filename, expected_format=TIME_FORMAT, mission_config=mission_config ) # Handle optional fields: mode, test, descriptor result["test"] = "test" in components[1] or "test" in components[2] if components[1][:2] not in mission_config["valid_data_levels"]: result["mode"] = components[1] result["level"] = components[2].replace("test", "") if len(components) == 6: result["descriptor"] = components[3] else: result["level"] = components[1].replace("test", "") if len(components) == 5: result["descriptor"] = components[2] result["version"] = components[-1].lstrip("v") return result def _extract_instrument_name(filename: str, mission_config: dict) -> str: """ Extracts the instrument name from the filename using regex patterns. Parameters ---------- filename : str The filename from which to extract the instrument name. mission_config : dict The configuration dictionary containing mission and instrument details. Returns ------- str The extracted instrument name. Raises ------ ValueError If no valid instrument name is found in the filename. 
""" all_inst_names = [ name.lower() for name in ( mission_config["inst_names"] + mission_config["inst_shortnames"] + [n for sublist in mission_config["extra_inst_names"] for n in sublist] ) ] mission_name = mission_config["mission_name"].lower() pattern = re.compile( rf"(?:^|[_\-.]|{mission_name})(" # Group 1: Prefix + "|".join( re.escape(name) for name in all_inst_names ) # Group 2: Instrument name + r"(?:\d+)?)(?:[_\-.]|$|\d)", # Group 3: Suffix, re.IGNORECASE, ) matches = pattern.findall(filename.lower()) if not matches: raise ValueError(f"No valid instrument name found in {filename}") if len(matches) > 1: raise ValueError(f"Multiple instrument names found: {matches}") return matches[0] def _extract_data_level(filename: str, possible_levels: List[str]) -> str: """ Extracts the data level from the filename using regex patterns. If no data level is found, then the first possible level is returned. Parameters ---------- filename : str The filename from which to extract the data level. possible_levels : List[str] A list of possible data levels to search for. Returns ------- str The extracted data level. """ if len(possible_levels) == 1: # Exact match (e.g. 'raw') return possible_levels[0] # Grouped levels (L0-L3): Extract from filename # Search filename for 'l0', 'l1', etc. found_level = None for lvl in possible_levels: # Simple check: is 'l1' sandwiched by delimiters? if re.search(rf"[_\-.]{lvl}[_\-.]", filename, re.IGNORECASE): found_level = lvl break return found_level if found_level else possible_levels[0] def _extract_time( filename: str, expected_format: Optional[str] = None, mission_config: Optional[dict] = None, ) -> Time: """ Extracts time from the filename using regex patterns. Handles various formats including ISO 8601 and legacy L0 formats. Parameters ---------- filename : str The filename from which to extract the time. expected_format : Optional[str] The expected time format to use for parsing. 
mission_config : Optional[dict] The configuration dictionary containing mission details. Returns ------- Time The extracted time as an astropy Time object. Raises ------ ValueError If no recognizable time format is found in the filename. ValueError If the extracted time is outside the valid range defined in the mission configuration. """ time_parsers = [ _try_parse_with_expected_format, _try_all_patterns, ] # Use Strategy Pattern to try different parsers for parser in time_parsers: result = parser(filename, expected_format) if result: return _validate_time(result, mission_config=mission_config) raise ValueError(f"No recognizable time format in {filename}") def _try_parse_with_expected_format( filename: str, expected_format: str ) -> Optional[Time]: """ Try to parse time using the expected format. Parameters ---------- filename : str The filename from which to extract the time. expected_format : str The expected time format to use for parsing. Examples -------- >>> _try_parse_with_expected_format("swxsoc_eea_l1_20230115T123045_v1.0.0.cdf", "%Y%m%dT%H%M%S") <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45> >>> _try_parse_with_expected_format("padre_get_EPS_9_Data_1673785845000.csv", "unix_ms") <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000> """ # Return early if no expected format is provided if not expected_format: return None # Get the regex pattern for the expected format pattern = TIME_PATTERNS.get(expected_format) if not pattern: swxsoc.log.warning( f"No regex pattern found for expected time format '{expected_format}'. " "Falling back to all patterns." ) return None # Look for a match in the filename using the expected format match = pattern.search(filename) if not match: swxsoc.log.warning( f"No time string matching expected format '{expected_format}' found in {filename}." 
) return None time_str = match.group(0) return _parse_time_string(time_str, expected_format) def _try_all_patterns(filename: str, *args, **kwargs) -> Optional[Time]: """ Try to parse time using all known patterns. Parameters ---------- filename : str The filename from which to extract the time. Returns ------- Time The extracted time as an astropy Time object, or None if not found. Examples -------- >>> _try_all_patterns("swxsoc_eea_l1_20230115T123045_v1.0.0.cdf") <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45> """ for format_str, pattern in TIME_PATTERNS.items(): match = pattern.search(filename) if match: time_str = match.group(0) parsed_time = _parse_time_string(time_str, format_str) if parsed_time: return parsed_time return None def _parse_time_string(time_str: str, format_str: str) -> Optional[Time]: """ Parse a time string with a specific format. Parameters ---------- time_str : str The time string to parse. format_str : str The format string to use for parsing. Examples -------- >>> _parse_time_string("2023-01-15 12:30:45", "%Y-%m-%d %H:%M:%S") <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45> >>> _parse_time_string("1673785845000", "unix_ms") <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000> >>> _parse_time_string("invalid", "%Y-%m-%d") """ # Special case for unix time if format_str in ("unix_ms", "unix_s"): return _parse_unix_timestamp(time_str, format_str) # Try datetime string formatters try: return Time(datetime.strptime(time_str, format_str)) except ValueError: pass # Fall back to sunpy parser as last resort try: return Time(sunpy.time.parse_time(time_str)) except Exception: return None def _parse_unix_timestamp(time_str: str, format_str: str) -> Time: """ Parse Unix timestamp in milliseconds or seconds. Parameters ---------- time_str : str The Unix timestamp string. format_str : str The format identifier: ``"unix_ms"`` for milliseconds, or ``"unix_s"`` for seconds. 
Returns ------- Time The parsed time as an astropy Time object. Examples -------- >>> _parse_unix_timestamp("1673785845000", "unix_ms") <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000> >>> _parse_unix_timestamp("1673785845", "unix_s") <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000> """ divisor = 1000.0 if format_str == "unix_ms" else 1.0 t_unix = Time(int(time_str) / divisor, format="unix") t_unix.format = "isot" # Need to set format to isot for consistency return t_unix def _validate_time(extracted_time: Time, mission_config: Optional[dict] = None) -> Time: """ Validate the extracted time against configured mission constraints. When mission_config is provided, raises ValueError for times outside the valid range. When mission_config is None, issues warnings for suspicious times but does not raise. Parameters ---------- extracted_time : Time The extracted time to validate. mission_config : Optional[dict], optional The configuration dictionary containing mission details with 'min_valid_time' and 'max_valid_time' keys. If None, performs basic validation with warnings only. Returns ------- Time The validated time (same as input). Raises ------ ValueError If mission_config is provided and the extracted time is before the configured minimum valid time (mission_config['min_valid_time']). ValueError If mission_config is provided and the extracted time is after the configured maximum valid time (mission_config['max_valid_time']). 
""" if mission_config is None: # Fallback to basic validation when no config provided if extracted_time > Time.now(): swxsoc.log.warning(f"Found future time {extracted_time}.") if extracted_time < Time("1970-01-01"): swxsoc.log.warning(f"Found suspiciously old time {extracted_time}.") return extracted_time # Get configured time constraints min_valid_time = mission_config.get("min_valid_time") max_valid_time = mission_config.get("max_valid_time") # Validate minimum time if min_valid_time and extracted_time < min_valid_time: raise ValueError( f"Extracted time {extracted_time} is before mission minimum valid time {min_valid_time}." ) # Validate maximum time if max_valid_time and extracted_time > max_valid_time: raise ValueError( f"Extracted time {extracted_time} is after mission maximum valid time {max_valid_time}." ) return extracted_time
def parse_science_filename(filepath: str) -> dict:
    """
    Split a science filename into the properties encoded in it.

    Three cases are handled, in this order:

    1. The file extension equals the configured mission file extension: the
       name is parsed as a standard archive/science filename.
    2. Otherwise, if a file rule configured for the detected instrument has a
       matching extension (case-insensitive), the level and time are extracted
       according to that rule.
    3. Otherwise the level and time are extracted on a best-effort basis.

    Parameters
    ----------
    filepath : str
        Fully qualified filepath of an input file.

    Returns
    -------
    dict
        Parsed fields: instrument, mode, test, time, level, version and
        descriptor. Fields that could not be determined keep their default
        (``None``, or ``False`` for ``test``). For cases 2 and 3 a
        ``mission`` entry holding the lower-cased mission name is added.

    Raises
    ------
    ValueError
        If mission name or instrument is not recognized, or time format is invalid.
    """
    import swxsoc

    mission_config = swxsoc.config["mission"]

    path = Path(filepath)
    filename = path.name
    file_ext = path.suffix

    # Every field starts out undetermined
    result = dict.fromkeys(
        ("instrument", "mode", "test", "time", "level", "version", "descriptor")
    )
    result["test"] = False

    # Case 1: The file is in a standard format used for archive/science files
    if file_ext == mission_config["file_extension"]:
        result.update(_parse_standard_format(filename, mission_config))
        return result

    # Extract instrument name for file rule matching
    try:
        raw_name = _extract_instrument_name(filename, mission_config)
        name_map = _get_instrument_mapping(mission_config)
        inst_name = name_map.get(raw_name.lower(), raw_name)
        result["instrument"] = inst_name
    except ValueError as e:
        raise ValueError(f"Error extracting instrument name: {e}")

    # First configured rule for this instrument whose extension matches
    candidate_rules = mission_config.get("inst_file_rules", {}).get(inst_name, [])
    wanted_ext = file_ext.lower()
    matched_rule = next(
        (rule for rule in candidate_rules if rule["extension"].lower() == wanted_ext),
        None,
    )

    mission_lower = mission_config["mission_name"].lower

    if matched_rule:
        # Case 2: The file is in a non-standard format, but matches a known rule
        level = _extract_data_level(filename, matched_rule["levels"])
        parsed_time = _extract_time(
            filename,
            expected_format=matched_rule.get("time_format"),
            mission_config=mission_config,
        )
        result.update(
            {
                "mission": mission_lower(),
                "level": level,
                "time": parsed_time,
            }
        )
        return result

    # Case 3: The file does not match any known format
    parsed_time = _extract_time(filename, mission_config=mission_config)
    result.update(
        {
            "mission": mission_lower(),
            "instrument": inst_name,  # At least we got the instrument from the filename
            "time": parsed_time,
            "level": _extract_data_level(
                filename, mission_config["valid_data_levels"]
            ),
        }
    )
    return result
def get_instrument_package(instrument_name: str) -> str:
    """
    Return the name of the package that processes files of an instrument.

    The package is resolved in one of two ways:

    1. An explicit, non-empty entry for the instrument in the mission
       configuration's ``inst_packages`` mapping.
    2. Otherwise the default convention ``{mission_name}_{instrument_name}``,
       both lower-cased.

    Parameters
    ----------
    instrument_name : str
        The name of the instrument to find the package for. Matching is
        case-insensitive.

    Returns
    -------
    str
        The name of the package to use for processing files from the specified instrument.

    Raises
    ------
    ValueError
        If the instrument name is not recognized as one of the mission's instruments.
    """
    mission_config = swxsoc.config["mission"]
    known_instruments = mission_config["inst_names"]

    # Normalise case before looking anything up
    instrument_name = instrument_name.lower()

    if instrument_name not in known_instruments:
        raise ValueError(
            f"Instrument, {instrument_name}, is not recognized. Must be one of {list(mission_config['inst_names'])}."
        )

    # Prefer an explicitly configured package; otherwise fall back to the
    # {mission_name}_{instrument_name} naming convention.
    explicit_package = mission_config["inst_packages"].get(instrument_name)
    return explicit_package or (
        f"{mission_config['mission_name'].lower()}_{instrument_name}"
    )