"""
Generate synthetic level 3 data.
PTM - PUNCH Level-3 Polarized Mosaic
CTM - PUNCH Level-3 Clear Mosaic
"""
import os
from datetime import datetime, timedelta
import astropy.units as u
import numpy as np
import scipy.ndimage
from astropy.constants import R_sun
from astropy.io import fits
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from astropy.wcs.utils import add_stokes_axis_to_wcs, proj_plane_pixel_area
from ndcube import NDCube
from prefect import get_run_logger, task
from punchbowl.data import NormalizedMetadata, write_ndcube_to_fits
from punchbowl.data.punch_io import get_base_file_name
from simpunch.util import (fill_metadata_defaults, get_subdirectory,
update_spacecraft_location)
[docs]
def define_mask(shape: (int, int) = (4096, 4096), distance_value: float =0.68) -> np.ndarray:
"""Define a mask to describe the FOV for low-noise PUNCH data products."""
center = (int(shape[0] / 2), int(shape[1] / 2))
Y, X = np.ogrid[:shape[0], :shape[1]] # noqa: N806
dist_arr = np.sqrt((X - center[0]) ** 2 + (Y - center[1]) ** 2)
return (dist_arr / dist_arr.max()) < distance_value
[docs]
def define_trefoil_mask(rotation_stage: int = 0) -> np.ndarray:
"""Define a mask to describe the FOV for trefoil mosaic PUNCH data products."""
dir_path = os.path.dirname(os.path.realpath(__file__))
return np.load(os.path.join(dir_path, "data/trefoil_mask.npz"))["trefoil_mask"][rotation_stage,:,:]
[docs]
def generate_uncertainty(pdata: NDCube) -> NDCube:
"""Generate realistic uncertainty for PUNCH."""
# Input data is scaled to MSB
# Convert to photons
# Get the pixel scale in degrees
pixel_scale = abs(pdata.wcs.pixel_scale_matrix[0, 0]) * u.deg
# Convert the pixel scale to radians
pixel_scale_rad = pixel_scale.to(u.rad)
# Get the physical size of the Sun
sun_radius = R_sun.to(u.m)
# Calculate the physical area per pixel
pixel_area = proj_plane_pixel_area(pdata.wcs) * (u.deg ** 2)
physical_area_per_pixel = (pixel_area * (sun_radius ** 2) / (pixel_scale_rad ** 2)).to(u.m ** 2)
# Constants
h = 6.62607015e-34 * u.m ** 2 * u.kg / u.s # Planck's constant
c = 2.99792458e8 * u.m / u.s # Speed of light
wavelength = 530 * u.nm # Wavelength in nanometers
exposure_time = 60 * u.s # Exposure in seconds
# Calculate energy of a single photon
energy_per_photon = (h * c / wavelength).to(u.J)
# Now get the energy per unit of irradiance
# Given irradiance in W/m^2/sr
irradiance = 1 * u.W / (u.m ** 2 * u.sr)
# Convert irradiance to energy per second per pixel
energy_per_second = irradiance * 4 * np.pi * u.sr * physical_area_per_pixel
# Convert energy per second to photon count
photon_count = (energy_per_second * exposure_time).to(u.J) / energy_per_photon
photon_array = pdata.data * photon_count
photon_noise = np.sqrt(photon_array)
uncertainty = photon_noise
uncertainty[pdata.data == 0] = np.inf
pdata.uncertainty.array = uncertainty
return pdata
[docs]
def assemble_punchdata_polarized(input_tb: str, input_pb: str,
wcs: WCS, product_code: str,
product_level: str, mask: np.ndarray | None = None) -> NDCube:
"""Assemble a punchdata object with correct metadata."""
with fits.open(input_tb) as hdul:
data_tb = hdul[1].data / 1e8 # the 1e8 comes from the units on FORWARD output
if data_tb.shape == (2048, 2048):
data_tb = scipy.ndimage.zoom(data_tb, 2, order=0)
data_tb[np.where(data_tb == -9.999e-05)] = 0. # noqa: PLR2004
if mask is not None:
data_tb = data_tb * mask
with fits.open(input_pb) as hdul:
data_pb = hdul[1].data / 1e8 # the 1e8 comes from the units on FORWARD output
if data_pb.shape == (2048, 2048):
data_pb = scipy.ndimage.zoom(data_pb, 2, order=0)
data_pb[np.where(data_pb == -9.999e-05)] = 0. # noqa: PLR2004
if mask is not None:
data_pb = data_pb * mask
datacube = np.stack([data_tb, data_pb]).astype("float32")
uncertainty = StdDevUncertainty(np.zeros(datacube.shape))
uncertainty.array[datacube == 0] = 1
meta = NormalizedMetadata.load_template(product_code, product_level)
fill_metadata_defaults(meta)
return NDCube(data=datacube, wcs=wcs, meta=meta, uncertainty=uncertainty)
[docs]
def assemble_punchdata_clear(input_tb: str, wcs: WCS,
product_code: str, product_level: str, mask: np.ndarray | None =None) -> NDCube:
"""Assemble a punchdata object with correct metadata for a clear data product."""
with fits.open(input_tb) as hdul:
data_tb = hdul[1].data / 1e8 # the 1e8 comes from the units on FORWARD output
if data_tb.shape == (2048, 2048):
data_tb = scipy.ndimage.zoom(data_tb, 2, order=0)
data_tb[np.where(data_tb == -9.999e-05)] = 0. # noqa: PLR2004
if mask is not None:
data_tb = data_tb * mask
datacube = data_tb.astype("float32")
uncertainty = StdDevUncertainty(np.zeros(datacube.shape))
uncertainty.array[datacube == 0] = 1
meta = NormalizedMetadata.load_template(product_code, product_level)
fill_metadata_defaults(meta)
return NDCube(data=datacube, wcs=wcs, meta=meta, uncertainty=uncertainty)
[docs]
@task
def generate_l3_ptm(input_tb: str, input_pb: str, path_output: str,
time_obs: datetime, time_delta: timedelta, rotation_stage: int) -> str:
"""Generate PTM - PUNCH Level-3 Polarized Mosaic."""
logger = get_run_logger()
# Define the mosaic WCS (helio)
mosaic_shape = (4096, 4096)
mosaic_wcs = WCS(naxis=2)
mosaic_wcs.wcs.crpix = mosaic_shape[1] / 2 - 0.5, mosaic_shape[0] / 2 - 0.5
mosaic_wcs.wcs.crval = 0, 0
mosaic_wcs.wcs.cdelt = 0.0225, 0.0225
mosaic_wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC"
mosaic_wcs = add_stokes_axis_to_wcs(mosaic_wcs, 2)
# Mask data to define the field of view
mask = define_trefoil_mask(rotation_stage=rotation_stage)
mask = define_mask(shape=(4096, 4096), distance_value=0.68)
mask = None
# Read data and assemble into PUNCHData object
pdata = assemble_punchdata_polarized(input_tb, input_pb,
mosaic_wcs, product_code="PTM", product_level="3", mask=mask)
logger.info("Data assembled")
# Update required metadata
tstring_start = time_obs.strftime("%Y-%m-%dT%H:%M:%S.000")
tstring_end = (time_obs + time_delta).strftime("%Y-%m-%dT%H:%M:%S.000")
tstring_avg = (time_obs + time_delta / 2).strftime("%Y-%m-%dT%H:%M:%S.000")
pdata.meta["DATE-OBS"] = tstring_avg
pdata.meta["DATE-BEG"] = tstring_start
pdata.meta["DATE-END"] = tstring_end
pdata.meta["DATE-AVG"] = tstring_avg
pdata.meta["DATE"] = (time_obs + time_delta + timedelta(hours=12)).strftime("%Y-%m-%dT%H:%M:%S.000")
pdata.meta["DESCRPTN"] = "Simulated " + pdata.meta["DESCRPTN"].value
pdata.meta["TITLE"] = "Simulated " + pdata.meta["TITLE"].value
pdata = update_spacecraft_location(pdata, time_obs)
pdata = generate_uncertainty(pdata)
out_filename = os.path.join(path_output, get_subdirectory(pdata), get_base_file_name(pdata) + ".fits")
os.makedirs(os.path.dirname(out_filename), exist_ok=True)
logger.info(f"Writing data to {out_filename}")
write_ndcube_to_fits(pdata, out_filename)
logger.info("Data written")
return out_filename
[docs]
@task
def generate_l3_ctm(input_tb: str,
path_output: str,
time_obs: datetime,
time_delta: timedelta,
rotation_stage: int) -> str:
"""Generate CTM - PUNCH Level-3 Clear Mosaic."""
logger = get_run_logger()
# Define the mosaic WCS (helio)
mosaic_shape = (4096, 4096)
mosaic_wcs = WCS(naxis=2)
mosaic_wcs.wcs.crpix = mosaic_shape[1] / 2 - 0.5, mosaic_shape[0] / 2 - 0.5
mosaic_wcs.wcs.crval = 0, 0
mosaic_wcs.wcs.cdelt = 0.0225, 0.0225
mosaic_wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC"
# Mask data to define the field of view
mask = define_trefoil_mask(rotation_stage=rotation_stage)
mask = define_mask(shape=(4096, 4096), distance_value=0.68)
mask = None
# Read data and assemble into PUNCHData object
pdata = assemble_punchdata_clear(input_tb, mosaic_wcs, product_code="CTM", product_level="3", mask=mask)
logger.info("Data assembled")
# Update required metadata
tstring_start = time_obs.strftime("%Y-%m-%dT%H:%M:%S.000")
tstring_end = (time_obs + time_delta).strftime("%Y-%m-%dT%H:%M:%S.000")
tstring_avg = (time_obs + time_delta / 2).strftime("%Y-%m-%dT%H:%M:%S.000")
pdata.meta["DATE-OBS"] = tstring_avg
pdata.meta["DATE-BEG"] = tstring_start
pdata.meta["DATE-END"] = tstring_end
pdata.meta["DATE-AVG"] = tstring_avg
pdata.meta["DATE"] = (time_obs + time_delta + timedelta(hours=12)).strftime("%Y-%m-%dT%H:%M:%S.000")
pdata.meta["DESCRPTN"] = "Simulated " + pdata.meta["DESCRPTN"].value
pdata.meta["TITLE"] = "Simulated " + pdata.meta["TITLE"].value
pdata = update_spacecraft_location(pdata, time_obs)
pdata = generate_uncertainty(pdata)
out_filename = os.path.join(path_output, get_subdirectory(pdata), get_base_file_name(pdata) + ".fits")
os.makedirs(os.path.dirname(out_filename), exist_ok=True)
logger.info(f"Writing data to {out_filename}")
write_ndcube_to_fits(pdata, out_filename)
logger.info("Data written")
return out_filename