Source code for vmpt.mpt_io

"""Import MSA plans from APT/MPT exports (JSON plan files, shutter CSVs, .aptx archives)."""

from __future__ import annotations

import io
import json
import re
import tempfile
import urllib.request
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from vmpt.coords import V3_IDL_Y_ANGLE
from vmpt.empt_io import OpenShutter

# STScI public APT downloader URL pattern. e.g. .../apt/1208/ → 1208.aptx.
APT_URL_TEMPLATE = "https://www.stsci.edu/jwst-program-info/download/jwst/apt/{program_id}/"


[docs] @dataclass class MPTSlitlet: """A single slitlet entry from MPT JSON: starting shutter (q, s, d) and slitlet height h. Unfolds to h shutters at rows s, s+1, …, s+h-1.""" q: int s: int d: int h: int = 1 primary_id: Optional[int] = None # primary target source ID, if known
[docs] @dataclass class MPTPlan: """One MSA plan / config from an APT MPT JSON file.""" name: str aperture_pa_deg: float # APT's "aperturePA" — NIRSpec APA v3_pa_deg: float # derived: APA - V3IdlYAngle (mod 360) ra_deg: Optional[float] = None # from primary exposure if present dec_deg: Optional[float] = None grating: Optional[str] = None filter_name: Optional[str] = None slitlets: list[MPTSlitlet] = None catalog_name: Optional[str] = None primary_ids: list[int] = None n_open_shutters: int = 0
[docs] def to_open_shutters(self) -> list[OpenShutter]: """Unfold slitlets into a flat list of OpenShutter entries. Each slitlet at (q, s, d, h) maps to h shutters at rows s, s+1, …, s+h-1 in column d of quadrant q. The middle shutter (s + h//2) is the "target" row; the others are "sky". """ out: list[OpenShutter] = [] for sl in self.slitlets or []: mid_offset = sl.h // 2 for off in range(sl.h): role = "target" if off == mid_offset else "sky" tid = str(sl.primary_id) if sl.primary_id is not None else None out.append(OpenShutter( q=int(sl.q), s=int(sl.s + off), d=int(sl.d), target_id=tid, role=role, )) return out
[docs] def parse_mpt_json(path: str) -> list[MPTPlan]: """Parse an APT/MPT JSON plan file into one MPTPlan per `configs` entry. APT's MSA-Planner exports JSON with this shape: { "aperturePA": <degrees>, # NIRSpec aperture PA on sky "theta": <degrees>, # MSA roll (we don't use it directly) "catalog": {"name": ..., "primariesName": ..., ...}, "configs": [ {"name": "c1 : foo plan 1", "slitlets": [{"q":1,"d":12,"s":50,"h":3}, ...], "exposures": [{"ra": ..., "dec": ..., "gratingFilter": "PRISM_CLEAR", "sourceIds": [...], ...}], "primaryIds": [...], "fillerIds": [...] }, ... ] } Returns one MPTPlan per config. The plan's RA/Dec/grating/filter come from the first exposure of the config. The V3 PA is derived from aperturePA - V3IdlYAngle so vMPT can drive the overlay directly. Raises ValueError on malformed input. """ try: with open(path) as f: data = json.load(f) except (OSError, json.JSONDecodeError) as e: raise ValueError(f"Could not read MPT JSON {path}: {e}") from e if not isinstance(data, dict) or "configs" not in data: raise ValueError(f"{path}: not an MPT plan JSON (missing 'configs')") try: apa = float(data["aperturePA"]) except (KeyError, TypeError, ValueError) as e: raise ValueError(f"{path}: missing or invalid 'aperturePA'") from e v3pa = (apa - V3_IDL_Y_ANGLE) % 360.0 cat = data.get("catalog", {}) or {} catalog_name = cat.get("name") or cat.get("primariesName") plans: list[MPTPlan] = [] for i, cfg in enumerate(data["configs"]): if not isinstance(cfg, dict): continue name = str(cfg.get("name", f"config_{i}")) # Parse slitlets raw_sl = cfg.get("slitlets") or [] slitlets: list[MPTSlitlet] = [] primary_ids = cfg.get("primaryIds") or [] # Map each slitlet to a primary target id if there's a positional # correspondence (APT typically lists slitlets in primaryIds order). for j, sl in enumerate(raw_sl): try: slitlets.append(MPTSlitlet( q=int(sl["q"]), s=int(sl["s"]), d=int(sl["d"]), h=int(sl.get("h", 1)), primary_id=(int(primary_ids[j]) if j < len(primary_ids) else None), )) except (KeyError, TypeError, ValueError) as e: raise ValueError( f"{path}: malformed slitlet [{i}][{j}]: {e}" ) from e # Derive RA/Dec and grating from the first DISPERSED exposure. # APT plans interleave target-acquisition/imaging steps (which have # `gratingFilter: null`) with the science exposure that carries the # grating + filter — sometimes the dispersed exposure isn't first. # Fall back to exposures[0] for plans with no dispersed step at all # (e.g. an MSA shutter-mask preview). exps = cfg.get("exposures") or [] ra = dec = None grating = filt = None primary = None for e in exps: if isinstance(e, dict) and e.get("gratingFilter"): primary = e break if primary is None and exps: primary = exps[0] if isinstance(exps[0], dict) else None if primary is not None: try: ra = float(primary.get("ra")) if primary.get("ra") is not None else None dec = float(primary.get("dec")) if primary.get("dec") is not None else None except (TypeError, ValueError): pass gf = primary.get("gratingFilter") or "" if "_" in gf: grating, filt = gf.split("_", 1) elif "/" in gf: grating, filt = gf.split("/", 1) # Count physical open shutters (each slitlet contributes h) n_open = sum(sl.h for sl in slitlets) plans.append(MPTPlan( name=name, aperture_pa_deg=apa, v3_pa_deg=v3pa, ra_deg=ra, dec_deg=dec, grating=grating, filter_name=filt, slitlets=slitlets, catalog_name=catalog_name, primary_ids=list(primary_ids), n_open_shutters=n_open, )) return plans
[docs] def list_mpt_plans_in_aptx(aptx_path: str) -> list[str]: """Return the names of MPT-style JSON files embedded in an .aptx archive. .aptx files are zip archives containing one or more <plan>.json files in MPT format, alongside the proposal XML, manifest, and pointing files. We filter to JSON files that look like MPT plans (top-level keys include 'configs' and 'aperturePA'). """ out: list[str] = [] with zipfile.ZipFile(aptx_path) as zf: for info in zf.infolist(): if not info.filename.lower().endswith(".json"): continue try: with zf.open(info) as fh: data = json.load(fh) except (json.JSONDecodeError, OSError): continue if isinstance(data, dict) and "configs" in data and "aperturePA" in data: out.append(info.filename) return sorted(out)
[docs] def parse_mpt_json_in_aptx(aptx_path: str, member_name: str) -> list[MPTPlan]: """Extract one MPT JSON entry from an .aptx archive into a list of MPTPlan (delegating to parse_mpt_json after extraction).""" with zipfile.ZipFile(aptx_path) as zf, tempfile.NamedTemporaryFile( suffix=".json", delete=False, mode="wb", ) as out: out.write(zf.read(member_name)) tmp_path = out.name try: return parse_mpt_json(tmp_path) finally: Path(tmp_path).unlink(missing_ok=True)
[docs] def download_apt_program(program_id: int | str, dest_path: Optional[str] = None) -> str: """Fetch <program_id>.aptx from STScI's public proposal-info URL. Returns the local filesystem path of the downloaded archive. If `dest_path` is None, writes to a temp file. Raises ValueError if the fetch fails or the response isn't a recognizable .aptx. """ pid = str(program_id).strip() if not re.fullmatch(r"\d+", pid): raise ValueError(f"program_id must be an integer; got {program_id!r}") url = APT_URL_TEMPLATE.format(program_id=pid) try: with urllib.request.urlopen(url, timeout=30) as resp: payload = resp.read() except Exception as e: # noqa: BLE001 raise ValueError(f"Could not fetch APT {pid} from {url}: {e}") from e # Sanity check — zip magic bytes if not payload.startswith(b"PK"): raise ValueError(f"APT {pid}: server didn't return a zip file (got " f"{len(payload)} bytes starting {payload[:8]!r})") if dest_path is None: tmp = tempfile.NamedTemporaryFile(prefix=f"apt_{pid}_", suffix=".aptx", delete=False) tmp.write(payload) tmp.close() return tmp.name Path(dest_path).write_bytes(payload) return str(dest_path)
[docs] def parse_shutter_csv(path: str) -> list[OpenShutter]: """Parse a shutter_mask.csv exported by APT/MPT (or eMPT). Format: 731 lines (1 header + 730 data rows × 342 cells). Cells encode shutter state: '0' = commanded open (user's pick), '1' = closed, 'x' = failed-closed (operability), 's' = failed-open. We return only the '0' cells. Tiling matches our writer: CSV row 1..365 = Q1 + Q2 by d, CSV row 366..730 = Q3 + Q4 by d (d = row - 365); CSV col 1..171 = s for Q1/Q3, CSV col 172..342 = s for Q2/Q4 (s = col or col - 171). """ try: with open(path) as f: lines = f.read().splitlines() except OSError as e: raise ValueError(f"Could not read shutter CSV {path}: {e}") from e # Strip header (first line that doesn't start with a digit/letter) data_lines = [ln for ln in lines if ln and not ln.lstrip().startswith("#")] if len(data_lines) != 730: raise ValueError( f"{path}: expected 730 data rows in shutter CSV, got {len(data_lines)}" ) out: list[OpenShutter] = [] for ir, line in enumerate(data_lines): cells = [c.strip() for c in line.split(",")] if len(cells) != 342: raise ValueError( f"{path}: row {ir+1} has {len(cells)} cells; expected 342" ) # Determine quadrant pair and d-index from row index if ir < 365: d = ir + 1 q_left, q_right = 1, 2 else: d = ir - 365 + 1 q_left, q_right = 3, 4 for ic, c in enumerate(cells): if c != "0": continue if ic < 171: q = q_left s = ic + 1 else: q = q_right s = ic - 171 + 1 out.append(OpenShutter(q=q, s=s, d=d, target_id=None, role="manual")) return out