Source code for lephare.data_retrieval

"""
This module provides functionality for downloading and managing data files using
`pooch <https://pypi.org/project/pooch/0.5.2/>`__.
"""

import concurrent.futures
import os
import posixpath
import subprocess
import warnings
from functools import partial
from urllib.parse import urljoin, urlparse

import numpy as np
import pooch
import requests

from lephare import LEPHAREDIR
from lephare.prepare import all_types_to_keymap

# Root URL that remote data file paths are resolved against (raw GitHub
# content of the lephare-data repository, "main" branch).
DEFAULT_BASE_DATA_URL = "https://raw.githubusercontent.com/lephare-photoz/lephare-data/main/"
# Default name of the local registry file. It is a bare file name, so it is
# resolved relative to the current working directory when opened.
DEFAULT_REGISTRY_FILE = "data_registry.txt"
# Default local directory under which downloaded data files are stored.
DEFAULT_LOCAL_DATA_PATH = LEPHAREDIR

# If a file is not downloaded the first time, retry this many times
# (used as the default for the ``retry`` argument of ``download_all_files``).
MAX_RETRY_ATTEMPTS = 2

# Names exported by ``from lephare.data_retrieval import *``.
# NOTE(review): ``config_to_required_files`` and ``get_auxiliary_data`` are
# defined in this module without a leading underscore but are not listed
# here -- confirm whether that omission is intentional.
__all__ = [
    "download_all_files",
    "download_file",
    "download_registry_from_github",
    "filter_files_by_prefix",
    "make_default_retriever",
    "make_retriever",
    "read_list_file",
]


def filter_files_by_prefix(file_path, target_prefixes):
    """Return the leading field of every line that starts with a target prefix.

    Parameters
    ----------
    file_path : str
        The path to the file.
    target_prefixes : list
        Prefixes to test the beginning of each line against.

    Returns
    -------
    list
        For each matching line, in file order, the text preceding its first
        space character, stripped of surrounding whitespace.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        return [
            row.split(" ")[0].strip()
            for row in handle
            if any(row.startswith(candidate) for candidate in target_prefixes)
        ]
def _check_registry_is_latest_version(remote_registry_url, local_registry_file):
    """Report whether the local registry file matches the remote registry's hash.

    Parameters
    ----------
    remote_registry_url : str
        The URL to the remote registry file; the URL of the remote hash file
        is derived from it.
    local_registry_file : str
        The path to the local registry file to check.

    Returns
    -------
    bool
        True if the SHA-256 hash of the local file equals the published
        remote hash, otherwise False.

    Notes
    -----
    The hash file is assumed to live next to the registry file and to be
    named like it, with the extension replaced by "_hash.sha256".

    Raises
    ------
    Exception
        If there is any problem fetching the registry hash file, including
        network issues, server errors, or other HTTP errors.
    """
    # Hash the local file first so a missing/unreadable file fails before any network traffic.
    local_digest = pooch.file_hash(local_registry_file, alg="sha256")

    url_stem, _extension = os.path.splitext(remote_registry_url)
    hash_response = requests.get(
        url_stem + "_hash.sha256",
        headers={"User-Agent": "LePHARE"},
        timeout=60,
    )
    hash_response.raise_for_status()  # Raise exceptions for non-200 status codes

    remote_digest = hash_response.text.strip()
    return remote_digest == local_digest
def download_registry_from_github(url="", outfile=""):
    """Fetch the registry file from a GitHub repository and return its text.

    Parameters
    ----------
    url : str
        The URL of the registry file. Defaults to a "data_registry.txt" file at
        DEFAULT_BASE_DATA_URL.
    outfile : str
        The path where the file will be saved. Defaults to DEFAULT_REGISTRY_FILE.

    Returns
    -------
    str
        The contents of the registry. If the local copy is already up to date
        nothing is downloaded and the contents of the local file are returned,
        so callers always receive the registry text.

    Raises
    ------
    Exception
        If there is any problem fetching the registry hash file or full registry file,
        including network issues, server errors, or other HTTP errors.
    """
    remote_registry_name = "data_registry.txt"

    # Assign defaults if keywords left blank
    if url == "":
        url = urljoin(DEFAULT_BASE_DATA_URL, remote_registry_name)
    if outfile == "":
        outfile = DEFAULT_REGISTRY_FILE

    # If local registry hash matches remote hash, our registry is already up-to-date:
    if os.path.isfile(outfile) and _check_registry_is_latest_version(url, outfile):
        print(f"Local registry file is up to date: {outfile}")
        # Return the local text rather than None, so both code paths hand
        # the caller the same kind of value.
        with open(outfile, "r", encoding="utf-8") as file:
            return file.read()

    # Download the registry file
    response = requests.get(url, headers={"User-Agent": "LePHARE"}, timeout=120)
    response.raise_for_status()  # Raise exceptions for non-200 status codes

    with open(outfile, "w", encoding="utf-8") as file:
        file.write(response.text)
    print(f"Registry file downloaded and saved as {outfile}.")
    return response.text
def read_list_file(list_file, prefix=""):
    """Read file names from a list file and return them as relative file paths.

    Parameters
    ----------
    list_file : str
        The name of the file containing the list of filenames. Can be local or a URL.
    prefix : str
        Optional prefix to add to all file names. When downloaded, file paths
        must be relative to the "base url," which is the top-level directory.
        If left empty, the prefix is inferred from list_file paths or urls that
        contain "sed/" or "filt/"; otherwise it should be manually specified.

    Returns
    -------
    list of str
        A list of file paths read from the list file, always joined with "/"
        separators. Blank lines and lines whose first field starts with "#"
        are skipped.

    Raises
    ------
    Exception
        If list_file is a URL and fetching it fails (network issues, server
        errors, or other HTTP errors), or if a local list_file cannot be read.
    """
    # Check if the list_file is a URL
    if urlparse(list_file).scheme in ("http", "https"):
        response = requests.get(list_file, headers={"User-Agent": "LePHARE"}, timeout=60)
        response.raise_for_status()
        content = response.text
    else:
        with open(list_file, "r", encoding="utf-8") as file:
            content = file.read()

    # Infer the prefix if not provided
    # Note: pooch docs specify that registry files use Unix separators
    # Note as well: this may be phased out, if we decide to specify list
    # files as containing paths relative to the root dir
    if prefix == "":
        for top_level in ("sed/", "filt/"):
            start_index = list_file.find(top_level)
            # Only slice when the directory marker is really present; a bare
            # "sed"/"filt" substring would otherwise give a -1 start index.
            if start_index != -1:
                prefix = list_file[start_index : list_file.rfind("/")]
                break

    # Read in file
    file_names = []
    for line in content.splitlines():
        fields = line.split()
        # Skip blank lines (no fields at all) and comment lines.
        if not fields or fields[0].startswith("#"):
            continue
        # posixpath keeps "/" separators on every platform, as required for
        # registry entries and URLs.
        file_names.append(posixpath.join(prefix, fields[0]))
    return file_names
def make_default_retriever():
    """Build a retriever configured entirely from the module-level defaults."""
    default_settings = {
        "base_url": DEFAULT_BASE_DATA_URL,
        "registry_file": DEFAULT_REGISTRY_FILE,
        "data_path": DEFAULT_LOCAL_DATA_PATH,
    }
    return make_retriever(**default_settings)
def make_retriever(
    base_url=DEFAULT_BASE_DATA_URL,
    registry_file=DEFAULT_REGISTRY_FILE,
    data_path=DEFAULT_LOCAL_DATA_PATH,
):
    """Build a pooch retriever whose registry is loaded from a registry file.

    Parameters
    ----------
    base_url : str, optional
        The base URL for the data files.
    registry_file : str, optional
        The path to the registry file that lists the files and their hashes.
    data_path : str, optional
        The local path where the files will be downloaded.

    Returns
    -------
    pooch.Pooch
        The retriever object for downloading files.
    """
    # Create the retriever without an inline registry; the entries are
    # loaded from the registry file right afterwards.
    pooch_instance = pooch.create(path=data_path, base_url=base_url, registry=None)
    pooch_instance.load_registry(registry_file)
    return pooch_instance
def _create_directories_from_files(file_names): """Create directories for the given file names if they do not already exist. This function is for thread safety when downloading files in parallel. Parameters ---------- file_names : list of str List of file names with absolute paths. """ unique_directories = set( os.path.dirname(file_name) for file_name in file_names if os.path.dirname(file_name) ) for directory in unique_directories: if not os.path.exists(directory): os.makedirs(directory) print(f"Created directory: {directory}")
def download_file(retriever, file_name, ignore_registry=False, downloader=None):
    """Fetch a single file through the retriever, with or without registry checks.

    Parameters
    ----------
    retriever : pooch.Pooch
        The retriever object for downloading files.
    file_name : str
        The name of the file to download.
    ignore_registry : bool
        If True, download the file without checking its hash against the registry.
    downloader : pooch.HTTPDownloader
        The downloader is required to set the user for building on readthedocs.
        When omitted, an HTTP downloader sending a "LePHARE" User-Agent header
        is created.

    Returns
    -------
    str
        The path to the downloaded file.
    """
    if downloader is None:
        downloader = pooch.HTTPDownloader(headers={"User-Agent": "LePHARE"})

    # Normal case: let the retriever validate the file against its registry.
    if not ignore_registry:
        return retriever.fetch(file_name, downloader=downloader)

    # Registry bypass: build the URL ourselves and skip the hash check.
    print(f"Downloading without registry: {file_name}...")
    file_url = urljoin(retriever.base_url, file_name)
    return pooch.retrieve(
        url=file_url,
        known_hash=None,
        fname=file_name,
        path=retriever.path,
        # The following may now be required by GitHub
        downloader=downloader,
    )
def download_all_files(retriever, file_names, ignore_registry=False, retry=MAX_RETRY_ATTEMPTS):
    """Fetch every file in a list through the retriever, using a thread pool.

    Parameters
    ----------
    retriever : pooch.Pooch
        The retriever object for downloading files.
    file_names : list of str
        List of file names to download.
    ignore_registry : bool
        If True, download the files without checking their hashes against the registry.
    retry : int
        Number of times to retry downloading a file if first attempt fails.
    """
    # len() rather than truthiness, so any sized sequence is accepted.
    if len(file_names) == 0:
        print("Download all files called for list of 0 files; done.")
        return

    # First make directories, for thread safety
    expected_paths = [os.path.join(retriever.path, name) for name in file_names]
    _create_directories_from_files(expected_paths)

    # Now the downloading
    print(f"Checking/downloading {len(file_names)} files...")
    fetched_paths = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        fetch_one = partial(download_file, retriever, ignore_registry=ignore_registry)
        pending = [pool.submit(fetch_one, name) for name in file_names]
        # Results are gathered one future at a time (instead of through an
        # executor mapping function) so that no file is silently skipped.
        for finished in concurrent.futures.as_completed(pending):
            try:
                fetched_path = finished.result(timeout=60)  # timeout is in seconds
            except TimeoutError as e:
                print(f"Future completed with a timeout exception: {e}")
            except Exception as e:
                print(f"Future completed with an exception: {e}")
            else:
                fetched_paths.append(fetched_path)
        print(f"{len(fetched_paths)} completed.")

    # Finish with some checks on our downloaded files
    if _check_downloaded_files(expected_paths, fetched_paths):
        return
    if retry > 0:
        print("Retrying download for missing files...")
        download_all_files(retriever, file_names, ignore_registry=ignore_registry, retry=retry - 1)
def _check_downloaded_files(file_names, completed_futures):
    """Check if all files have been downloaded successfully and are not empty.

    Parameters
    ----------
    file_names : list of str
        List of expected file names.
    completed_futures : list of str
        List of file names that were downloaded.

    Returns
    -------
    bool
        True if all files are downloaded and non-empty, False otherwise.
    """
    # Check if all files were downloaded. Only expected files that were not
    # reported as downloaded need to be looked up on disk.
    missing_files = False
    for file in set(file_names) - set(completed_futures):
        if not os.path.exists(file):
            print("The following file was not downloaded:", file)
            missing_files = True
    if missing_files:
        return False

    # Check if any downloaded file is empty
    for file_name in completed_futures:
        if os.path.getsize(file_name) == 0:
            print(f"The file {file_name} is empty.")
            return False

    print("All files downloaded successfully and are non-empty.")
    return True


def config_to_required_files(keymap, base_url=None):
    """Take a lephare config and return list of auxiliary files required for run.

    For the sed lists these must be present in the auxiliary files directory.
    If local full paths set the code will only retrieve opa, vega, and filters.

    In addition to the specified files we also add opa and vega files. These
    are always required. We use the tau opacities by default.

    Parameters
    ----------
    keymap : dict of lephare.keyvalue
        The dictionary of config keys containing filters etc required.
    base_url : str
        Url to overwrite default base.

    Returns
    -------
    list of str
        Paths of the required files, relative to the base url.
    """
    keymap = all_types_to_keymap(keymap)
    if base_url is None:
        base_url = DEFAULT_BASE_DATA_URL

    # We always need alloutputkeys.txt; typical users want the standard output.para
    required_files = ["alloutputkeys.txt", "examples/output.para"]
    # Opacity always required
    required_files += ["opa/OPACITY.dat"] + [f"opa/tau{i:02d}.out" for i in range(81)]
    # vega always required
    required_files += [
        "vega/BD+17.sed",
        "vega/BD+17o4708.sed",
        "vega/SunLCB.sed",
        "vega/VegaLCB.sed",
        "vega/a0v.sed",
        "vega/a0v_n.sed",
    ]
    # Strip each name so "a.pb, b.pb" does not yield a path with a stray space.
    required_files += [f"filt/{f.strip()}" for f in keymap["FILTER_LIST"].value.split(",")]

    # Get user specified sed lists
    for key in ("STAR_SED", "GAL_SED", "QSO_SED"):
        try:
            list_file = keymap[key].value
        except KeyError:
            warnings.warn(f"{key} keyword not set or not present in auxiliary files directory.")
            continue
        # If we find sed/ in the path, assume the list is present in
        # lephare-data and try to retrieve the files. A path that begins
        # with "sed/" (index 0) counts as well.
        sed_index = list_file.find("sed/")
        if sed_index >= 0:
            # Remove the beginning of the path before sed/
            list_file = list_file[sed_index:].strip()
            required_files += [list_file]
            # Add the url to retrieve the files
            list_url = base_url + list_file
            required_files += read_list_file(list_url, prefix=f"sed/{key.split('_')[0]}/")

    # Bethermin12 always required
    bet_list = "sed/GAL/BETHERMIN12/BETHERMIN12_MOD.list"
    required_files += [bet_list]
    required_files += read_list_file(base_url + bet_list, prefix="sed/GAL/")

    # Get extinction law files
    required_files += [f"ext/{f.strip()}" for f in keymap["EXTINC_LAW"].value.split(",")]
    required_files += ["ext/MW_seaton.dat"]  # Appears to be always required
    return required_files


def get_auxiliary_data(lephare_dir=LEPHAREDIR, keymap=None, additional_files=None, clone=True):
    """Get all auxiliary data required to run lephare.

    This gets all the filters, seds, and other data files.

    If no keymap is set this will git clone the full repository.

    Parameters
    ----------
    lephare_dir : str
        The path to the lephare directory for auxiliary files.
    keymap : dict
        The config dictionary.
    additional_files : list
        Any additional files to be downloaded from the auxiliary file repo.
    clone : bool
        If keymap is None, clone=True will git clone the lephare-data
        repository, else it will copy all the lephare-data files over into
        lephare_dir. This is useful e.g. for developers wanting the exact same
        code environment as in legacy lephare version.
    """
    # ensure that all values in the keymap are keyword objects
    if keymap is not None:
        keymap = all_types_to_keymap(keymap)

    # Get the registry file
    file_text = download_registry_from_github()
    base_url = DEFAULT_BASE_DATA_URL
    repo_name = "lephare-data"
    repo_url = f"https://github.com/lephare-photoz/{repo_name}"
    registry_file = DEFAULT_REGISTRY_FILE
    data_path = lephare_dir

    if keymap is None and clone is True:
        # Assume if filt is present assume everything is.
        if os.path.isdir(f"{lephare_dir}/filt"):
            warnings.warn(
                "Some data appears present. Not downloading. "
                "Consider setting a keymap to download a subset."
            )
        else:
            # Get the full repository
            print(f"Downloading all auxiliary data (~1.5Gb) to {lephare_dir}.")
            print(f"Getting data from {repo_url}.")
            # Argument list instead of a shell string: works for paths that
            # contain spaces and avoids shell interpretation of the path.
            try:
                result = subprocess.run(["git", "clone", repo_url, str(lephare_dir)], check=False)
            except OSError as error:
                # e.g. git is not installed; stay best-effort rather than crash
                warnings.warn(f"Could not run git to clone {repo_url}: {error}")
            else:
                if result.returncode != 0:
                    warnings.warn(f"git clone of {repo_url} exited with status {result.returncode}.")
    else:
        retriever = make_retriever(base_url=base_url, registry_file=registry_file, data_path=data_path)
        if keymap is not None:
            file_list = config_to_required_files(keymap)
        else:
            if file_text is None:
                # No text was returned for the registry (the local copy was
                # already current), so read it from disk.
                with open(registry_file, "r", encoding="utf-8") as file:
                    file_text = file.read()
            # Registry tokens alternate "name hash"; keep the names only.
            file_list = np.array(file_text.split())[0:-1:2]
        download_all_files(retriever, file_list, ignore_registry=False)
        if additional_files is not None:
            download_all_files(retriever, additional_files, ignore_registry=False)

    # Clean up the temporary registry file (portable replacement for `rm`).
    try:
        os.remove(registry_file)
    except FileNotFoundError:
        pass  # nothing to clean up
    except OSError as error:
        warnings.warn(f"Could not remove registry file {registry_file}: {error}")