"""
SWXSOC FIDO Client for searching and fetching data from AWS S3.
This module provides the SWXSOCClient class, which implements the sunpy
BaseClient interface for querying SWXSOC data archives.
"""
import os
from datetime import datetime
from pathlib import Path
import astropy.units as u
import boto3
import sunpy.time
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ClientError, NoCredentialsError
from dateutil.relativedelta import relativedelta
from parfive import Downloader
from sunpy.net.attr import AttrAnd
from sunpy.net.base_client import BaseClient, QueryResponseTable, convert_row_to_table
import swxsoc
from swxsoc.net.attr import DevelopmentBucket, Instrument, Level, SearchTime, walker
__all__ = ["SWXSOCClient"]
[docs]
class SWXSOCClient(BaseClient):
"""
Client for searching for SWXSOC data on AWS.
This client provides search and fetch functionality for SWXSOC data and is based on the sunpy BaseClient for FIDO.
For more information on the sunpy BaseClient, see: https://docs.sunpy.org/en/stable/generated/api/sunpy.net.base_client.BaseClient.html
Note that AWS buckets may require access keys.
Examples
--------
>>> from swxsoc.net.attr import AttrAnd, SearchTime, Level, Descriptor, Instrument
>>> from swxsoc.net.client import SWXSOCClient
>>> from astropy.time import Time
>>> client = SWXSOCClient()
>>> query = AttrAnd([SearchTime(start=Time("2025-07-10T00:00:00"), end=Time("2025-07-11T00:00:00")),
... Instrument("meddea"),
... Level("l0"),
... Descriptor("housekeeping")])
>>> results = client.search(query) # doctest: +SKIP
"""
size_column = "size"
[docs]
def search(self, query=None):
"""
Searches for data based on the given query.
Parameters
----------
query : AttrAnd
The query object specifying search criteria.
Returns
-------
QueryResponseTable
A table containing the search results.
"""
if query is None:
query = AttrAnd([])
queries = walker.create(query)
swxsoc.log.info(f"Searching with {queries}")
results = []
for query_parameters in queries:
results.extend(self._make_search(query_parameters))
if results == []:
return QueryResponseTable(names=[], rows=[], client=self)
names = [
"instrument",
"mode",
"test",
"time",
"level",
"version",
"descriptor",
"key",
"size",
"bucket",
"etag",
"storage_class",
"last_modified",
]
return QueryResponseTable(names=names, rows=results, client=self)
[docs]
@convert_row_to_table
def fetch(self, query_results, *, path, downloader, **kwargs):
"""
Fetches the files based on query results and queues them up to be downloaded to the specified path by your downloader.
Note: The downloader must be an instance of parfive.Downloader
Parameters
----------
query_results : list
The results of the search query.
path : str
The directory path where files should be saved.
downloader : Downloader
The parfive downloader instance used for fetching files.
"""
if not isinstance(downloader, Downloader):
raise ValueError("Downloader must be an instance of parfive.Downloader")
for row in query_results:
swxsoc.log.info(f"Fetching {row['key']}")
if path is None or path == ".":
path = os.getcwd()
if os.path.exists(path) and not os.path.isdir(path):
raise ValueError(f"Path {path} is not a directory")
filepath = self._make_filename(path, row)
presigned_url = self.generate_presigned_url(row["bucket"], row["key"])
url = (
presigned_url
if presigned_url is not None
else f"https://{row['bucket']}.s3.amazonaws.com/{row['key']}"
)
downloader.enqueue_file(url, filename=filepath)
@classmethod
def _make_filename(cls, path, row):
"""
Creates a filename based on the provided path and row data.
Parameters
----------
path : str
The directory path.
row : dict
The row data containing the file key.
Returns
-------
str
The full file path.
"""
return os.path.join(path, row["key"].split("/")[-1])
[docs]
@staticmethod
def generate_presigned_url(bucket_name, object_key, expiration=3600):
"""
Generates a presigned URL for accessing an object in S3. If credentials are not available
or access is denied, attempts an unsigned request for public access.
Parameters
----------
bucket_name : str
The name of the S3 bucket.
object_key : str
The key of the S3 object.
expiration : int, optional
The expiration time in seconds for the presigned URL. Default is 3600 seconds.
Returns
-------
str or None
The presigned URL if successful, or a direct unsigned URL if public access is allowed.
Otherwise, returns None.
"""
try:
# Attempt to generate a presigned URL with credentials
s3_client = boto3.client("s3")
# Try to list one object to check if credentials are available
s3_client.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
response = s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": object_key},
ExpiresIn=expiration,
)
return response
except NoCredentialsError:
swxsoc.log.warning("Credentials not available. Trying unsigned access.")
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "AccessDenied":
swxsoc.log.warning(
f"Access denied to {bucket_name}/{object_key}. Trying unsigned access."
)
else:
swxsoc.log.warning(f"Error generating presigned URL: {e}")
return None
# If credentials are missing or access is denied, try unsigned access
try:
# Attempt to access the object with an unsigned request (public access)
swxsoc.log.info(f"Attempting unsigned access to {bucket_name}/{object_key}")
url = f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
return url
except ClientError as unsigned_error:
print(f"Unsigned access failed: {unsigned_error}")
return None
@classmethod
def _can_handle_query(cls, *query):
"""
Determines if the client can handle the given query based on its attributes.
Parameters
----------
query : tuple
The query attributes to check.
Returns
-------
bool
True if the client can handle the query, otherwise False.
"""
query_attrs = set(type(x) for x in query)
supported_attrs = {SearchTime, Level, Instrument, DevelopmentBucket}
return supported_attrs.issuperset(query_attrs)
@classmethod
def _make_search(cls, query):
"""
Performs a search based on the provided query parameters.
Parameters
----------
query : dict
The query parameters including instrument, levels, time range, and development bucket flag.
Returns
-------
list
A list of rows containing the search results.
"""
from swxsoc.util.util import parse_science_filename
instrument = query.get("instrument")
levels = query.get("level")
start_time = query.get("startTime")
end_time = query.get("endTime")
descriptor = query.get("descriptor")
use_development_bucket = query.get("use_development_bucket")
if levels is not None and not isinstance(levels, list):
levels = [levels]
if levels is not None and len(levels) > 0:
for level in levels:
if level not in swxsoc.config["mission"]["valid_data_levels"]:
raise ValueError(f"Invalid data level: {level}")
else:
levels = swxsoc.config["mission"]["valid_data_levels"]
if start_time is None:
start_time = "2000-01-01"
if end_time is None:
end_time = datetime.now().isoformat()
instrument_buckets = {
f"{swxsoc.config['mission']['inst_to_targetname'][inst]}": (
f"{'dev-' if use_development_bucket else ''}"
f"{swxsoc.config['mission']['mission_name']}-{inst}"
)
for inst in swxsoc.config["mission"]["inst_names"]
}
swxsoc.log.debug(f"Mapping of instruments to S3 buckets: {instrument_buckets}")
if instrument is None or instrument not in instrument_buckets:
swxsoc.log.info(
"No instrument specified or invalid instrument. Searching all instruments."
)
instrument_bucket_to_search = instrument_buckets.values()
else:
swxsoc.log.info(f"Searching for instrument: {instrument}")
instrument_bucket_to_search = [instrument_buckets[instrument]]
swxsoc.log.info(f"Searching in buckets: {instrument_bucket_to_search}")
files_in_s3 = cls.list_files_in_s3(instrument_bucket_to_search)
if levels is not None or start_time is not None or end_time is not None:
swxsoc.log.info(
f"Searching for files with level {levels} between {start_time} and {end_time}"
)
if descriptor:
swxsoc.log.info(f"Searching for files with descriptor: {descriptor}")
prefixes = cls.generate_prefixes(levels, start_time, end_time, descriptor)
matched_files = []
for this_s3_file in files_in_s3:
for this_prefix_list in prefixes:
if all(
this_token in str(Path(this_s3_file["Key"]).parent)
for this_token in this_prefix_list
):
matched_files.append(this_s3_file)
else:
swxsoc.log.info("Searching for all files")
# remove duplicates
unique_matched_files = []
seen = []
for this_file in matched_files:
if this_file["Key"] not in seen:
seen.append(this_file["Key"])
unique_matched_files.append(this_file)
matched_files = unique_matched_files
swxsoc.log.info(
f"Found {len(matched_files)} files in S3 matching search criteria"
)
rows = []
for s3_object in matched_files:
swxsoc.log.debug(f"Processing S3 object: {s3_object}")
try:
info = parse_science_filename(s3_object["Key"])
except ValueError:
info = {}
row = [
info.get("instrument", "unknown"),
info.get("mode", "unknown"),
info.get("test", False),
info.get("time", "unknown"),
info.get("level", "unknown"),
info.get("version", "unknown"),
info.get("descriptor", "unknown"),
s3_object["Key"],
s3_object["Size"] * u.byte,
s3_object["Bucket"],
s3_object["ETag"],
s3_object["StorageClass"],
s3_object["LastModified"],
]
rows.append(row)
return rows
[docs]
@staticmethod
def list_files_in_s3(bucket_names: list) -> list:
"""
Lists all files in the specified S3 buckets. If access is denied, it retries with an unsigned request.
Parameters
----------
bucket_names : list
A list of S3 bucket names.
Returns
-------
list
A list of dictionaries containing metadata about each S3 object.
"""
content = []
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for bucket_name in bucket_names:
try:
# Try with authenticated client
pages = paginator.paginate(Bucket=bucket_name)
for page in pages:
for obj in page.get("Contents", []):
metadata = {
"Key": obj["Key"],
"LastModified": sunpy.time.parse_time(obj["LastModified"]),
"Size": obj["Size"],
"ETag": obj["ETag"],
"StorageClass": obj.get("StorageClass", "STANDARD"),
"Bucket": bucket_name,
}
content.append(metadata)
except (ClientError, NoCredentialsError) as e:
swxsoc.log.warning(f"Error accessing bucket {bucket_name}: {e}")
if isinstance(e, NoCredentialsError):
error_code = "NoCredentialsError"
elif isinstance(e, ClientError):
error_code = e.response["Error"]["Code"]
# Retry?
if error_code == "AccessDenied" or error_code == "NoCredentialsError":
swxsoc.log.warning(
f"Access denied to bucket {bucket_name}. Trying unsigned request."
)
# Retry with an unsigned (anonymous) client
try:
unsigned_s3 = boto3.client(
"s3", config=Config(signature_version=UNSIGNED)
)
unsigned_paginator = unsigned_s3.get_paginator(
"list_objects_v2"
)
pages = unsigned_paginator.paginate(Bucket=bucket_name)
for page in pages:
for obj in page.get("Contents", []):
metadata = {
"Key": obj["Key"],
"LastModified": sunpy.time.parse_time(
obj["LastModified"]
),
"Size": obj["Size"],
"ETag": obj["ETag"],
"StorageClass": obj.get("StorageClass", "STANDARD"),
"Bucket": bucket_name,
}
content.append(metadata)
except ClientError:
raise Exception(
f"Unsigned request failed for bucket {bucket_name} (Ensure you have the correct IAM permissions, or are on the VPN)"
)
else:
raise Exception(f"Error accessing bucket {bucket_name}: {e}")
return content
[docs]
@staticmethod
def generate_prefixes(
levels: list, start_time: str, end_time: str, descriptor: str
) -> list:
"""
Generates a list of prefixes based on the level and time range.
Parameters
----------
levels : list
A list of data levels.
start_time : str
The start time in ISO format.
end_time : str
The end time in ISO format.
descriptor : str
The file descriptor
Returns
-------
list
A list of prefixes.
"""
current_time = datetime.fromisoformat(start_time)
end_time = datetime.fromisoformat(end_time)
prefixes = []
while current_time <= end_time:
for level in levels:
these_tokens = [
f"{current_time.year}",
f"{current_time.month:02d}",
level,
]
if descriptor:
these_tokens.append(descriptor)
prefixes.append(these_tokens)
current_time += relativedelta(months=1)
swxsoc.log.debug(f"Generated prefix: {prefixes}")
return prefixes