"""Run the entire pipeline backward."""
import os
from collections.abc import Callable
from datetime import datetime, timedelta
import numpy as np
from prefect import flow
from prefect_dask import DaskTaskRunner
from simpunch.level0 import generate_l0_cr, generate_l0_pmzp
from simpunch.level1 import generate_l1_cr, generate_l1_pmzp
from simpunch.level2 import generate_l2_ctm, generate_l2_ptm
from simpunch.level3 import generate_l3_ctm, generate_l3_ptm
[docs]
@flow(task_runner=DaskTaskRunner(address="localhost:8786"))
def generate_flow(file_tb: str,
                  file_pb: str,
                  time_obs: datetime,
                  out_dir: str,
                  backward_psf_model_path: str | Callable,
                  wfi_quartic_backward_model_path: str | Callable,
                  nfi_quartic_backward_model_path: str | Callable,
                  transient_probability: float = 0.03,
                  shift_pointing: bool = False) -> list[str]:
    """Generate every product of the reverse pipeline, from level 3 down to level 0.

    Each level is produced by submitting tasks to the flow's task runner and
    waiting for their results before the next level is submitted:

    1. level 3 PTM and CTM products from the input files,
    2. level 2 PTM and CTM products from the level 3 results,
    3. level 1 polarized and clear products for spacecraft "1" through "4",
    4. level 0 polarized and clear products, one task per level 1 entry.

    Parameters
    ----------
    file_tb : str
        Input passed to both level 3 generators. Its base name must have at
        least five underscore-separated fields, and the fifth field must carry
        an integer after its first four characters; that integer selects the
        rotation stage.
    file_pb : str
        Input passed to the level 3 PTM generator only.
    time_obs : datetime
        Forwarded to both level 3 generators.
    out_dir : str
        Forwarded to every generator at every level.
    backward_psf_model_path : str | Callable
        Forwarded to both level 0 generators.
    wfi_quartic_backward_model_path : str | Callable
        Forwarded to both level 0 generators.
    nfi_quartic_backward_model_path : str | Callable
        Forwarded to both level 0 generators.
    transient_probability : float
        Forwarded to both level 0 generators.
    shift_pointing : bool
        Forwarded to both level 0 generators.

    Returns
    -------
    list[str]
        The level 3 PTM, level 3 CTM, level 2 PTM and level 2 CTM results,
        followed by the flattened level 1 polarized results, the level 1 clear
        results, the level 0 polarized results and the level 0 clear results.
        The entries are whatever the generator tasks return (presumably output
        file paths; confirm against the task definitions).

    Raises
    ------
    IndexError
        If the base name of `file_tb` has fewer than five underscore-separated fields.
    ValueError
        If the sequence number cannot be parsed as an integer.

    """
    # TODO: make less specific to the filename
    sequence_field = os.path.basename(file_tb).split("_")[4]
    sequence_number = int(sequence_field[4:])

    # Sequence numbers map onto four stages in consecutive pairs, repeating every eight.
    # The value stays a NumPy integer scalar, exactly as the lookup table yields it.
    stage_lookup = np.array([0, 0, 1, 1, 2, 2, 3, 3])
    rotation_stage = stage_lookup[sequence_number % 8]

    # Level 3: submit both products before waiting on either one.
    time_delta = timedelta(minutes=4)
    l3_ptm_future = generate_l3_ptm.submit(file_tb, file_pb, out_dir, time_obs, time_delta, rotation_stage)
    l3_ctm_future = generate_l3_ctm.submit(file_tb, out_dir, time_obs, time_delta, rotation_stage)
    l3_ptm = l3_ptm_future.result()
    l3_ctm = l3_ctm_future.result()

    # Level 2: built from the finished level 3 results.
    l2_ptm_future = generate_l2_ptm.submit(l3_ptm, out_dir)
    l2_ctm_future = generate_l2_ctm.submit(l3_ctm, out_dir)
    l2_ptm = l2_ptm_future.result()
    l2_ctm = l2_ctm_future.result()

    # Level 1: one (polarized, clear) pair of tasks per spacecraft, submitted in that order.
    l1_futures = [
        (generate_l1_pmzp.submit(l2_ptm, out_dir, rotation_stage, spacecraft),
         generate_l1_cr.submit(l2_ctm, out_dir, rotation_stage, spacecraft))
        for spacecraft in ("1", "2", "3", "4")
    ]
    # Each polarized result is itself a collection, so gather them all and then flatten.
    polarized_groups = [polarized_future.result() for polarized_future, _ in l1_futures]
    l1_polarized = []
    for group in polarized_groups:
        l1_polarized.extend(group)
    l1_clear = [clear_future.result() for _, clear_future in l1_futures]

    # Level 0: every level 1 entry gets its own task; all tasks share the trailing arguments.
    l0_shared_args = (out_dir, backward_psf_model_path,
                      wfi_quartic_backward_model_path, nfi_quartic_backward_model_path,
                      transient_probability, shift_pointing)
    l0_pmzp_futures = [generate_l0_pmzp.submit(filename, *l0_shared_args) for filename in l1_polarized]
    l0_cr_futures = [generate_l0_cr.submit(filename, *l0_shared_args) for filename in l1_clear]
    l0_pmzp = [future.result() for future in l0_pmzp_futures]
    l0_cr = [future.result() for future in l0_cr_futures]

    return [l3_ptm, l3_ctm, l2_ptm, l2_ctm, *l1_polarized, *l1_clear, *l0_pmzp, *l0_cr]