Source code for swxsoc.swxdata

"""
Container class for Measurement Data.
"""

from collections import OrderedDict
from copy import deepcopy
from pathlib import Path
from typing import Optional, Union

import astropy
import ndcube
import numpy as np
from astropy import units as u
from astropy.nddata import NDData
from astropy.table import vstack
from astropy.time import Time
from astropy.timeseries import TimeSeries
from ndcube import NDCollection, NDCube

import swxsoc
from swxsoc.util.exceptions import warn_user
from swxsoc.util.schema import SWXSchema

__all__ = ["SWXData"]


[docs] class SWXData: """ A generic object for loading, storing, and manipulating space weather time series data. Parameters ---------- timeseries : `Union[astropy.timeseries.TimeSeries, Dict[str, astropy.timeseries.TimeSeries]]` The time-series data. This can be a single `astropy.timeseries.TimeSeries` object or a dictionary of `str` to `astropy.timeseries.TimeSeries` objects. If a dictionary, one key must be named 'epoch', the primary time axis. If non-index/time columns are included in any of the TimeSeries objects, they must be `~astropy.units.Quantity` arrays. support : `Optional[dict[Union[astropy.units.Quantity, astropy.nddata.NDData]]]` Support data arrays which do not vary with time (i.e. Non-Record-Varying data). spectra : `Optional[ndcube.NDCollection]` One or more `ndcube.NDCube` objects containing spectral or higher-dimensional timeseries data. meta : `Optional[dict]` The metadata describing the data in an ISTP-compliant format. schema : `Optional[SWXSchema]` An optional `~swxsoc.util.schema.SWXSchema` instance for metadata derivation. If not provided, a default `SWXSchema` will be created. Examples -------- >>> import os >>> import numpy as np >>> import astropy.units as u >>> from astropy.timeseries import TimeSeries >>> from ndcube import NDCube, NDCollection >>> from astropy.wcs import WCS >>> from astropy.nddata import NDData >>> import swxsoc >>> from swxsoc.swxdata import SWXData >>> # Set mission to HERMES for this example (eea is a HERMES instrument) >>> os.environ["SWXSOC_MISSION"] = "hermes" >>> swxsoc._reconfigure() >>> # Create a TimeSeries structure >>> data = u.Quantity([1, 2, 3, 4], "gauss", dtype=np.uint16) >>> ts = TimeSeries(time_start="2016-03-22T12:30:31", time_delta=3 * u.s, data={"Bx": data}) >>> # Create a Spectra structure >>> spectra = NDCollection( ... [ ... ( ... "test_spectra", ... NDCube( ... data=np.random.random(size=(4, 10)), ... wcs=WCS(naxis=2), ... meta={"CATDESC": "Test Spectra Variable"}, ... unit="eV", ... ), ... ) ... ] ... ) >>> # Create a Support Structure >>> support_data = { ... "data_mask": NDData(data=np.eye(100, 100, dtype=np.uint16), meta={"CATDESC": "Data Mask", "VAR_TYPE": "metadata"}) ... } >>> # Create Global Metadata Attributes >>> input_attrs = SWXData.global_attribute_template("eea", "l1", "1.0.0") >>> # Create SWXData Object >>> sw_data = SWXData(timeseries=ts, support=support_data, spectra=spectra, meta=input_attrs) Raises ------ ValueError: If the number of columns is less than 2 or the required 'time' column is missing. TypeError: If any column, excluding 'time', is not an `astropy.units.Quantity` object with units. ValueError: If the elements of a `TimeSeries` column are multidimensional TypeError: If any `support` data elements are not type `astropy.nddata.NDData` or `astropy.units.Quantity`. TypeError: If `spectra` is not an `NDCollection` object. References ---------- * `Astropy TimeSeries <https://docs.astropy.org/en/stable/timeseries/index.html/>`_ * `Astropy Quantity and Units <https://docs.astropy.org/en/stable/units/index.html>`_ * `Astropy Time <https://docs.astropy.org/en/stable/time/index.html>`_ * `Astropy NDData <https://docs.astropy.org/en/stable/nddata/>`_ * `Sunpy NDCube and NDCollection <https://docs.sunpy.org/projects/ndcube/en/stable/>`_ * `Space Physics Guidelines for CDF (ISTP) <https://spdf.gsfc.nasa.gov/istp_guide/istp_guide.html>`_ """ def __init__( self, timeseries: Union[ astropy.timeseries.TimeSeries, dict[str, astropy.timeseries.TimeSeries] ], support: Optional[ dict[Union[astropy.units.Quantity, astropy.nddata.NDData]] ] = None, spectra: Optional[ndcube.NDCollection] = None, meta: Optional[dict] = None, schema: Optional[SWXSchema] = None, ): # ================================================ # VALIDATE INPUTS # ================================================ # Verify TimeSeries compliance if isinstance(timeseries, dict): for key, value in timeseries.items(): self._validate_timeseries(value) else: self._validate_timeseries(timeseries) # Global Metadata Attributes are compiled from two places. You can pass in # global metadata throug the `meta` parameter or through the `TimeSeries.meta` # attribute. self._meta = {} if meta is not None and isinstance(meta, dict): self._meta.update(meta) if ( isinstance(timeseries, TimeSeries) and timeseries.meta is not None and isinstance(timeseries.meta, dict) ): self._meta.update(timeseries.meta) # Check Global Metadata Requirements - Require Descriptor, Data_level, Data_Version if "Descriptor" not in self._meta or self._meta["Descriptor"] is None: raise ValueError("'Descriptor' global meta attribute is required.") if "Data_level" not in self._meta or self._meta["Data_level"] is None: raise ValueError("'Data_level' global meta attribute is required.") if "Data_version" not in self._meta or self._meta["Data_version"] is None: raise ValueError("'Data_version' global meta attribute is required.") # Check NRV Data if support is not None: for key in support: if not ( isinstance(support[key], u.Quantity) or isinstance(support[key], NDData) ): raise TypeError( f"Variable '{key}' must be an astropy.units.Quantity or astropy.nddata.NDData object" ) # Check Higher-Dimensional Spectra if spectra is not None: if not isinstance(spectra, NDCollection): raise TypeError("Spectra must be an ndcube.NDCollection object") # ================================================ # CREATE DATA STRUCTURES # ================================================ self._default_timeseries_key = swxsoc.config["general"][ "default_timeseries_key" ] if isinstance(timeseries, dict): self._timeseries = {} for key, value in timeseries.items(): # Copy the TimeSeries self._timeseries[key] = TimeSeries(value, copy=True) # Add any Metadata from the original TimeSeries self._update_timeseries_measurement_meta( timeseries=value, epoch_key=key ) elif isinstance(timeseries, TimeSeries): self._timeseries = { self._default_timeseries_key: TimeSeries(timeseries, copy=True) } self._update_timeseries_measurement_meta( timeseries=timeseries, epoch_key=self._default_timeseries_key, ) # Copy the Non-Record Varying Data if support: self._support = deepcopy(support) else: self._support = {} # Add Support Metadata for key in self._support: self._support[key].meta = self.measurement_attribute_template() if hasattr(support[key], "meta"): self._support[key].meta.update(support[key].meta) # Copy the High-Dimensional Spectra if spectra: self._spectra = spectra else: self._spectra = NDCollection([]) # ================================================ # DERIVE METADATA ATTRIBUTES # ================================================ # Derive Metadata self.schema = schema if schema is not None else SWXSchema() self._derive_metadata() @property def timeseries(self): """ (`astropy.timeseries.TimeSeries` or `dict`) A `TimeSeries` representing one or more measurements as a function of time. If there are multiple `TimeSeries`, a dictionary is returned. """ if len(self._timeseries) > 1: return {key: value for key, value in self._timeseries.items()} else: return self._timeseries[self._default_timeseries_key] @property def support(self): """ (`dict[Union[astropy.units.Quantity, astropy.nddata.NDData]]`) A `dict` containing one or more non-time-varying support variables. """ return self._support @property def spectra(self): """ (`ndcube.NDCollection]`) A `NDCollection` object containing high-dimensional spectra data. """ return self._spectra @property def data(self): """ (`dict`) A `dict` containing each of `timeseries`, `spectra` and `support`. """ return { "timeseries": self._timeseries, "spectra": self._spectra, "support": self._support, } @property def meta(self): """ (`dict`) Global metadata associated with the measurement data. """ return self._meta @property def time(self): """ (`astropy.time.Time`) The times of the measurements. """ t = Time(self._timeseries[self._default_timeseries_key].time) # Set time format to enable plotting with astropy.visualisation.time_support() t.format = "iso" return t @property def time_range(self): """ (`tuple`) The start and end times of the times. """ return (self.time.min(), self.time.max()) def __repr__(self): """ Returns a representation of the `SWXData` class. """ return self.__str__() def __str__(self): """ Returns a string representation of the `SWXData` class. """ str_repr = "SWXData() Object:\n" # Global Attributes/Metedata str_repr += "Global Attrs:\n" for attr_name, attr_value in self._meta.items(): str_repr += f"\t{attr_name}: {attr_value}\n" # TimeSeries Data str_repr += "TimeSeries Data:\n" for epoch_key, ts in self._timeseries.items(): str_repr += f"\tTimeSeries: {epoch_key}\n" for var_name in ts.colnames: str_repr += f"\t\t{var_name}\n" # Support Data str_repr += "Support Data:\n" for var_name in self._support.keys(): str_repr += f"\t{var_name}\n" # Spectra Data str_repr += "Spectra Data:\n" for var_name in self._spectra.keys(): str_repr += f"\t{var_name}\n" return str_repr def __getitem__(self, var_name): """ Get the data for a specific variable. Parameters ---------- var_name : `str` The name of the variable to retrieve. Returns ------- `astropy.units.Quantity` or `astropy.nddata.NDData` or `ndcube.NDCube` The data for the variable. Raises ------ KeyError: If the variable name is not found in the `SWXData` object. """ for epoch_key, ts in self._timeseries.items(): if var_name in ts.columns: return ts[var_name] if var_name in self.support: return self.support[var_name] if var_name in self.spectra: return self.spectra[var_name] else: raise KeyError(f"Variable {var_name} not found in SWxData object.")
[docs] @staticmethod def global_attribute_template( instr_name: str = "", data_level: str = "", version: str = "" ) -> OrderedDict: """ Function to generate a template of the required ISTP-compliant global attributes. Parameters ---------- instr_name : `str` The instrument name. Must be "eea", "nemisis", "merit" or "spani". data_level : `str` The data level of the data. Must be "l0", "l1", "ql", "l2", "l3", "l4" version : `str` Must be of the form X.Y.Z. Returns ------- template : `collections.OrderedDict` A template for required global attributes. """ meta = SWXSchema().global_attribute_template() # Check the Optional Instrument Name if instr_name: if instr_name not in swxsoc.config["mission"]["inst_names"]: raise ValueError( f"Instrument, {instr_name}, is not recognized. Must be one of {swxsoc.config['mission']['inst_names']}." ) # Set the Property meta["Descriptor"] = ( f"{instr_name.upper()}>{swxsoc.config['mission']['inst_to_fullname'][instr_name]}" ) # Check the Optional Data Level if data_level: if data_level not in swxsoc.config["mission"]["valid_data_levels"]: raise ValueError( f"Level, {data_level}, is not recognized. Must be one of {swxsoc.config['mission']['valid_data_levels']}." ) # Set the Property if data_level != "ql": meta["Data_level"] = f"{data_level.upper()}>Level {data_level[1]}" else: meta["Data_level"] = f"{data_level.upper()}>Quicklook" # Check the Optional Data Version if version: # check that version is in the right format with three parts if len(version.split(".")) != 3: raise ValueError( f"Version, {version}, is not formatted correctly. Should be X.Y.Z" ) meta["Data_version"] = version return meta
[docs] @staticmethod def measurement_attribute_template() -> OrderedDict: """ Function to generate a template of the required measurement attributes. Returns ------- template : `collections.OrderedDict` A template for required variable attributes that must be provided. """ return SWXSchema().measurement_attribute_template()
[docs] @staticmethod def get_timeseres_epoch_key(timeseries, var_data, var_meta: dict = None): """ Function to determine the TimeSeries Epoch for a Record-Varying Variable. Parameters ---------- timeseries : `dict[str, astropy.timeseries.TimeSeries]` A dictionary of `str` to `astropy.timeseries.TimeSeries` objects. Each `TimeSeries` object represents a different epoch. var_data : `astropy.units.Quantity` The variable data that we want to find the epoch for. var_meta : `dict`, optional The metadata associated with the variable data. """ # Find the TimeSeries Epoch for this Record-Varying Variable if var_meta is not None and "DEPEND_0" in var_meta: epoch_key = var_meta["DEPEND_0"] else: # Check which epoch key to use potential_epoch_keys = [] for key, ts in timeseries.items(): if hasattr(var_data, "shape"): if len(ts.time) == var_data.shape[0]: potential_epoch_keys.append(key) elif hasattr(var_data, "data"): if len(ts.time) == len(var_data.data): potential_epoch_keys.append(key) if len(potential_epoch_keys) == 0: raise ValueError("No TimeSeries have the same length as the new data.") elif len(potential_epoch_keys) > 1: raise ValueError( "Multiple TimeSeries have the same length as the new data." ) epoch_key = potential_epoch_keys[0] return epoch_key
def _validate_timeseries(self, timeseries: astropy.timeseries.TimeSeries): """ Validate a timeseries. Parameters ---------- timeseries : astropy.timeseries.TimeSeries The timeseries to validate. Raises ------ TypeError If the timeseries is not a `astropy.timeseries.TimeSeries` object or a dictionary of `str` to `astropy.timeseries.TimeSeries` objects. If any column in the timeseries (other than 'time') is not an `astropy.units.Quantity` object. ValueError If the timeseries is empty. If any column in the timeseries is not a one-dimensional measurement. """ if not isinstance(timeseries, astropy.timeseries.TimeSeries): raise TypeError( "timeseries must be a `astropy.timeseries.TimeSeries` object or a dictionary of `str` to `astropy.timeseries.TimeSeries` objects." ) if ( isinstance(timeseries, astropy.timeseries.TimeSeries) and len(timeseries) == 0 ): raise ValueError( "timeseries cannot be empty, must include at least a 'time' column with valid times" ) for colname in timeseries.columns: # Verify that all Measurements are `Quantity` if colname != "time" and not isinstance(timeseries[colname], u.Quantity): raise TypeError( f"Column '{colname}' must be an astropy.units.Quantity object" ) # Verify that the Column is only a single dimension if len(timeseries[colname].shape) > 1: # If there is more than 1 Dimension raise ValueError( f"Column '{colname}' must be a one-dimensional measurement. Split additional dimensions into unique measurements." ) def _update_timeseries_measurement_meta( self, timeseries: TimeSeries, epoch_key: str ): """ Update the metadata for a specific timeseries in the collection. This method updates the metadata for both the time attribute and the measurements in the timeseries. If the time attribute or a measurement has a `meta` attribute, its contents are added to the corresponding attribute in the stored timeseries. Parameters ---------- timeseries : `astropy.timeseries.TimeSeries` The timeseries whose metadata is to be updated. This timeseries should already be part of the collection. epoch_key : str The key identifying the timeseries in the collection. """ # Time Attributes self._timeseries[epoch_key]["time"].meta = OrderedDict() if hasattr(timeseries["time"], "meta"): self._timeseries[epoch_key]["time"].meta.update(timeseries["time"].meta) # Measurement Attributes for col in timeseries.columns: if col != "time": self._timeseries[epoch_key][ col ].meta = self.measurement_attribute_template() if hasattr(timeseries[col], "meta"): self._timeseries[epoch_key][col].meta.update(timeseries[col].meta) def _derive_metadata(self): """ Funtion to derive global and measurement metadata based on a SWXSchema """ # Get Default Metadata for attr_name, attr_value in self.schema.default_global_attributes.items(): self._update_global_attribute(attr_name, attr_value) # Global Attributes for attr_name, attr_value in self.schema.derive_global_attributes(self).items(): self._update_global_attribute(attr_name, attr_value) for epoch_key, ts in self._timeseries.items(): # Time Measurement Attributes for col in ts.columns: for attr_name, attr_value in self.schema.derive_measurement_attributes( self, col ).items(): self._update_measurement_attribute( data_structure=ts, var_name=col, attr_name=attr_name, attr_value=attr_value, ) # Support/ Non-Record-Varying Data for col in self._support: for attr_name, attr_value in self.schema.derive_measurement_attributes( self, col ).items(): self._update_measurement_attribute( data_structure=self._support, var_name=col, attr_name=attr_name, attr_value=attr_value, ) # Spectra/ High-Dimensional Data for col in self._spectra: for attr_name, attr_value in self.schema.derive_measurement_attributes( self, col ).items(): self._update_measurement_attribute( data_structure=self._spectra, var_name=col, attr_name=attr_name, attr_value=attr_value, ) def _update_global_attribute(self, attr_name, attr_value): # If the attribute is set, check if we want to overwrite it if attr_name in self._meta and self._meta[attr_name] is not None: # We want to overwrite if: # 1) The actual value is not the derived value # 2) The schema marks this attribute to be overwriten if ( self._meta[attr_name] != attr_value and self.schema.global_attribute_schema[attr_name]["overwrite"] ): warn_user( f"Overriding Global Attribute {attr_name} : {self._meta[attr_name]} -> {attr_value}" ) self._meta[attr_name] = attr_value # If the attribute is not set, set it else: self._meta[attr_name] = attr_value def _update_measurement_attribute( self, data_structure, var_name, attr_name, attr_value ): if ( attr_name in data_structure[var_name].meta and data_structure[var_name].meta[attr_name] is not None and attr_name in self.schema.variable_attribute_schema["attribute_key"] ): attr_schema = self.schema.variable_attribute_schema["attribute_key"][ attr_name ] if ( data_structure[var_name].meta[attr_name] != attr_value and attr_schema["overwrite"] ): warn_user( f"Overriding Measurement {var_name} Attribute {attr_name} : {data_structure[var_name].meta[attr_name]} -> {attr_value}" ) data_structure[var_name].meta[attr_name] = attr_value else: data_structure[var_name].meta[attr_name] = attr_value
[docs] def add_measurement(self, measure_name: str, data: u.Quantity, meta: dict = None): """ Add a new time-varying scalar measurement (column). Parameters ---------- measure_name: `str` Name of the measurement to add. data: `astropy.units.Quantity` The data to add. Must have the same time stamps as the existing data. meta: `dict`, optional The metadata associated with the measurement. Raises ------ TypeError: If var_data is not of type Quantity. ValueError: If data has more than one dimension """ # Verify that all Measurements are `Quantity` if (not isinstance(data, u.Quantity)) or (not data.unit): raise TypeError( f"Measurement {measure_name} must be type `astropy.units.Quantity` and have `unit` assigned." ) # Verify that the Column is only a single dimension if len(data.shape) > 1: # If there is more than 1 Dimension raise ValueError( f"Column '{measure_name}' must be a one-dimensional measurement. Split additional dimensions into unique measurenents." ) # Find the TimeSeries Epoch for this Record-Varying Variable epoch_key = SWXData.get_timeseres_epoch_key(self._timeseries, data, meta) # Add the new measurement self._timeseries[epoch_key][measure_name] = data # Add any Metadata from the original Quantity self._timeseries[epoch_key][ measure_name ].meta = self.measurement_attribute_template() if hasattr(data, "meta"): self._timeseries[epoch_key][measure_name].meta.update(data.meta) if meta: self._timeseries[epoch_key][measure_name].meta.update(meta) # Derive Metadata Attributes for the Measurement self._derive_metadata()
[docs] def add_timeseries(self, epoch_key: str, timeseries: TimeSeries): """ Add a new TimeSeries object to the collection of epochs. Parameters ---------- epoch_key: `str` The key to identify the new TimeSeries. timeseries: `astropy.timeseries.TimeSeries` The time-series data to add. """ self._validate_timeseries(timeseries) # Check the epoch is not already used if epoch_key in self._timeseries: raise ValueError(f"Epoch key {epoch_key} is already in use.") else: # Add the TimeSeries self._timeseries[epoch_key] = TimeSeries(timeseries, copy=True) # Updata the Metadata self._update_timeseries_measurement_meta( timeseries=timeseries, epoch_key=epoch_key )
[docs] def add_support( self, name: str, data: Union[astropy.units.Quantity, astropy.nddata.NDData], meta: Optional[dict] = None, ): """ Add a new non-time-varying data array. Parameters ---------- name: `str` Name of the data array to add. data: `Union[astropy.units.Quantity, astropy.nddata.NDData]`, The data to add. meta: `Optional[dict]`, optional The metadata associated for the data array. Raises ------ TypeError: If var_data is not of type NDData. """ # Verify that all Measurements are `NDData` if not (isinstance(data, u.Quantity) or isinstance(data, NDData)): raise TypeError(f"Measurement {name} must be type `astropy.nddata.NDData`.") self._support[name] = data # Add any Metadata from the original Quantity or NDData if hasattr(data, "meta"): self._support[name].meta.update(data.meta) else: self._support[name].meta = self.measurement_attribute_template() # Add any Metadata Passed not in the NDData if meta: self._support[name].meta.update(meta) # Derive Metadata Attributes for the Measurement self._derive_metadata()
[docs] def add_spectra(self, name: str, data: NDCube, meta: dict = None): """ Add a new time-varying vector measurement. This include higher-dimensional time-varying data. Parameters ---------- name: `str` Name of the measurement to add. data: `ndcube.NDCube` The data to add. Must have the same time stamps as the existing data. meta: `dict`, optional The metadata associated with the measurement. Raises ------ TypeError: If var_data is not of type NDCube. """ # Verify that all Measurements are `NDCube` if not isinstance(data, NDCube): raise TypeError(f"Measurement {name} must be type `ndcube.NDCube`.") # Add the new measurement if len(self._spectra) == 0: aligned_axes = (0,) self._spectra = NDCollection([(name, data)], aligned_axes) else: # Check to see if we need to maintain the aligned axes if self._spectra.aligned_axes: first_aligned_axes = self._spectra.aligned_axes[ self._spectra._first_key ] aligned_axes = tuple(0 for _ in range(len(first_aligned_axes))) self._spectra.update([(name, data)], aligned_axes) else: self._spectra.update([(name, data)], self._spectra.aligned_axes) # Add any Metadata Passed not in the NDCube if meta: self._spectra[name].meta.update(meta) # Derive Metadata Attributes for the Measurement self._derive_metadata()
[docs] def remove(self, measure_name: str): """ Remove an existing measurement or support data array. Parameters ---------- measure_name: `str` Name of the variable to remove. """ found = False # Check TimeSeries for epoch_key, ts in self._timeseries.items(): if measure_name in ts.columns: self._timeseries[epoch_key].remove_column(measure_name) found = True # Check Support if measure_name in self._support: self._support.pop(measure_name) found = True # Check Spectra elif measure_name in self._spectra: self._spectra.pop(measure_name) found = True # Otherwise Raise and Error if not found: raise ValueError(f"Data for Measurement {measure_name} not found.")
[docs] def plot(self, axes=None, columns=None, subplots=True, **plot_args): """ Plot the measurement data. Parameters ---------- axes : `~matplotlib.axes.Axes`, optional If provided the image will be plotted on the given axes. Defaults to `None` and creates a new axis. columns : `list[str]`, optional If provided, only plot the specified measurements otherwise try to plot them all. subplots : `bool` If set, all columns are plotted in their own plot panel. **plot_args : `dict`, optional Additional plot keyword arguments that are handed to `~matplotlib.axes.Axes`. Returns ------- `~matplotlib.axes.Axes` The plot axes. """ from astropy.visualization import quantity_support, time_support from matplotlib.axes import Axes # Set up the plot axes based on the number of columns to plot axes, columns = self._setup_axes_columns(axes, columns, subplots=subplots) quantity_support() time_support() if subplots: i = 0 if isinstance(axes, Axes): # subplots is true but only one column given iter_axes = [axes] else: iter_axes = axes for this_ax, this_col in zip(iter_axes, columns): if i == 0: this_ax.set_title( f"{self.meta['Mission_group']} {self.meta['Descriptor']} {self.meta['Data_level']}" ) i += 1 this_ax.plot(self.time, self.timeseries[this_col], **plot_args) this_ax.set_ylabel(self.timeseries[this_col].meta["LABLAXIS"]) else: axes.set_title( f"{self.meta['Mission_group']} {self.meta['Descriptor']} {self.meta['Data_level']}" ) for this_col in columns: axes.plot( self.time, self.timeseries[this_col], label=self.timeseries[this_col].meta["LABLAXIS"], **plot_args, ) axes.legend() # Setup the Time Axis self._setup_x_axis(axes) return axes
def _setup_axes_columns(self, axes, columns, subplots=False): """ Validate data for plotting, and get default axes/columns if not passed by the user. Code courtesy of sunpy. """ import matplotlib.pyplot as plt # If no individual columns were input, try to plot all columns if columns is None: columns = list(self.timeseries.columns.copy()) columns.remove("time") # Create Axes or Subplots for displaying the data if axes is None: if not subplots: axes = plt.gca() else: axes = plt.gcf().subplots(ncols=1, nrows=len(columns), sharex=True) return axes, columns @staticmethod def _setup_x_axis(ax): """ Shared code to set x-axis properties. Code courtesy of sunpy. """ import matplotlib.dates as mdates if isinstance(ax, np.ndarray): ax = ax[-1] locator = ax.xaxis.get_major_locator() ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(locator))
[docs] def append(self, timeseries: TimeSeries): """ Add additional measurements to an existing column. Parameters ---------- timeseries : `astropy.timeseries.TimeSeries` The data to be appended (rows) as a `TimeSeries` object. """ # Verify TimeSeries compliance self._validate_timeseries(timeseries) # Check which epoch key to use selected_epoch_key = SWXData.get_timeseres_epoch_key( self._timeseries, timeseries.time ) # Save Metadata since it is not carried over with vstack metadata_holder = { col: self._timeseries[selected_epoch_key][col].meta for col in self._timeseries[selected_epoch_key].columns } # Vertically Stack the TimeSeries self._timeseries[selected_epoch_key] = vstack( [self._timeseries[selected_epoch_key], timeseries] ) # Add Metadata back to the Stacked TimeSeries for col in self._timeseries[selected_epoch_key].columns: self._timeseries[selected_epoch_key][col].meta = metadata_holder[col] # Re-Derive Metadata self._derive_metadata()
[docs] def save(self, output_path: Path = None, overwrite: bool = False): """ Save the data to a CDF file. Parameters ---------- output_path : `pathlib.Path`, optional Path to save location. Can be: - A directory: saves using Logical_file_id from metadata as filename - A full file path (with .cdf extension): saves using that filename If not provided, saves to the current directory with Logical_file_id. overwrite : `bool` If set, overwrites existing file of the same name. Returns ------- path : `str` A path to the saved file. """ from swxsoc.util.io import CDFHandler handler = CDFHandler() if not output_path: output_path = Path.cwd() output_path = Path(output_path) # Smart logic: detect if output_path is a directory or full file path if output_path.is_dir(): # It's a directory - use logical_file_id for filename file_path = output_path filename = None elif not output_path.suffix: # No suffix - add .cdf and treat as filename file_path = output_path.parent if output_path.parent != Path() else Path.cwd() filename = output_path.name + '.cdf' else: # Has suffix - use as filename file_path = output_path.parent if output_path.parent != Path() else Path.cwd() filename = output_path.name if overwrite: if filename: cdf_file_path = file_path / filename else: cdf_file_path = file_path / (self.meta["Logical_file_id"] + ".cdf") if cdf_file_path.exists(): cdf_file_path.unlink() return handler.save_data(data=self, file_path=file_path, filename=filename)
[docs] @classmethod def load(cls, file_path: Path): """ Load data from a file. Parameters ---------- file_path : `pathlib.Path` A fully specified file path of the data file to load. Returns ------- data : `SWXData` A `SWXData` object containing the loaded data. Raises ------ ValueError: If the file type is not recognized as a file type that can be loaded. """ from swxsoc.util.io import CDFHandler # Determine the file type file_extension = file_path.suffix # Create the appropriate handler object based on file type if file_extension == ".cdf": handler = CDFHandler() else: raise ValueError(f"Unsupported file type: {file_extension}") # Load data using the handler and return a SWXData object timeseries, support, spectra, meta = handler.load_data(file_path) return cls(timeseries=timeseries, support=support, spectra=spectra, meta=meta)