"""Target catalog loader (CSV / ASCII / FITS table)."""
from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
import numpy as np
from astropy.io import ascii as ioascii
from astropy.table import Table
# Pattern that picks up the *numeric portion* of a value like "P0",
# "P1", "class-3", "1.5e-2". Used by _as_float so common JWST priority-
# class encodings (P0 = highest, P1 = …) flow through as numeric 0, 1,
# etc. without forcing the user to hand-edit their catalog.
_NUM_RE = re.compile(r"-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?")
# Catalog IDs above this threshold are taken mod ID_MOD before being
# stored. JADES-style IDs can run to 8–9 digits, but APT MPT and the
# eMPT pipeline both expect compact integer source numbers — anything
# beyond ~10⁷ tends to be silently truncated or rejected downstream.
# Collisions after the mod are vanishingly rare in real catalogs;
# we accept that trade-off in exchange for a clean integer space.
ID_MOD = 10_000_000
[docs]
@dataclass
class Catalog:
ids: np.ndarray
ra_deg: np.ndarray
dec_deg: np.ndarray
priority: np.ndarray
mag: np.ndarray
z: np.ndarray
label: np.ndarray
source_path: str
# `weight` is a sibling of `priority`: float64 with NaN for missing.
# Used by the optimizer's Meritocracy mode (sum of placed weights),
# and by the Hierarchy mode internally to break ties within a
# priority tier.
weight: np.ndarray = field(
default_factory=lambda: np.array([], dtype=float)
)
# ---- Per-target spectral constraints (v1.3.0+) ------------------
# Each row optionally constrains how its spectrum must fall on the
# detector given the current (disperser, filter). At every
# candidate pointing the optimizer fetches the source's centre-
# shutter wavelength endpoints via `vmpt.wavelengths.cutoffs` and
# drops the target if any constraint fails.
#
# required_lam[i] is a list of (lam_lo, lam_hi) tuples in μm — the
# **spectral coverage** the user requires for source i. Empty list
# = no requirement. Stored as a dtype=object array so the ragged
# per-row list lengths work in NumPy.
required_lam: np.ndarray = field(
default_factory=lambda: np.array([], dtype=object)
)
# no_gap[i] = True → the NRS1/NRS2 detector gap must NOT fall
# inside [lam_blue, lam_red] (i.e. cutoffs() must return NaN for
# both gap_lo and gap_hi). Strict interpretation per v1.3.0.
no_gap: np.ndarray = field(
default_factory=lambda: np.array([], dtype=bool)
)
# extend_blue[i] = True → the centre shutter's lam_blue must
# reach the disperser/filter's nominal blue limit (no left-edge
# truncation due to where in V2 the target sits).
extend_blue: np.ndarray = field(
default_factory=lambda: np.array([], dtype=bool)
)
# extend_red[i] = True → same, for the red end.
extend_red: np.ndarray = field(
default_factory=lambda: np.array([], dtype=bool)
)
# protect[i] = True → this target's spectrum is collision-
# protected (same semantics as the v1.2 catalog-wide protect_mask
# — the optimizer takes the logical OR of this flag and the v1.2
# cutoff-derived mask).
protect: np.ndarray = field(
default_factory=lambda: np.array([], dtype=bool)
)
# centration[i] is a per-target source-centering override. Empty
# string ("") means "use the optimizer's global Source centering
# setting" — the v1.3.0 default. Otherwise must be one of the
# five canonical labels (UNCONSTRAINED, ENTIRE_OPEN, MIDPOINT,
# CONSTRAINED, TIGHTLY_CONSTRAINED) — anything else gets coerced
# to "" at load time. The optimizer reads this verbatim, looks up
# CENTRATION_BUFFERS, and the per-target buffer wins **uncondition-
# ally** — even when it's laxer than the global. v1.3.1+.
centration: np.ndarray = field(
default_factory=lambda: np.array([], dtype=object)
)
# Original-column → values for every column the loader did NOT
# claim as one of the canonical fields above. Stored as object
# arrays so we don't lose mixed-type information (e.g., priority-
# class strings, free-text notes). The catalog editor exposes
# these via the column picker; we never mutate them algorithmically
# so they round-trip back into Save-as-CSV unchanged unless the
# user edits them.
extras: dict = field(default_factory=dict)
# Lookup tables for the loose column-matcher (`_find_col`). Each
# candidate is normalised with `_norm` (lowercase + strip bracketed
# units + collapse to alphanumeric + strip trailing unit tokens). The
# normalisation makes `RA`, `ra`, `RA[deg]`, `RA(deg)`, `RA_deg`,
# `ra_J2000`, `ALPHA_J2000`, `R.A.` all map to the same key.
_ID_KEYS = (
"id", "no", "nocat", "objid", "objectid", "sourceid", "source",
"src", "srcid", "targetid", "targid", "ident",
)
# Permissive ID fallbacks: accepted only when the column's values are
# numeric (else we'd silently sort sources by their human-readable name).
_ID_FALLBACK_KEYS = ("name", "label", "tag", "target", "targetname", "#")
_RA_KEYS = (
"ra", "rightascension", "raj2000", "alpha", "alphaj2000",
"rad", "radeg",
)
_DEC_KEYS = (
"dec", "declination", "decj2000", "delta", "deltaj2000",
"decd", "decdeg",
# Vizier-style "DEJ2000" normalises to "de" once "J2000" is
# stripped as a unit/epoch token. Adding "de" keeps that catalog
# convention working.
"de",
)
_PRI_KEYS = ("priority", "pr", "pri", "prio", "priorityclass")
_WEIGHT_KEYS = ("weight", "w", "wt", "weights")
_MAG_KEYS = (
"mag", "magnitude", "f444wmag", "magf444w", "f356wmag", "magf356w",
"f200wmag", "magf200w",
)
_Z_KEYS = ("z", "zspec", "zphot", "redshift", "zbest", "zuse")
_LABEL_KEYS = ("label", "name", "tag")
# ---- v1.3.0 per-target spectral-constraint columns -------------------
# Loose-matched the same way as the canonical fields above. `lam_req`
# stores a string per row ("1.0-1.3; 1.5-1.8") parsed at load via
# `_parse_lam_req_str` into a list[(float, float)]. The four boolean
# columns accept any of the truthy-ish text values _BOOL_TRUE_TOKENS.
_LAM_REQ_KEYS = (
"lamreq", "lambdareq", "lambdarequired", "wavelengthrequired",
"requiredlam", "reqlam", "requiredwavelength",
)
_NO_GAP_KEYS = ("nogap", "gapless", "nodetectorgap")
_EXT_BLUE_KEYS = ("extendblue", "extendsblue", "blueextends", "bluest")
_EXT_RED_KEYS = ("extendred", "extendsred", "redextends", "reddest")
_PROTECT_KEYS = (
"protect", "protected", "protectcollision", "collisionprotect",
)
# Per-target source-centering override (v1.3.1+). Cell value is one of
# `_VALID_CENTRATION_LEVELS` (case-insensitive); anything else becomes
# "" (no override). The match is loose — see `_as_centration_str` for
# the alias rules (e.g. "tight" → "TIGHTLY_CONSTRAINED").
_CENTRATION_KEYS = (
"centration", "centering", "sourcecentration", "sourcecentering",
"centerclass", "centeringclass",
)
_VALID_CENTRATION_LEVELS = (
"UNCONSTRAINED",
"ENTIRE_OPEN",
"MIDPOINT",
"CONSTRAINED",
"TIGHTLY_CONSTRAINED",
)
# Short aliases users are likely to type, mapping back to the canonical
# label. The normalised key is lowercase + non-alnum stripped, matching
# `_norm()`'s shape (so "tightly-constrained" works too).
_CENTRATION_ALIASES = {
"unconstrained": "UNCONSTRAINED", "unc": "UNCONSTRAINED",
"none": "UNCONSTRAINED", "off": "UNCONSTRAINED",
"entireopen": "ENTIRE_OPEN", "entire": "ENTIRE_OPEN",
"open": "ENTIRE_OPEN",
"midpoint": "MIDPOINT", "mid": "MIDPOINT", "middle": "MIDPOINT",
"constrained": "CONSTRAINED", "con": "CONSTRAINED",
"tight": "TIGHTLY_CONSTRAINED",
"tightly": "TIGHTLY_CONSTRAINED",
"tightlyconstrained": "TIGHTLY_CONSTRAINED",
"tightconstrained": "TIGHTLY_CONSTRAINED",
}
_BOOL_TRUE_TOKENS = frozenset(
("1", "true", "yes", "y", "t", "✓", "✔", "on")
)
# Trailing tokens that look like *units* on an otherwise-clean column
# name — stripped after lowercasing + alphanumeric collapse so
# `RA[deg]`, `RA_deg`, `ra (deg)`, `RAJ2000` all collapse to `ra`.
_UNIT_SUFFIX_TOKENS = (
"degrees", "degree", "deg",
"radians", "radian", "rad",
"arcseconds", "arcsec", "asec",
"j2000", "icrs", "fk5",
)
def _norm(name: str) -> str:
"""Normalise a column name for loose matching."""
if name is None:
return ""
s = str(name).lower()
# Strip bracketed / parenthesised unit suffixes ("RA[deg]" → "RA").
s = re.sub(r"\[[^\]]*\]", "", s)
s = re.sub(r"\([^)]*\)", "", s)
# Collapse remaining non-alphanumerics ("ra_deg" → "radeg", "R.A." → "ra").
s = re.sub(r"[^a-z0-9]+", "", s)
# Strip trailing unit tokens ("radeg" → "ra"). Loop so chained
# suffixes (e.g. "decjsiomdeg") peel off one by one.
changed = True
while changed:
changed = False
for tok in _UNIT_SUFFIX_TOKENS:
if len(s) > len(tok) and s.endswith(tok):
s = s[: -len(tok)]
changed = True
break
return s
def _find_col(table: Table, candidates) -> str | None:
"""Return the original column name matching any normalised candidate."""
norm_map: dict[str, str] = {}
for c in table.colnames:
norm_map.setdefault(_norm(c), c)
for cand in candidates:
n = _norm(cand)
if n and n in norm_map:
return norm_map[n]
return None
def _find_id_col(table: Table) -> tuple[str | None, bool]:
"""Locate the catalog's ID column.
Returns `(name, is_numeric_fallback)`. The fallback flag is True
when we accepted a permissive candidate (`name`, `label`, …)
*because* its values coerced to integers — used downstream to
decide whether to preserve the original token alongside the int ID.
"""
name = _find_col(table, _ID_KEYS)
if name is not None:
return name, False
# Permissive: accept name/label/tag only if values look like integers.
for cand in _ID_FALLBACK_KEYS:
col_name = _find_col(table, (cand,))
if col_name is None:
continue
col = table[col_name]
try:
arr = np.asarray(col, dtype=np.int64)
except (ValueError, TypeError):
continue
# Sanity: empty / all-zero columns are unlikely to be IDs.
if arr.size > 0:
return col_name, True
return None, False
def _coerce_int_ids(raw, nrows: int) -> np.ndarray:
"""Return an int64 ID array of length `nrows`, with mod ID_MOD
applied to any source ID at or above 10⁷.
If `raw` can't be coerced to int (string IDs like "RJ0600-x-P0"),
we return the raw values as an object array — the integer
extraction happens later in the exporter's `_to_int_id`."""
try:
ids = np.asarray(raw, dtype=np.int64)
except (ValueError, TypeError):
return np.asarray([str(v) for v in raw], dtype=object)
big = np.abs(ids) >= ID_MOD
if big.any():
ids = ids.copy()
ids[big] = np.mod(ids[big], ID_MOD)
return ids
def _as_float(table: Table, name: str | None) -> np.ndarray:
"""Coerce a column to float, tolerantly.
Catalogs in the wild use a few non-numeric conventions for fields
that vMPT wants as numbers — the most common is the **priority
class** (`P0`, `P1`, …). Rather than throwing, we:
• try the fast path (`np.asarray(..., dtype=float)`);
• on failure fall back to row-by-row parsing — empty strings and
masked values become NaN, and the *numeric portion* of any
string is extracted (so `"P0"` → 0.0, `"class-3"` → 3.0,
`"high-mu"` → NaN).
"""
n = len(table)
if name is None:
return np.full(n, np.nan, dtype=float)
col = table[name]
# Numeric column with astropy masks → fill masked entries with NaN.
# (np.asarray on a MaskedArray drops the mask and exposes the
# underlying buffer, which usually has 0 in the masked slots — not
# what we want for empty `mag` / `z` cells.) Apply this for any
# numeric dtype, not just floats — empty cells in an int column
# also need NaN handling.
if np.issubdtype(getattr(col, "dtype", np.dtype("O")), np.number):
try:
arr = np.ma.asarray(col)
# Cast to float FIRST so the NaN fill is representable,
# then fill. Otherwise filling an int-typed masked array
# with `np.nan` silently coerces to 0.
return np.ma.filled(arr.astype(float), np.nan)
except (ValueError, TypeError):
pass
# Non-numeric column — fall through to row-by-row parse below.
try:
return np.asarray(col, dtype=float)
except (ValueError, TypeError):
pass
# Non-numeric column → row-by-row parse, extracting trailing digits.
out = np.full(n, np.nan, dtype=float)
mask = getattr(col, "mask", None)
for i, v in enumerate(col):
if mask is not None and mask is not False:
try:
if mask[i]:
continue
except (TypeError, IndexError):
pass
if v is None:
continue
s = str(v).strip()
if not s or s.lower() in ("--", "nan", "none", "null"):
continue
try:
out[i] = float(s)
continue
except ValueError:
pass
m = _NUM_RE.search(s)
if m is not None:
try:
out[i] = float(m.group(0))
except ValueError:
pass
return out
def _as_str(table: Table, name: str | None) -> np.ndarray:
n = len(table)
if name is None:
return np.array([""] * n, dtype=object)
return np.asarray([str(v) for v in table[name]], dtype=object)
def _as_bool(table: Table, name: str | None) -> np.ndarray:
"""Coerce a column to bool, recognising any value in
:data:`_BOOL_TRUE_TOKENS` as True. Empty / NaN / 0 / "false" are
False. Used to read the per-target boolean constraint columns."""
n = len(table)
if name is None:
return np.zeros(n, dtype=bool)
col = table[name]
out = np.zeros(n, dtype=bool)
mask = getattr(col, "mask", None)
for i, v in enumerate(col):
if mask is not None and mask is not False:
try:
if mask[i]:
continue
except (TypeError, IndexError):
pass
if v is None:
continue
s = str(v).strip().lower()
if not s or s in ("nan", "none", "null", "--"):
continue
out[i] = s in _BOOL_TRUE_TOKENS
return out
def _parse_lam_req_str(s: str) -> list[tuple[float, float]]:
"""Parse the user-facing wavelength-range string format.
Format: zero or more ``"lo-hi"`` ranges in μm, semicolon- or
comma-separated. Examples:
"" → []
"1.0-1.3" → [(1.0, 1.3)]
"1.0-1.3; 1.5-1.8" → [(1.0, 1.3), (1.5, 1.8)]
"0.9 - 1.0, 2 - 3" → [(0.9, 1.0), (2.0, 3.0)]
Invalid fragments are silently dropped — the popover UI shows a
yellow warning on save when it spots them; from the loader's
perspective they just become missing constraints.
"""
if s is None:
return []
s = str(s).strip()
if not s or s.lower() in ("nan", "none", "null", "--"):
return []
out: list[tuple[float, float]] = []
for chunk in re.split(r"[;,]", s):
c = chunk.strip()
if not c:
continue
# "1.0-1.3" or "1.0 — 1.3" or "1.0 to 1.3"
m = re.match(
r"^\s*([\d.eE+-]+)\s*(?:-|–|—|to)\s*([\d.eE+-]+)\s*$",
c,
)
if not m:
continue
try:
lo, hi = float(m.group(1)), float(m.group(2))
except ValueError:
continue
if lo > hi:
lo, hi = hi, lo
if np.isfinite(lo) and np.isfinite(hi):
out.append((lo, hi))
return out
def _format_lam_req(ranges) -> str:
"""Inverse of :func:`_parse_lam_req_str` — serialise back to the
string format used in the CSV and in the popover input."""
if ranges is None:
return ""
parts: list[str] = []
for r in ranges:
try:
lo, hi = float(r[0]), float(r[1])
except (TypeError, ValueError, IndexError):
continue
if np.isfinite(lo) and np.isfinite(hi):
parts.append(f"{lo:g}-{hi:g}")
return "; ".join(parts)
def _as_lam_req(table: Table, name: str | None) -> np.ndarray:
"""Read a wavelength-required column from the catalog. Each cell is
parsed by :func:`_parse_lam_req_str` into a `list[tuple]`; the
result is wrapped in a `dtype=object` array (ragged-friendly)."""
n = len(table)
if name is None:
return np.array([[] for _ in range(n)], dtype=object)
col = table[name]
out = np.empty(n, dtype=object)
mask = getattr(col, "mask", None)
for i, v in enumerate(col):
masked = False
if mask is not None and mask is not False:
try:
masked = bool(mask[i])
except (TypeError, IndexError):
masked = False
out[i] = [] if masked else _parse_lam_req_str(v)
return out
def _normalise_centration(value) -> str:
"""Coerce a single centration cell to one of the canonical labels.
Returns ``""`` (no override) for empty / unrecognised values, or
one of the strings in :data:`_VALID_CENTRATION_LEVELS`. Matching
is case-insensitive and tolerant of underscores / hyphens (e.g.
``"tightly-constrained"`` and ``"Tight"`` both resolve to
``"TIGHTLY_CONSTRAINED"``).
"""
if value is None:
return ""
s = str(value).strip()
if not s or s.lower() in ("nan", "none", "null", "--"):
return ""
# Direct canonical match (handles the common case fast).
upper = s.upper().replace("-", "_").replace(" ", "_")
if upper in _VALID_CENTRATION_LEVELS:
return upper
# Loose-match against the alias table (lowercase + alnum-only).
key = re.sub(r"[^a-z0-9]+", "", s.lower())
if key in _CENTRATION_ALIASES:
return _CENTRATION_ALIASES[key]
return ""
def _as_centration_str(table: Table, name: str | None) -> np.ndarray:
"""Read a centration-override column. Cells coerce via
:func:`_normalise_centration`; unrecognised values silently become
``""`` (i.e. "use the optimizer's global setting")."""
n = len(table)
if name is None:
return np.array([""] * n, dtype=object)
col = table[name]
out = np.empty(n, dtype=object)
mask = getattr(col, "mask", None)
for i, v in enumerate(col):
masked = False
if mask is not None and mask is not False:
try:
masked = bool(mask[i])
except (TypeError, IndexError):
masked = False
out[i] = "" if masked else _normalise_centration(v)
return out
[docs]
def load_catalog(path: str) -> Catalog:
ext = os.path.splitext(path)[1].lower()
if ext in (".fits", ".fit", ".fz"):
table = Table.read(path)
elif ext == ".csv":
table = ioascii.read(path, format="csv")
else:
table = ioascii.read(path)
ra_col = _find_col(table, _RA_KEYS)
dec_col = _find_col(table, _DEC_KEYS)
if ra_col is None or dec_col is None:
raise ValueError(
f"Catalog at {path} missing RA/Dec columns. Have: {table.colnames}"
)
id_col, id_from_fallback = _find_id_col(table)
if id_col is None:
# Catalog has no ID-like column — fake sequential IDs 1..N so
# downstream code (slitlet auto-tag, MPT export) still works.
ids = np.arange(1, len(table) + 1, dtype=np.int64)
else:
ids = _coerce_int_ids(table[id_col], len(table))
pri_col = _find_col(table, _PRI_KEYS)
weight_col = _find_col(table, _WEIGHT_KEYS)
mag_col = _find_col(table, _MAG_KEYS)
z_col = _find_col(table, _Z_KEYS)
# Per-target constraint columns (v1.3.0+). All optional; defaults
# leave constraints unset and v1.2.x behaviour is preserved.
lam_req_col = _find_col(table, _LAM_REQ_KEYS)
no_gap_col = _find_col(table, _NO_GAP_KEYS)
extend_blue_col = _find_col(table, _EXT_BLUE_KEYS)
extend_red_col = _find_col(table, _EXT_RED_KEYS)
protect_col = _find_col(table, _PROTECT_KEYS)
# Per-target source-centering override (v1.3.1+).
centration_col = _find_col(table, _CENTRATION_KEYS)
# If `name`/`label` was used as the ID fallback, don't ALSO claim it
# as the label column — that would just duplicate the ID.
label_candidates = _LABEL_KEYS
if id_from_fallback and id_col is not None:
label_candidates = tuple(
k for k in _LABEL_KEYS if _norm(k) != _norm(id_col)
)
label_col = _find_col(table, label_candidates)
# Every column the loader didn't claim above survives in `extras`
# so the catalog editor can show it. Stored as object arrays so we
# don't lose mixed-type information.
claimed = {c for c in (id_col, ra_col, dec_col, pri_col, weight_col,
mag_col, z_col, label_col,
lam_req_col, no_gap_col, extend_blue_col,
extend_red_col, protect_col,
centration_col) if c}
extras: dict = {}
for col_name in table.colnames:
if col_name in claimed:
continue
col = table[col_name]
try:
extras[col_name] = np.asarray([
("" if v is None else str(v))
for v in (np.ma.filled(col, "") if hasattr(col, "mask") else col)
], dtype=object)
except (TypeError, ValueError):
# Fall back to a plain object copy; if even that fails we
# silently drop the column rather than crashing the loader.
try:
extras[col_name] = np.asarray(list(col), dtype=object)
except Exception: # noqa: BLE001
pass
return Catalog(
ids=ids,
ra_deg=np.asarray(table[ra_col], dtype=float),
dec_deg=np.asarray(table[dec_col], dtype=float),
priority=_as_float(table, pri_col),
weight=_as_float(table, weight_col),
mag=_as_float(table, mag_col),
z=_as_float(table, z_col),
label=_as_str(table, label_col),
required_lam=_as_lam_req(table, lam_req_col),
no_gap=_as_bool(table, no_gap_col),
extend_blue=_as_bool(table, extend_blue_col),
extend_red=_as_bool(table, extend_red_col),
protect=_as_bool(table, protect_col),
centration=_as_centration_str(table, centration_col),
source_path=path,
extras=extras,
)
[docs]
def save_catalog(cat: Catalog, path: str, *,
include_constraints: str = "auto") -> None:
"""Write a :class:`Catalog` back to CSV.
Emits the standard eight columns (``ID, RA, DEC, priority,
weight, mag, z, label``) followed optionally by the six v1.3.x
per-target constraint columns (``lam_req, no_gap, extend_blue,
extend_red, protect, centration``), then any ``extras`` columns
the catalog is carrying. The output is round-trip-compatible
with :func:`load_catalog` — write, reload, and the resulting
:class:`Catalog` matches the input modulo dtype.
Parameters
----------
cat : Catalog
The catalog to save.
path : str
Destination CSV path. Parent directories are NOT created
automatically — callers should ensure the parent exists.
include_constraints : {"auto", "always", "never"}
Controls whether the six constraint columns appear in the
output:
- ``"auto"`` (default): emit the columns iff at least one
row has a non-default value (the same rule the catalog
editor's Save-as-CSV button uses, so v1.2.x catalogs that
never picked up constraints get the same CSV format they
had before).
- ``"always"``: always emit the columns, even when every
row is at defaults. Useful when you want a "template"
CSV the user can hand-edit.
- ``"never"``: omit them. Use when you specifically want
to drop the constraint metadata.
Notes
-----
Wavelength-range cells round-trip via the same string format
the catalog editor uses (``"1.0-1.3; 1.5-1.8"``), parsed back
by :func:`_parse_lam_req_str` on the next load.
NaN / missing values render as empty cells in the CSV; on
reload they come back as NaN (for float columns) or empty
string (for label / extras).
"""
import csv
if include_constraints not in ("auto", "always", "never"):
raise ValueError(
f"include_constraints must be one of "
f"'auto', 'always', 'never'; got {include_constraints!r}"
)
n = len(cat.ra_deg)
def _fmt_int_or_blank(v) -> str:
try:
f = float(v)
if not np.isfinite(f):
return ""
return str(int(round(f)))
except (TypeError, ValueError):
return "" if v is None else str(v)
def _fmt_float_or_blank(v) -> str:
try:
f = float(v)
if not np.isfinite(f):
return ""
# Drop trailing zeros / trailing decimal so the CSV is
# tidy for hand-editing.
s = f"{f:.6f}".rstrip("0").rstrip(".")
return s or "0"
except (TypeError, ValueError):
return "" if v is None else str(v)
def _fmt_id(v) -> str:
# Catalog.ids can be int64 or object dtype (string IDs).
try:
return str(int(v))
except (TypeError, ValueError):
return "" if v is None else str(v)
# Decide constraint-column emission policy. `"auto"` mode emits
# only when at least one row has a non-default value.
required_lam = getattr(cat, "required_lam", None)
no_gap = np.asarray(getattr(cat, "no_gap", []), dtype=bool)
extend_blue = np.asarray(getattr(cat, "extend_blue", []), dtype=bool)
extend_red = np.asarray(getattr(cat, "extend_red", []), dtype=bool)
protect = np.asarray(getattr(cat, "protect", []), dtype=bool)
centration = np.asarray(getattr(cat, "centration", []), dtype=object)
has_lam = (required_lam is not None
and len(required_lam) == n
and any(bool(r) and len(r) > 0 for r in required_lam))
has_centration = (
centration.size == n
and any(bool(str(v).strip()) for v in centration)
)
has_constraints = (
has_lam
or (no_gap.size == n and no_gap.any())
or (extend_blue.size == n and extend_blue.any())
or (extend_red.size == n and extend_red.any())
or (protect.size == n and protect.any())
or has_centration
)
emit_constraints = (
include_constraints == "always"
or (include_constraints == "auto" and has_constraints)
)
extras = getattr(cat, "extras", {}) or {}
constraint_cols = (["lam_req", "no_gap", "extend_blue",
"extend_red", "protect", "centration"]
if emit_constraints else [])
header = (["ID", "RA", "DEC", "priority", "weight", "mag", "z",
"label", *constraint_cols, *extras.keys()])
with open(path, "w", newline="") as f:
w = csv.writer(f)
w.writerow(header)
for i in range(n):
row = [
_fmt_id(cat.ids[i]),
_fmt_float_or_blank(cat.ra_deg[i]),
_fmt_float_or_blank(cat.dec_deg[i]),
_fmt_int_or_blank(cat.priority[i])
if i < len(cat.priority) else "",
_fmt_int_or_blank(cat.weight[i])
if i < len(cat.weight) else "",
_fmt_float_or_blank(cat.mag[i])
if i < len(cat.mag) else "",
_fmt_float_or_blank(cat.z[i])
if i < len(cat.z) else "",
str(cat.label[i]) if i < len(cat.label) else "",
]
if emit_constraints:
rl = (required_lam[i] if required_lam is not None
and i < len(required_lam) else [])
row.append(_format_lam_req(rl) if rl else "")
row.append("1" if (no_gap.size > i
and bool(no_gap[i])) else "")
row.append("1" if (extend_blue.size > i
and bool(extend_blue[i])) else "")
row.append("1" if (extend_red.size > i
and bool(extend_red[i])) else "")
row.append("1" if (protect.size > i
and bool(protect[i])) else "")
# Empty cell when no override; otherwise the canonical
# label (already normalised by the loader or `_normalise_
# centration`). Writing an unrecognised label here is
# not a hard error — the next load just resets it to "".
if centration.size > i:
c = str(centration[i]).strip()
row.append(c if c else "")
else:
row.append("")
for k, vals in extras.items():
row.append(str(vals[i]) if i < len(vals) else "")
w.writerow(row)
[docs]
def catalog_in_view(cat: Catalog, ra_min, ra_max, dec_min, dec_max) -> np.ndarray:
ra = cat.ra_deg
dec = cat.dec_deg
in_dec = (dec >= dec_min) & (dec <= dec_max)
if ra_min <= ra_max:
in_ra = (ra >= ra_min) & (ra <= ra_max)
else:
# RA range wraps across 0/360
in_ra = (ra >= ra_min) | (ra <= ra_max)
return in_ra & in_dec