# ruff: noqa: N999
import os
import xml.dom.minidom
import requests
import yaml
from lephare import LEPHAREDIR
from ._lephare import check_first_char, flt
__all__ = [
"FilterSvc",
]
SVO_URL = "http://svo2.cab.inta-csic.es/theory/fps"
[docs]
class FilterSvc:
"""Filter service for retrieving filters from various sources.
This class defines a number of methods for loading and manipulating filters.
"""
@classmethod
[docs]
def from_yaml(cls, yaml_file):
"""Load filter from a yaml file
Parameters
----------
yaml_file : `str`
Path to yaml file
Returns
-------
flt_array : list of `lephare.flt`
Array of wavelengths in Angstrom and filter transmissivity.
"""
config = yaml.load(open(yaml_file), Loader=yaml.BaseLoader)["filters"] # noqa: SIM115
if "calib" in config:
default_calib = config["calib"]
if "trans" in config:
default_trans = config["trans"]
flt_array = []
counter = 0
for entry in config["list"]:
name = entry["name"]
# filters are ordered starting from one (FORTRAN legacy)
counter += 1 # noqa: SIM113
calib = entry.get("calib", default_calib)
trans = entry.get("trans", default_trans)
if name[:4] == "svo:":
flt_obj = cls.from_svo(counter, name[4:], "AB", calib)
else:
flt_obj = cls.from_file(name, counter, trans, calib)
flt_array.append(flt_obj)
return flt_array
@classmethod
[docs]
def from_config(cls, config_file):
"""Load filter from a .para config file
Parameters
----------
config_file : `str`
Path to config file
Returns
-------
flt_array : list of `lephare.flt`
List of filters.
"""
keywords = ["FILTER_REP", "FILTER_LIST", "TRANS_TYPE", "FILTER_CALIB", "FILTER_FILE"]
keymap = {}
with open(config_file) as fstream:
count = 0
while count < len(keywords):
line = fstream.readline()
for key in keywords:
if key in line and check_first_char(line):
keymap[key] = line.split()[1]
count += 1
keymap["FILTER_REP"] = keymap["FILTER_REP"].replace("$LEPHAREDIR", LEPHAREDIR)
filter_list = keymap["FILTER_LIST"].split(",")
filter_calib = keymap["FILTER_CALIB"].split(",")
filter_trans = keymap["TRANS_TYPE"].split(",")
if len(filter_trans) == 1:
filter_trans = len(filter_list) * filter_trans
elif len(filter_trans) != len(filter_list):
raise RuntimeError("FILTER_LIST and FILTER_TRANS do not have the same size")
if len(filter_calib) == 1:
filter_calib = len(filter_list) * filter_calib
elif len(filter_calib) != len(filter_list):
raise RuntimeError("FILTER_LIST and FILTER_CALIB do not have the same size")
flt_array = []
for i in range(len(filter_list)):
name = os.path.join(keymap["FILTER_REP"], filter_list[i])
oneflt = flt(i, name, int(filter_trans[i]), int(filter_calib[i]))
flt_array.append(oneflt)
return flt_array
@classmethod
[docs]
def from_svo(cls, counter, filter_id, system="AB", calib=0):
"""Return filter from SVO
Parameters
----------
counter : `int`
Filter number
filter_id : `str`
Id of filter in SVO format.
system : `str`, optional
Photometric system
calib : `float`, optional
Calibration value. Not currently used.
Returns
-------
res : `lephare.flt`
Filter in native lephare format.
"""
res = FilterSvc.svo_request(counter, filter_id, system)
return res
@classmethod
[docs]
def from_file(cls, filename, counter=-1, trans=0, calib=0):
"""Return filter from SVO
Parameters
----------
filename : `str`
Path to filter file.
counter : `int`
Filter number
trans : `int`, optional
Photometric system.
calib : `int`, optional
Calibration value.
Returns
-------
f : `lephare.flt`
Filter in native lephare format.
"""
name = filename.replace("$LEPHAREDIR", LEPHAREDIR)
f = flt(counter, name, trans, calib)
return f
@classmethod
[docs]
def svo_request(cls, counter, filter_id, system):
"""Retrieve a filter from the SVO
Parameters
----------
counter : `int`
Filter number
filter_id : `str`
Id of filter in SVO format.
system : `str`, optional
Photometric system
Returns
-------
res : `lephare.flt`
Filter in native lephare format.
"""
try:
query = f"{SVO_URL}/fps.php?PhotCalID={filter_id}/{system}"
r = requests.get(query, timeout=5)
except ConnectionRefusedError: # pragma no cover
print(f"request {query} failed due to failure to connect to the server.")
return None
except requests.ConnectTimeout: # pragma no cover
print("Timeout on SVO server")
return None
try:
dd = xml.dom.minidom.parseString(r.content)
except xml.parsers.expat.ExpatError:
print("SVO server down")
return None
# assert query ok
for info in dd.getElementsByTagName("INFO"):
if info.getAttribute("name") == "QUERY_STATUS" and info.getAttribute("value") != "OK":
raise AssertionError(
f"QUERY_STATUS did not return OK; check the filter_id input: {SVO_URL}. "
f"For a list of valid filters, see {filter_id}"
)
# filter params: not used (yet?)
params = dd.getElementsByTagName("PARAM")
param_dict = {"Date": r.headers["Date"]}
for param in params:
param_dict[param.getAttribute("name")] = param.getAttribute("value")
trans_type = int(param_dict["DetectorType"])
name = filter_id.split("/")[1]
# filter curve
##check units
f1, f2 = dd.getElementsByTagName("FIELD")
# these assert are meant to secure against non
# standard entries in the SVO DB
assert f1.getAttribute("unit") == "Angstrom"
assert f1.getAttribute("datatype") == "double"
assert f2.getAttribute("datatype") == "double"
table = dd.getElementsByTagName("TABLEDATA")[0]
data = table.getElementsByTagName("TR")
with open("./" + name, "w") as stream:
for _, d in enumerate(data):
dd = d.getElementsByTagName("TD")
l_val = float(dd[0].childNodes[0].data)
t_val = float(dd[1].childNodes[0].data)
stream.write(f"{l_val} {t_val}\n")
# not super nice, but the only quick solution found to
# avoid exposing trans and clean methods.
flt_obj = flt(counter, "./" + name, trans_type, 0)
# flt_obj.read("dummy")
os.remove(name)
# save the SVO additional info in an instance property
flt_obj.svo_params = param_dict
return flt_obj
@classmethod
[docs]
def from_keymap(cls, keymap):
"""Load filter from a config keymap.
Parameters
----------
keymap : dict of lephare.keyword
The config keymap
Returns
-------
flt_array : list of `lephare.flt`
List of filters.
"""
keymap["FILTER_REP"].value = keymap["FILTER_REP"].value.replace("$LEPHAREDIR", LEPHAREDIR)
filter_list = keymap["FILTER_LIST"].value.split(",")
filter_calib = keymap["FILTER_CALIB"].value.split(",")
filter_trans = keymap["TRANS_TYPE"].value.split(",")
if len(filter_trans) == 1:
filter_trans = len(filter_list) * filter_trans
elif len(filter_trans) != len(filter_list):
raise RuntimeError("FILTER_LIST and FILTER_TRANS do not have the same size")
if len(filter_calib) == 1:
filter_calib = len(filter_list) * filter_calib
elif len(filter_calib) != len(filter_list):
raise RuntimeError("FILTER_LIST and FILTER_CALIB do not have the same size")
flt_array = []
for i in range(len(filter_list)):
name = os.path.join(keymap["FILTER_REP"].value, filter_list[i])
oneflt = flt(i, name, int(filter_trans[i]), int(filter_calib[i]))
flt_array.append(oneflt)
return flt_array