"""
This module provides general utility functions.
"""
import numbers
import os
import re
import time
from datetime import datetime, timezone
import traceback
from typing import Dict, List, Optional, Union
from pathlib import Path
import astropy.units as u
import boto3
import numpy as np
import requests
import sunpy.net.attrs as a
import sunpy.time
import sunpy.util.net
from astropy.time import Time
from astropy.timeseries import TimeSeries
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ClientError, NoCredentialsError
from dateutil.relativedelta import relativedelta
from parfive import Downloader
from sunpy.net.attr import AttrAnd, AttrOr, AttrWalker, SimpleAttr
from sunpy.net.base_client import BaseClient, QueryResponseTable, convert_row_to_table
import swxsoc
__all__ = [
"create_science_filename",
"parse_science_filename",
"SWXSOCClient",
"SearchTime",
"Level",
"Instrument",
"Descriptor",
"DevelopmentBucket",
"record_timeseries",
"get_dashboard_id",
"get_panel_id",
"query_annotations",
"create_annotation",
"remove_annotation_by_id",
"_record_dimension_timestream",
]
# Constants
L0_TIME_FORMATS = [
"%Y%m%dT%H%M%S", # YYYYMMDDTHHMMSS
"%Y%j-%H%M%S", # YYYYJJJ-HHMMSS
"%Y%j_%H%M%S", # YYYYJJJ_HHMMSS
"%y%m%d%H%M%S", # YYMMDDHHMMSS
]
TIME_FORMAT = "%Y%m%dT%H%M%S" # YYYYMMDDTHHMMSS
[docs]
def create_science_filename(
instrument: str,
time: str,
level: str,
version: str,
mode: str = "",
descriptor: str = "",
test: bool = False,
):
"""Return a compliant filename. The format is defined as
{mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}.cdf
This format is only appropriate for data level >= 1.
Parameters
----------
instrument : `str`
The instrument name. Must be one of the following "eea", "nemesis", "merit", "spani"
time : `str` (in isot format) or ~astropy.time
The time
level : `str`
The data level. Must be one of the following "l0", "l1", "l2", "l3", "l4", "ql"
version : `str`
The file version which must be given as X.Y.Z
descriptor : `str`
An optional file descriptor.
mode : `str`
An optional instrument mode.
test : bool
Selects whether the file is a test file.
Returns
-------
filename : `str`
A CDF file name including the given parameters that matches the mission's file naming conventions
Raises
------
ValueError: If the instrument is not recognized as one of the mission's instruments
ValueError: If the data level is not recognized as one of the mission's valid data levels
ValueError: If the data version does not match the mission's data version formatting conventions
ValueError: If the data product descriptor or instrument mode do not match the mission's formatting conventions
"""
test_str = ""
if isinstance(time, str):
time_str = Time(time, format="isot").strftime(TIME_FORMAT)
else:
time_str = time.strftime(TIME_FORMAT)
if instrument not in swxsoc.config["mission"]["inst_names"]:
raise ValueError(
f"Instrument, {instrument}, is not recognized. Must be one of {swxsoc.config['mission']['inst_names']}."
)
if level not in swxsoc.config["mission"]["valid_data_levels"]:
raise ValueError(
f"Level, {level}, is not recognized. Must be one of {swxsoc.config['mission']['valid_data_levels']}."
)
# check that version is in the right format with three parts
if len(version.split(".")) != 3:
raise ValueError(
f"Version, {version}, is not formatted correctly. Should be X.Y.Z"
)
# check that version has integers in each part
for item in version.split("."):
try:
int(item)
except ValueError:
raise ValueError(f"Version, {version}, is not all integers.")
if test is True:
test_str = "test"
# the parse_science_filename function depends on _ not being present elsewhere
if ("_" in mode) or ("_" in descriptor):
raise ValueError(
"The underscore symbol _ is not allowed in mode or descriptor."
)
filename = f"{swxsoc.config['mission']['mission_name']}_{swxsoc.config['mission']['inst_to_shortname'][instrument]}_{mode}_{level}{test_str}_{descriptor}_{time_str}_v{version}"
filename = filename.replace("__", "_") # reformat if mode or descriptor not given
return filename + swxsoc.config["mission"]["file_extension"]
def _get_instrument_mapping(config: dict) -> dict:
"""
Maps instrument shortnames to their full names and additional names.
This is used for parsing filenames and ensuring consistency in naming.
Parameters
----------
config : dict
The configuration dictionary containing mission and instrument details.
Returns
-------
dict
A dictionary mapping shortnames to full names and additional names.
"""
return {
**{s: m for m, s in config["inst_to_shortname"].items()},
**{s: m for m, lst in config["inst_to_extra_inst_names"].items() for s in lst},
}
def _parse_standard_format(filename_components: list, config: dict) -> dict:
"""
Parses the standard filename format and extracts relevant fields.
Handles the following format:
{mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}.{extension}
Parameters
----------
filename_components : list
The components of the filename split by "_".
config : dict
The configuration dictionary containing mission and instrument details.
Returns
-------
dict
A dictionary containing the parsed fields.
Raises
------
ValueError
If the filename does not match the expected format or contains invalid values.
"""
result = {}
mission_name = config["mission_name"]
shortnames = config["inst_shortnames"]
if filename_components[0] != mission_name:
raise ValueError(f"Not a valid mission name: {filename_components[0]}")
if filename_components[1] not in shortnames:
raise ValueError(f"Invalid instrument shortname: {filename_components[1]}")
result["instrument"] = _get_instrument_mapping(config)[filename_components[1]]
result["time"] = Time.strptime(filename_components[-2], TIME_FORMAT)
# Handle optional fields: mode, test, descriptor
result["test"] = (
"test" in filename_components[2] or "test" in filename_components[3]
)
if filename_components[2][:2] not in swxsoc.config["mission"]["valid_data_levels"]:
result["mode"] = filename_components[2]
result["level"] = filename_components[3].replace("test", "")
if len(filename_components) == 7:
result["descriptor"] = filename_components[4]
else:
result["level"] = filename_components[2].replace("test", "")
if len(filename_components) == 6:
result["descriptor"] = filename_components[3]
result["version"] = filename_components[-1].lstrip("v")
return result
def _extract_instrument_name(filename: str, config: dict) -> str:
"""
Extracts the instrument name from the filename using regex patterns.
Parameters
----------
filename : str
The filename from which to extract the instrument name.
config : dict
The configuration dictionary containing mission and instrument details.
Returns
-------
str
The extracted instrument name.
Raises
------
ValueError
If no valid instrument name is found in the filename.
"""
all_inst_names = [
name.lower()
for name in (
config["inst_names"]
+ config["inst_shortnames"]
+ [n for sublist in config["extra_inst_names"] for n in sublist]
)
]
mission_name = config["mission_name"].lower()
pattern = re.compile(
rf"(?:^|[_\-.]|{mission_name})("
+ "|".join(re.escape(name) for name in all_inst_names)
+ r"(?:\d+)?)(?:[_\-.]|$|\d)",
re.IGNORECASE,
)
matches = pattern.findall(filename.lower())
if not matches:
raise ValueError(f"No valid instrument name found in {filename}")
if len(matches) > 1:
raise ValueError(f"Multiple instrument names found: {matches}")
return matches[0]
def _extract_time(filename: str) -> Time:
"""
Extracts time from the filename using regex patterns.
Handles various formats including ISO 8601 and legacy L0 formats.
Parameters
----------
filename : str
The filename from which to extract the time.
Returns
-------
Time
The extracted time as an astropy Time object.
Raises
------
ValueError
If no recognizable time format is found in the filename.
"""
TIME_PATTERNS = [
re.compile(
r"\d{13}"
), # unix time stamps in milliseconds, check for this first so that it does not get confused with other patterns
re.compile(r"\d{8}[-_ T]?\d{6}"), # YYYYMMDD-HHMMSS
re.compile(r"\d{4}-\d{2}-\d{2}[-_ T]\d{2}:\d{2}:\d{2}"), # ISO 8601
re.compile(r"\d{7}[-_]\d{6}"), # Legacy L0 formats
re.compile(r"\d{12}"), # YYMMDDhhmmss
re.compile(r"\d{8}T\d{6}"), # YYYYMMDDTHHMMSS (added this line)
]
for pattern in TIME_PATTERNS:
matches = pattern.search(filename) # Search for time patterns
if matches:
time_str = matches.group(0)
if len(time_str) == 13:
t_unix = Time(int(time_str) / 1000.0, format="unix")
t_unix.format = "isot" # fix the string representation
if t_unix > Time.now():
swxsoc.log.warning(f"Found future time {t_unix}.")
return t_unix
# Try legacy L0 formats first
for fmt in L0_TIME_FORMATS:
try:
return Time(datetime.strptime(time_str, fmt))
except ValueError:
continue
# Fall back to ISO 8601 and others
try:
return Time(sunpy.parse_time(time_str))
except Exception:
continue
raise ValueError(f"No recognizable time format in {filename}")
[docs]
def parse_science_filename(filepath: str) -> dict:
"""
Parses a science filename into its constituent properties.
Parameters
----------
filepath : str
Fully qualified filepath of an input file.
Returns
-------
dict
Parsed fields such as instrument, mode, test, time, level, version, and descriptor.
Raises
------
ValueError
If mission name or instrument is not recognized, or time format is invalid.
"""
import swxsoc
config = swxsoc.config["mission"]
result = {
"instrument": None,
"mode": None,
"test": False,
"time": None,
"level": None,
"version": None,
"descriptor": None,
}
filename = os.path.basename(filepath)
file_name, file_ext = os.path.splitext(filename)
if file_ext == config["file_extension"]:
components = file_name.split("_")
parsed = _parse_standard_format(components, config)
result.update(parsed)
else:
instrument_name = _extract_instrument_name(filename, config)
parsed_time = _extract_time(filename)
from_shortname = _get_instrument_mapping(config)
result.update(
{
"mission": config["mission_name"].lower(),
"instrument": from_shortname.get(
instrument_name.lower(), instrument_name
),
"time": parsed_time,
"level": config["valid_data_levels"][0], # Default to first level
}
)
return result
# ================================================================================================
# SWXSOC FIDO CLIENT
# ================================================================================================
# Initialize the attribute walker
walker = AttrWalker()
# Map sunpy attributes to SWXSOC attributes for easy access
[docs]
class SearchTime(a.Time):
"""
Attribute for specifying the time range for the search.
Attributes
----------
start : `str`
The start time in ISO format.
end : `str`
The end time in ISO format.
"""
[docs]
class Level(a.Level):
"""
Attribute for specifying the data level for the search.
Attributes
----------
value : str
The data level value.
"""
[docs]
class Instrument(a.Instrument):
"""
Attribute for specifying the instrument for the search.
Attributes
----------
value : str
The instrument value.
"""
[docs]
class Descriptor(a.Detector):
"""
Attribute to specify the data type for the search.
Attributes
----------
value : str
The data type
"""
[docs]
class DevelopmentBucket(SimpleAttr):
"""
Attribute for specifying whether to search in the DevelopmentBucket for testing purposes.
Attributes
----------
value : bool
Whether to use the DevelopmentBucket. Defaults to False.
"""
@walker.add_creator(AttrOr)
def create_or(wlk, tree):
"""
Creates an 'AttrOr' object from the provided tree of attributes.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for creating the attributes.
tree : AttrOr
The 'AttrOr' tree structure.
Returns
-------
list
A list of created attributes.
"""
results = []
for sub in tree.attrs:
results.append(wlk.create(sub))
return results
@walker.add_creator(AttrAnd)
def create_and(wlk, tree):
"""
Creates an 'AttrAnd' object from the provided tree of attributes.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for creating the attributes.
tree : AttrAnd
The 'AttrAnd' tree structure.
Returns
-------
list
A list containing a single dictionary of attributes.
"""
result = {}
for sub in tree.attrs:
wlk.apply(sub, result)
return [result]
@walker.add_applier(SearchTime)
def apply_time(wlk, attr, params):
"""
Applies 'a.Time' attribute to the parameters.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for applying the attributes.
attr : a.Time
The 'a.Time' attribute to be applied.
params : dict
The parameters dictionary to be updated.
"""
params.update({"startTime": attr.start.isot, "endTime": attr.end.isot})
@walker.add_applier(Level)
def apply_level(wlk, attr, params):
"""
Applies 'a.Level' attribute to the parameters.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for applying the attributes.
attr : a.Level
The 'a.Level' attribute to be applied.
params : dict
The parameters dictionary to be updated.
"""
params.update({"level": attr.value.lower()})
@walker.add_applier(Instrument)
def apply_instrument(wlk, attr, params):
"""
Applies 'a.Instrument' attribute to the parameters.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for applying the attributes.
attr : a.Instrument
The 'a.Instrument' attribute to be applied.
params : dict
The parameters dictionary to be updated.
"""
params.update({"instrument": attr.value.upper()})
@walker.add_applier(DevelopmentBucket)
def apply_development_bucket(wlk, attr, params):
"""
Applies 'DevelopmentBucket' attribute to the parameters.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for applying the attributes.
attr : DevelopmentBucket
The 'DevelopmentBucket' attribute to be applied.
params : dict
The parameters dictionary to be updated.
"""
params.update({"use_development_bucket": attr.value})
@walker.add_applier(Descriptor)
def apply_descriptor(wlk, attr, params):
"""
Applies 'DevelopmentBucket' attribute to the parameters.
Parameters
----------
wlk : AttrWalker
The AttrWalker instance used for applying the attributes.
attr : DevelopmentBucket
The 'DevelopmentBucket' attribute to be applied.
params : dict
The parameters dictionary to be updated.
"""
params.update({"descriptor": attr.value})
[docs]
class SWXSOCClient(BaseClient):
"""
Client for searching for SWXSOC data on AWS.
This client provides search and fetch functionality for SWXSOC data and is based on the sunpy BaseClient for FIDO.
For more information on the sunpy BaseClient, see: https://docs.sunpy.org/en/stable/generated/api/sunpy.net.base_client.BaseClient.html
Note that AWS buckets may require access keys.
Examples
--------
>>> from swxsoc.util import SWXSOCClient, SearchTime, Level, Descriptor, Instrument
>>> client = SWXSOCClient()
>>> query = AttrAnd([SearchTime(start=Time("2025-07-10T00:00:00"), end=Time("2025-07-11T00:00:00")),
... Instrument("meddea"),
... Level("l0"),
... Descriptor("housekeeping")])
>>> results = client.search(query) # doctest: +SKIP
"""
size_column = "size"
[docs]
def search(self, query=None):
"""
Searches for data based on the given query.
Parameters
----------
query : AttrAnd
The query object specifying search criteria.
Returns
-------
QueryResponseTable
A table containing the search results.
"""
if query is None:
query = AttrAnd([])
queries = walker.create(query)
swxsoc.log.info(f"Searching with {queries}")
results = []
for query_parameters in queries:
results.extend(self._make_search(query_parameters))
if results == []:
return QueryResponseTable(names=[], rows=[], client=self)
names = [
"instrument",
"mode",
"test",
"time",
"level",
"version",
"descriptor",
"key",
"size",
"bucket",
"etag",
"storage_class",
"last_modified",
]
return QueryResponseTable(names=names, rows=results, client=self)
[docs]
@convert_row_to_table
def fetch(self, query_results, *, path, downloader, **kwargs):
"""
Fetches the files based on query results and queues them up to be downloaded to the specified path by your downloader.
Note: The downloader must be an instance of parfive.Downloader
Parameters
----------
query_results : list
The results of the search query.
path : str
The directory path where files should be saved.
downloader : Downloader
The parfive downloader instance used for fetching files.
"""
if not isinstance(downloader, Downloader):
raise ValueError("Downloader must be an instance of parfive.Downloader")
for row in query_results:
swxsoc.log.info(f"Fetching {row['key']}")
if path is None or path == ".":
path = os.getcwd()
if os.path.exists(path) and not os.path.isdir(path):
raise ValueError(f"Path {path} is not a directory")
filepath = self._make_filename(path, row)
presigned_url = self.generate_presigned_url(row["bucket"], row["key"])
url = (
presigned_url
if presigned_url is not None
else f"https://{row['bucket']}.s3.amazonaws.com/{row['key']}"
)
downloader.enqueue_file(url, filename=filepath)
@classmethod
def _make_filename(cls, path, row):
"""
Creates a filename based on the provided path and row data.
Parameters
----------
path : str
The directory path.
row : dict
The row data containing the file key.
Returns
-------
str
The full file path.
"""
return os.path.join(path, row["key"].split("/")[-1])
[docs]
@staticmethod
def generate_presigned_url(bucket_name, object_key, expiration=3600):
"""
Generates a presigned URL for accessing an object in S3. If credentials are not available
or access is denied, attempts an unsigned request for public access.
Parameters
----------
bucket_name : str
The name of the S3 bucket.
object_key : str
The key of the S3 object.
expiration : int, optional
The expiration time in seconds for the presigned URL. Default is 3600 seconds.
Returns
-------
str or None
The presigned URL if successful, or a direct unsigned URL if public access is allowed.
Otherwise, returns None.
"""
try:
# Attempt to generate a presigned URL with credentials
s3_client = boto3.client("s3")
# Try to list one object to check if credentials are available
s3_client.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
response = s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": object_key},
ExpiresIn=expiration,
)
return response
except NoCredentialsError:
swxsoc.log.warning("Credentials not available. Trying unsigned access.")
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "AccessDenied":
swxsoc.log.warning(
f"Access denied to {bucket_name}/{object_key}. Trying unsigned access."
)
else:
swxsoc.log.warning(f"Error generating presigned URL: {e}")
return None
# If credentials are missing or access is denied, try unsigned access
try:
# Attempt to access the object with an unsigned request (public access)
swxsoc.log.info(f"Attempting unsigned access to {bucket_name}/{object_key}")
url = f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
return url
except ClientError as unsigned_error:
print(f"Unsigned access failed: {unsigned_error}")
return None
@classmethod
def _can_handle_query(cls, *query):
"""
Determines if the client can handle the given query based on its attributes.
Parameters
----------
query : tuple
The query attributes to check.
Returns
-------
bool
True if the client can handle the query, otherwise False.
"""
query_attrs = set(type(x) for x in query)
supported_attrs = {SearchTime, Level, Instrument, DevelopmentBucket}
return supported_attrs.issuperset(query_attrs)
@classmethod
def _make_search(cls, query):
"""
Performs a search based on the provided query parameters.
Parameters
----------
query : dict
The query parameters including instrument, levels, time range, and development bucket flag.
Returns
-------
list
A list of rows containing the search results.
"""
instrument = query.get("instrument")
levels = query.get("level")
start_time = query.get("startTime")
end_time = query.get("endTime")
descriptor = query.get("descriptor")
use_development_bucket = query.get("use_development_bucket")
if levels is not None and not isinstance(levels, list):
levels = [levels]
if levels is not None and len(levels) > 0:
for level in levels:
if level not in swxsoc.config["mission"]["valid_data_levels"]:
raise ValueError(f"Invalid data level: {level}")
else:
levels = swxsoc.config["mission"]["valid_data_levels"]
if start_time is None:
start_time = "2000-01-01"
if end_time is None:
end_time = datetime.now().isoformat()
instrument_buckets = {
f"{swxsoc.config['mission']['inst_to_targetname'][inst]}": (
f"{'dev-' if use_development_bucket else ''}"
f"{swxsoc.config['mission']['mission_name']}-{inst}"
)
for inst in swxsoc.config["mission"]["inst_names"]
}
swxsoc.log.debug(f"Mapping of instruments to S3 buckets: {instrument_buckets}")
if instrument is None or instrument not in instrument_buckets:
swxsoc.log.info(
"No instrument specified or invalid instrument. Searching all instruments."
)
instrument_bucket_to_search = instrument_buckets.values()
else:
swxsoc.log.info(f"Searching for instrument: {instrument}")
instrument_bucket_to_search = [instrument_buckets[instrument]]
swxsoc.log.info(f"Searching in buckets: {instrument_bucket_to_search}")
files_in_s3 = cls.list_files_in_s3(instrument_bucket_to_search)
if levels is not None or start_time is not None or end_time is not None:
swxsoc.log.info(
f"Searching for files with level {levels} between {start_time} and {end_time}"
)
if descriptor:
swxsoc.log.info(f"Searching for files with descriptor: {descriptor}")
prefixes = cls.generate_prefixes(levels, start_time, end_time, descriptor)
matched_files = []
for this_s3_file in files_in_s3:
for this_prefix_list in prefixes:
if all(
this_token in str(Path(this_s3_file["Key"]).parent)
for this_token in this_prefix_list
):
matched_files.append(this_s3_file)
else:
swxsoc.log.info("Searching for all files")
# remove duplicates
unique_matched_files = []
seen = []
for this_file in matched_files:
if this_file["Key"] not in seen:
seen.append(this_file["Key"])
unique_matched_files.append(this_file)
matched_files = unique_matched_files
swxsoc.log.info(
f"Found {len(matched_files)} files in S3 matching search criteria"
)
rows = []
for s3_object in matched_files:
swxsoc.log.debug(f"Processing S3 object: {s3_object}")
try:
info = parse_science_filename(s3_object["Key"])
except ValueError:
info = {}
row = [
info.get("instrument", "unknown"),
info.get("mode", "unknown"),
info.get("test", False),
info.get("time", "unknown"),
info.get("level", "unknown"),
info.get("version", "unknown"),
info.get("descriptor", "unknown"),
s3_object["Key"],
s3_object["Size"] * u.byte,
s3_object["Bucket"],
s3_object["ETag"],
s3_object["StorageClass"],
s3_object["LastModified"],
]
rows.append(row)
return rows
[docs]
@staticmethod
def list_files_in_s3(bucket_names: list) -> list:
"""
Lists all files in the specified S3 buckets. If access is denied, it retries with an unsigned request.
Parameters
----------
bucket_names : list
A list of S3 bucket names.
Returns
-------
list
A list of dictionaries containing metadata about each S3 object.
"""
content = []
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for bucket_name in bucket_names:
try:
# Try with authenticated client
pages = paginator.paginate(Bucket=bucket_name)
for page in pages:
for obj in page.get("Contents", []):
metadata = {
"Key": obj["Key"],
"LastModified": sunpy.time.parse_time(obj["LastModified"]),
"Size": obj["Size"],
"ETag": obj["ETag"],
"StorageClass": obj.get("StorageClass", "STANDARD"),
"Bucket": bucket_name,
}
content.append(metadata)
except (ClientError, NoCredentialsError) as e:
swxsoc.log.warning(f"Error accessing bucket {bucket_name}: {e}")
if isinstance(e, NoCredentialsError):
error_code = "NoCredentialsError"
elif isinstance(e, ClientError):
error_code = e.response["Error"]["Code"]
# Retry?
if error_code == "AccessDenied" or error_code == "NoCredentialsError":
swxsoc.log.warning(
f"Access denied to bucket {bucket_name}. Trying unsigned request."
)
# Retry with an unsigned (anonymous) client
try:
unsigned_s3 = boto3.client(
"s3", config=Config(signature_version=UNSIGNED)
)
unsigned_paginator = unsigned_s3.get_paginator(
"list_objects_v2"
)
pages = unsigned_paginator.paginate(Bucket=bucket_name)
for page in pages:
for obj in page.get("Contents", []):
metadata = {
"Key": obj["Key"],
"LastModified": sunpy.time.parse_time(
obj["LastModified"]
),
"Size": obj["Size"],
"ETag": obj["ETag"],
"StorageClass": obj.get("StorageClass", "STANDARD"),
"Bucket": bucket_name,
}
content.append(metadata)
except ClientError:
raise Exception(
f"Unsigned request failed for bucket {bucket_name} (Ensure you have the correct IAM permissions, or are on the VPN)"
)
else:
raise Exception(f"Error accessing bucket {bucket_name}: {e}")
return content
[docs]
@staticmethod
def generate_prefixes(
levels: list, start_time: str, end_time: str, descriptor: str
) -> list:
"""
Generates a list of prefixes based on the level and time range.
Parameters
----------
levels : list
A list of data levels.
start_time : str
The start time in ISO format.
end_time : str
The end time in ISO format.
descriptor : str
The file descriptor
Returns
-------
list
A list of prefixes.
"""
current_time = datetime.fromisoformat(start_time)
end_time = datetime.fromisoformat(end_time)
prefixes = []
while current_time <= end_time:
for level in levels:
these_tokens = [
f"{current_time.year}",
f"{current_time.month:02d}",
level,
]
if descriptor:
these_tokens.append(descriptor)
prefixes.append(these_tokens)
current_time += relativedelta(months=1)
swxsoc.log.debug(f"Generated prefix: {prefixes}")
return prefixes
[docs]
def record_timeseries(
ts: TimeSeries, ts_name: str = None, instrument_name: str = ""
) -> None:
"""
Record a timeseries of measurements to AWS Timestream for viewing on a dashboard like Grafana.
This function requires AWS credentials with permission to write to the AWS Timestream database.
Parameters
----------
ts : TimeSeries
A timeseries with column data to record. Note that times are assumed to be in UTC.
ts_name : str, optional
The name of the timeseries to record. If None or empty string, defaults to ts.meta['name']
or 'measurement_group'.
instrument_name : str, optional
The instrument name. If not provided or empty, uses ts.meta['INSTRUME'].
Returns
-------
None
Raises
------
ValueError
If instrument_name is invalid or not in the configured mission instrument names.
Notes
-----
Records are written in batches of 100 to comply with Timestream API limits.
Database and table names are automatically prefixed with 'dev-' when not in PRODUCTION environment.
NaN values are skipped entirely and not written to Timestream. When a NaN is encountered in the
timeseries data, that specific measure value is omitted from the record. The function logs the
total count of NaN values skipped across all columns and time points.
Data type inference follows a hierarchical approach to determine the appropriate Timestream type:
- **BOOLEAN**: Values of type `bool` or `np.bool_` are stored as BOOLEAN type with lowercase
string representation ("true" or "false") as required by Timestream.
- **DOUBLE**: Numeric values (instances of `numbers.Number`) are stored as DOUBLE type.
- **VARCHAR**: All other values default to VARCHAR type for text/string storage.
The boolean check is performed first since `bool` is a subclass of `int` in Python. This ensures
boolean flags are correctly identified and not mistakenly stored as numeric DOUBLE values.
"""
timestream_client = boto3.client("timestream-write", region_name="us-east-1")
# Get mission name swxsoc config
mission_name = swxsoc.config["mission"]["mission_name"]
# Validate Instrument name
instrument_name = (
instrument_name.lower()
if "INSTRUME" not in ts.meta
else ts.meta["INSTRUME"].lower()
)
if instrument_name == "" or instrument_name is None:
error = f"Invalid instrument name: {instrument_name}. Must be one of {swxsoc.config['mission']['inst_names']}."
swxsoc.log.error(error)
raise ValueError(error)
# Validate Timeseries name
if ts_name is None or ts_name == "":
ts_name = ts.meta.get("name", "measurement_group")
# Get the Database and Table names based on Dev / Prod environment
database_name = f"{mission_name}_sdc_aws_logs"
table_name = f"{mission_name}_measures_table"
if os.getenv("LAMBDA_ENVIRONMENT") != "PRODUCTION":
database_name = f"dev-{database_name}"
table_name = f"dev-{table_name}"
dimensions = [
{"Name": "mission", "Value": mission_name},
{"Name": "source", "Value": os.getenv("LAMBDA_ENVIRONMENT", "DEVELOPMENT")},
{"Name": "instrument", "Value": instrument_name},
]
# Create a list to hold all records to be written
records = []
total_nan_count = 0
# Loop over each time point in the timeseries, creating a record for each
for i, time_point in enumerate(ts.time):
measure_record = {
"Time": str(int(time_point.to_datetime().timestamp() * 1000)),
"Dimensions": dimensions,
"MeasureName": ts_name,
"MeasureValueType": "MULTI",
"MeasureValues": [],
}
for this_col in ts.colnames:
if this_col == "time": # skip the time column
continue
if len(ts[this_col].shape) == 1: # usual case, a single value in the column
# Handle both Quantity and regular values
if isinstance(ts[this_col], u.Quantity):
measure_unit = ts[this_col].unit
value = ts[this_col].value[i]
else:
measure_unit = ""
value = ts[this_col][i]
# Skip adding NaN values to the record
if isinstance(value, numbers.Number) and np.isnan(value):
total_nan_count += 1
continue
# Determine the appropriate Timestream data type
if isinstance(value, (bool, np.bool_)):
measure_type = "BOOLEAN"
measure_value = str(
value
).lower() # Timestream expects "true" or "false"
elif isinstance(value, numbers.Number):
measure_type = "DOUBLE"
measure_value = str(value)
else:
measure_type = "VARCHAR"
measure_value = str(value)
measure_record["MeasureValues"].append(
{
"Name": (
f"{this_col}_{measure_unit}" if measure_unit else this_col
),
"Value": measure_value,
"Type": measure_type,
}
)
else: # the values in the timeseries are arrays
values = ts[this_col][i]
if isinstance(values, u.Quantity):
values = values.value # remove the unit
values = values.flatten()
# Loop over each value in the array and add to MeasureValues
for i, value in enumerate(values):
# Skip adding NaN values to the record
if isinstance(value, numbers.Number) and np.isnan(value):
total_nan_count += 1
continue
# Determine the appropriate Timestream data type for array values
if isinstance(value, (bool, np.bool_)):
measure_type = "BOOLEAN"
measure_value = str(
value
).lower() # Timestream expects "true" or "false"
elif isinstance(value, numbers.Number):
measure_type = "DOUBLE"
measure_value = str(float(value))
else:
measure_type = "VARCHAR"
measure_value = str(value)
measure_record["MeasureValues"].append(
{
"Name": f"{this_col}_val{i}",
"Value": measure_value,
"Type": measure_type,
}
)
# Only add the record if there are MeasureValues to write
if measure_record["MeasureValues"]:
records.append(measure_record)
else:
swxsoc.log.debug(
f"Skipping record at time {time_point} for {ts_name} due to all NaN values."
)
# Log total NaN values skipped
if total_nan_count > 0:
swxsoc.log.info(f"Skipped {total_nan_count} NaN values in {ts_name}")
# Process records in batches of 100 to avoid exceeding the Timestream API limit
batch_size = 100
for start in range(0, len(records), batch_size):
chunk = records[start : start + batch_size] # noqa: E203
try:
result = timestream_client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=chunk,
)
swxsoc.log.info(
f"Successfully wrote {len(chunk)} {ts_name} records to Timestream: {database_name}/{table_name}, "
f"writeRecords Status: {result['ResponseMetadata']['HTTPStatusCode']}"
)
except timestream_client.exceptions.RejectedRecordsException as err:
swxsoc.log.error(f"Failed to write records to Timestream: {err}")
for rr in err.response["RejectedRecords"]:
swxsoc.log.info(f"Rejected Index {rr['RecordIndex']}: {rr['Reason']}")
if "ExistingVersion" in rr:
swxsoc.log.info(
f"Rejected record existing version: {rr['ExistingVersion']}"
)
except Exception as err:
swxsoc.log.error(f"Failed to write to Timestream: {err}")
# Log Stack trace for debugging
swxsoc.log.error(traceback.format_exc())
[docs]
def _record_dimension_timestream(
dimensions: list,
instrument_name: str = None,
measure_name: str = "timestamp",
measure_value: any = None,
measure_value_type: str = "DOUBLE",
timestamp: str = None,
) -> None:
"""
Record a single measurement to an `AWS timestream <https://docs.aws.amazon.com/timestream/>`_ for viewing on a dashboard such as Grafana.
.. warning::
This function requires AWS credentials with permission to write to the AWS timestream database.
:param dimensions: A list of dimensions to record. Each dimension should be a dictionary with 'Name' and 'Value' keys.
:type dimensions: list[dict]
:param instrument_name: Optional. Name of the instrument to add as a dimension. Defaults to None.
:type instrument_name: str, optional
:param measure_name: The name of the measure being recorded. Defaults to "timestamp".
:type measure_name: str
:param measure_value: The value of the measure being recorded. Defaults to the current UTC timestamp if not provided.
:type measure_value: any, optional
:param measure_value_type: The type of the measure value (e.g., "DOUBLE", "BIGINT"). Defaults to "DOUBLE".
:type measure_value_type: str
:param timestamp: The timestamp for the record in milliseconds. Defaults to the current time if not provided.
:type timestamp: str, optional
:return: None
"""
timestream_client = boto3.client("timestream-write", region_name="us-east-1")
# Use current time in milliseconds if no timestamp is provided
if not timestamp:
timestamp = int(time.time() * 1000)
# Default measure_value to current UTC timestamp if not provided
utc_now = datetime.now(timezone.utc)
if measure_value is None:
measure_value = str(utc_now.timestamp())
swxsoc.log.info(f"Using timestamp: {timestamp}")
# Lowercase instrument name for consistency if provided
if instrument_name:
instrument_name = instrument_name.lower()
# Add instrument_name as a dimension if provided
if instrument_name and instrument_name in swxsoc.config["mission"]["inst_names"]:
dimensions.append({"Name": "InstrumentName", "Value": instrument_name})
else:
swxsoc.log.info(
"No valid instrument name provided. Skipping instrument dimension."
)
try:
# Get mission name from environment or default to 'hermes'
mission_name = swxsoc.config["mission"]["mission_name"]
# Define database and table names based on mission and environment
database_name = f"{mission_name}_sdc_aws_logs"
table_name = f"{mission_name}_measures_table"
if os.getenv("LAMBDA_ENVIRONMENT") != "PRODUCTION":
database_name = f"dev-{database_name}"
table_name = f"dev-{table_name}"
record = {
"Time": str(timestamp),
"Dimensions": dimensions,
"MeasureName": measure_name,
"MeasureValue": str(measure_value),
"MeasureValueType": measure_value_type,
}
# Write records to Timestream
timestream_client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=[record],
)
swxsoc.log.info(
f"Successfully wrote record {record} to Timestream: {database_name}/{table_name}"
)
except Exception as e:
swxsoc.log.error(f"Failed to write to Timestream: {e}")
def _to_milliseconds(dt: datetime) -> int:
"""
Converts a datetime object to milliseconds since epoch.
Args:
dt (datetime): Datetime object to convert.
Returns:
int: Milliseconds since epoch.
"""
if isinstance(dt, Time):
# Convert astropy Time object to a standard datetime object in UTC
dt = dt.to_datetime(timezone=None) # Convert to naive datetime in UTC
return int(dt.timestamp() * 1000)
return int(dt.timestamp() * 1000)
[docs]
def get_dashboard_id(
dashboard_name: str, mission_dashboard: Optional[str] = None
) -> Optional[int]:
"""
Retrieves the dashboard UID by its name. Issues a warning if multiple dashboards with the same name are found.
Args:
dashboard_name (str): Name of the dashboard to retrieve.
Returns:
Optional[int]: The UID of the dashboard, or None if not found.
"""
try:
# Set the base URL and API key for Grafana Annotations API
# You need to set the GRAFANA_API_KEY environment variables to use this feature
API_KEY = os.environ.get("GRAFANA_API_KEY", None)
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
BASE_URL = (
f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
if not mission_dashboard
else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
)
response = requests.get(
f"{BASE_URL}/api/search", headers=HEADERS, params={"query": dashboard_name}
)
response.raise_for_status()
dashboards = response.json()
except requests.exceptions.HTTPError as e:
swxsoc.log.error(f"Failed to retrieve dashboards: {e}")
return None
except requests.exceptions.ConnectionError as e:
swxsoc.log.error(f"Failed to retrieve panels for dashboard: {e}")
return None
matching_dashboards = [
dashboard
for dashboard in dashboards
if "title" in dashboard and dashboard["title"] == dashboard_name
]
if len(matching_dashboards) == 0:
swxsoc.log.warning(
f"Dashboard with title '{dashboard_name}' not found. Annotation will be created without a dashboard."
)
if len(matching_dashboards) > 1:
swxsoc.log.warning(
f"Multiple dashboards with title '{dashboard_name}' found. "
f"Using the first matching dashboard UID ({matching_dashboards[0]['uid']}). Consider using unique dashboard titles."
)
return matching_dashboards[0]["uid"] if matching_dashboards else None
[docs]
def get_panel_id(
dashboard_id: int, panel_name: str, mission_dashboard: Optional[str] = None
) -> Optional[int]:
"""
Retrieves the panel ID by dashboard UID and panel name. Issues a warning if multiple panels with the same name are found.
Args:
dashboard_id (int): UID of the dashboard.
panel_name (str): Name of the panel to retrieve.
Returns:
Optional[int]: The ID of the panel, or None if not found.
"""
try:
# Set the base URL and API key for Grafana Annotations API
# You need to set the GRAFANA_API_KEY environment variables to use this feature
API_KEY = os.environ.get("GRAFANA_API_KEY", None)
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
BASE_URL = (
f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
if not mission_dashboard
else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
)
response = requests.get(
f"{BASE_URL}/api/dashboards/uid/{dashboard_id}", headers=HEADERS
)
response.raise_for_status()
panels = response.json().get("dashboard", {}).get("panels", [])
except requests.exceptions.HTTPError as e:
swxsoc.log.error(
f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
)
return None
except requests.exceptions.ConnectionError as e:
swxsoc.log.error(
f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
)
return None
matching_panels = [panel for panel in panels if panel["title"] == panel_name]
if len(matching_panels) == 0:
swxsoc.log.warning(
f"Panel with title '{panel_name}' not found in dashboard ID {dashboard_id}. Annotation will be created without a panel."
)
if len(matching_panels) > 1:
swxsoc.log.warning(
f"Multiple panels with title '{panel_name}' found in dashboard ID {dashboard_id}. "
f"Using the first matching panel ID ({matching_panels[0]['id']}). Consider using unique panel titles."
)
return matching_panels[0]["id"] if matching_panels else None
[docs]
def query_annotations(
start_time: datetime,
end_time: Optional[datetime] = None,
tags: Optional[List[str]] = None,
limit: Optional[int] = 100,
dashboard_id: Optional[int] = None,
panel_id: Optional[int] = None,
dashboard_name: Optional[str] = None,
panel_name: Optional[str] = None,
mission_dashboard: Optional[str] = None,
) -> List[Dict[str, Union[str, int]]]:
"""
Queries annotations within a specific timeframe with optional filters for tags, dashboard, and panel names.
Args:
start_time (datetime): Start time of the query in UTC.
end_time (Optional[datetime]): End time of the query; defaults to start_time if None.
tags (Optional[List[str]]): List of tags to filter the annotations.
limit (Optional[int]): Maximum number of annotations to retrieve.
dashboard_id (Optional[int]): UID of the dashboard to filter annotations.
panel_id (Optional[int]): ID of the panel to filter annotations.
dashboard_name (Optional[str]): Name of the dashboard to look up UID if `dashboard_id` is not provided.
panel_name (Optional[str]): Name of the panel to look up ID if `panel_id` is not provided.
Returns:
List[Dict[str, Union[str, int]]]: List of annotations matching the query criteria.
"""
# Look up dashboard and panel IDs if names are provided
if dashboard_name and not dashboard_id:
dashboard_id = get_dashboard_id(dashboard_name, mission_dashboard)
if dashboard_id and panel_name and not panel_id:
panel_id = get_panel_id(dashboard_id, panel_name, mission_dashboard)
if not end_time:
end_time = start_time
params = {
"from": _to_milliseconds(start_time),
"to": _to_milliseconds(end_time),
"limit": limit,
}
if tags:
params["tags"] = tags
if dashboard_id:
params["dashboardUID"] = dashboard_id
if panel_id:
params["panelId"] = panel_id
try:
# Set the base URL and API key for Grafana Annotations API
# You need to set the GRAFANA_API_KEY environment variables to use this feature
API_KEY = os.environ.get("GRAFANA_API_KEY", None)
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
BASE_URL = (
f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
if not mission_dashboard
else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
)
response = requests.get(
f"{BASE_URL}/api/annotations", headers=HEADERS, params=params
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
swxsoc.log.error(f"Failed to query annotations: {e}")
return []
except requests.exceptions.ConnectionError as e:
swxsoc.log.error(
f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
)
return []
[docs]
def create_annotation(
start_time: datetime,
text: str,
tags: List[str],
end_time: Optional[datetime] = None,
dashboard_id: Optional[int] = None,
panel_id: Optional[int] = None,
dashboard_name: Optional[str] = None,
panel_name: Optional[str] = None,
mission_dashboard: Optional[str] = None,
overwrite: bool = False,
) -> Dict[str, Union[str, int]]:
"""
Creates a new annotation for a specified event or time period, with optional filtering by dashboard and panel names.
Args:
start_time (datetime): Start time of the annotation in UTC.
text (str): Annotation text to display.
tags (List[str]): List of tags for categorizing the annotation.
end_time (Optional[datetime]): End time of the annotation, if applicable.
dashboard_id (Optional[int]): UID of the dashboard to associate the annotation.
panel_id (Optional[int]): ID of the panel to associate the annotation.
dashboard_name (Optional[str]): Name of the dashboard to look up UID if `dashboard_id` is not provided.
panel_name (Optional[str]): Name of the panel to look up ID if `panel_id` is not provided.
Returns:
Dict[str, Union[str, int]]: The created annotation data.
"""
# Look up dashboard and panel IDs if names are provided
if dashboard_name and not dashboard_id:
dashboard_id = get_dashboard_id(dashboard_name, mission_dashboard)
if dashboard_id and panel_name and not panel_id:
panel_id = get_panel_id(dashboard_id, panel_name, mission_dashboard)
# Overwrite functionality: query and remove existing identical annotations
if overwrite:
swxsoc.log.info("Overwriting existing annotations.")
existing_annotations = query_annotations(
start_time=start_time,
end_time=end_time or start_time,
tags=tags,
dashboard_id=dashboard_id,
panel_id=panel_id,
mission_dashboard=mission_dashboard,
)
for annotation in existing_annotations:
if annotation.get("text") == text:
annotation_id = annotation.get("id")
if annotation_id:
removed = remove_annotation_by_id(annotation_id, mission_dashboard)
if removed:
swxsoc.log.info(
f"Removed existing annotation with ID {annotation_id}."
)
payload = {
"time": _to_milliseconds(start_time),
"text": text,
"tags": tags,
}
if end_time:
payload["timeEnd"] = _to_milliseconds(end_time)
if dashboard_id:
payload["dashboardUID"] = dashboard_id
if panel_id:
payload["panelId"] = panel_id
try:
# Set the base URL and API key for Grafana Annotations API
# You need to set the GRAFANA_API_KEY environment variables to use this feature
API_KEY = os.environ.get("GRAFANA_API_KEY", None)
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
BASE_URL = (
f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
if not mission_dashboard
else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
)
response = requests.post(
f"{BASE_URL}/api/annotations", headers=HEADERS, json=payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
swxsoc.log.error(f"Failed to create annotation: {e}")
return {}
except requests.exceptions.ConnectionError as e:
swxsoc.log.error(
f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
)
return {}
[docs]
def remove_annotation_by_id(
annotation_id: int, mission_dashboard: Optional[str] = None
) -> bool:
"""
Deletes an annotation by its ID.
Args:
annotation_id (int): The ID of the annotation to delete.
Returns:
bool: True if the annotation was successfully deleted, False otherwise.
"""
try:
# Set the base URL and API key for Grafana Annotations API
# You need to set the GRAFANA_API_KEY environment variables to use this feature
API_KEY = os.environ.get("GRAFANA_API_KEY", None)
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
BASE_URL = (
f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
if not mission_dashboard
else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
)
full_url = f"{BASE_URL}/api/annotations/{annotation_id}"
response = requests.delete(full_url, headers=HEADERS)
response.raise_for_status()
return (
response.status_code == 200
) # Returns True if annotation was deleted successfully (204 No Content)
except requests.exceptions.HTTPError as e:
swxsoc.log.error(
f"Failed to remove annotation with ID {annotation_id}: {e} [swxsoc.util.util]"
)
return False
except requests.exceptions.ConnectionError as e:
swxsoc.log.error(f"Failed to connect to the server: {e}")
return False