cosmicraysandearthquakes/scripts/06_check_data_availability.py
root e5a812fa14 Initial commit: full analysis pipeline source code
Scripts 01-08 implement the complete cosmic-ray/earthquake correlation
analysis from data ingestion through out-of-sample validation and
combined timeseries sinusoid fitting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 02:45:10 +02:00

455 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
scripts/06_check_data_availability.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Determine the most recent date on which all three data sources are reliably
available, then download missing data for the 2020-to-present window.
Data sources
------------
1. NMDB — hourly pressure-corrected neutron monitor counts.
Reliable end = last date with ≥ 60% hourly coverage, minus 30 days
to allow for processing delays. Flags stations with > 30-day gaps.
2. USGS — M ≥ 4.5 global earthquake catalogue via FDSN.
Catalogue is generally complete within ~30 days.
Reliable end = today - 45 days.
3. SIDC — SILSO daily sunspot numbers.
Definitive values: ~6-month lag. Provisional: ~30-day lag.
Reliable end (definitive) = today - 180 days.
This script uses the provisional series with a note, so
reliable end = today - 30 days.
Common window end = min(NMDB_reliable, USGS_reliable, SIDC_reliable).
Window start is fixed at 2020-01-01 (first date post-Homola study period).
Outputs
-------
results/data_availability.json — window dates + per-source details
results/data_availability.txt — human-readable report
Usage
-----
python scripts/06_check_data_availability.py
python scripts/06_check_data_availability.py --no-download # check only
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import time
from datetime import date, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import requests
import yaml
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))
from crq.ingest.nmdb import load_station, resample_daily, download_station_year
from crq.ingest.usgs import download_year as usgs_download_year, load_usgs
# Console logging: timestamped, level-tagged records.
# Fix: the original format string ran %(name)s and %(message)s together
# with no separator ("crq.availJSON saved: ..."); add a space between them.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
logger = logging.getLogger("crq.avail")

# Start of the out-of-sample window (first date after the in-sample study period).
OOS_START = "2020-01-01"
# A daily bin is valid only when >= 60% of its hourly samples are present.
COVERAGE_THRESHOLD = 0.60
MIN_COVERAGE_FRACTION = 0.50  # station needs 50% valid bins in OOS window
GAP_WARN_DAYS = 30  # flag stations whose newest data is older than this
# ---------------------------------------------------------------------------
# SIDC download
# ---------------------------------------------------------------------------
# SILSO daily total sunspot number (V2.0) — primary CSV export endpoint.
_SIDC_URL = (
    "https://www.sidc.be/silso/INFO/sndhcsv.php"
)
# Static copy of the full V2.0 daily series; tried when the primary URL fails.
_SIDC_URL_ALT = (
    "https://www.sidc.be/silso/DATA/SN_d_tot_V2.0.csv"
)
def download_sidc(sidc_dir: Path, timeout: int = 60) -> Path | None:
    """Fetch the SIDC/SILSO daily total sunspot number (V2.0) CSV.

    Tries the primary endpoint first, then the alternate.  When both
    downloads fail, falls back to a previously cached non-empty file.

    Parameters
    ----------
    sidc_dir : destination directory (created if absent)
    timeout  : per-request timeout in seconds

    Returns the path to ``sunspots.csv`` on success (fresh or cached),
    or None when nothing usable is available.
    """
    sidc_dir.mkdir(parents=True, exist_ok=True)
    dest = sidc_dir / "sunspots.csv"
    for url in (_SIDC_URL, _SIDC_URL_ALT):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            dest.write_text(response.text, encoding="utf-8")
        except Exception as exc:
            logger.warning("SIDC download failed (%s): %s", url, exc)
            continue
        logger.info("SIDC downloaded: %d bytes → %s", len(response.text), dest)
        return dest
    # Both URLs failed — reuse a cached file if one exists and is non-empty.
    cached = dest.exists() and dest.stat().st_size > 0
    if cached:
        logger.warning("SIDC download failed — using cached file %s", dest)
        return dest
    return None
def check_sidc(sidc_dir: Path, today: date) -> dict:
    """Parse the cached SIDC sunspot CSV and determine its reliable end date.

    Parameters
    ----------
    sidc_dir : directory expected to contain ``sunspots.csv``
    today    : reference date for the reliability cutoff

    Returns a dict whose ``status`` is one of ``missing``, ``no_records``,
    ``parse_error`` or ``ok``; on success it also carries ``last_date``,
    ``definitive_end``, ``reliable_end``, ``n_records`` and a ``note``.
    ``reliable_end`` is None for every non-ok status.
    """
    path = sidc_dir / "sunspots.csv"
    if not path.exists():
        return {"status": "missing", "last_date": None, "reliable_end": None}
    try:
        # SIDC V2.0 CSV: year;month;day;fracyear;SSN;std;Nobs;provisional
        df = pd.read_csv(
            path,
            sep=";",
            header=None,
            names=["year", "month", "day", "fracyear", "ssn", "std", "nobs", "prov"],
            comment="#",
            dtype=str,
        )
        # Keep only rows whose year field is numeric (drops stray headers).
        df = df[df["year"].str.isnumeric()]
        df["date"] = pd.to_datetime(
            df["year"].str.strip() + "-" + df["month"].str.strip() + "-" + df["day"].str.strip(),
            errors="coerce",
        )
        df = df.dropna(subset=["date"])
        if df.empty:
            # Robustness fix: previously an empty frame produced NaT for
            # last_date, leaking "NaT" strings (or a parse_error) downstream.
            return {"status": "no_records", "last_date": None, "reliable_end": None}
        df["ssn"] = pd.to_numeric(df["ssn"].str.strip(), errors="coerce")
        df["prov"] = pd.to_numeric(df["prov"].str.strip(), errors="coerce").fillna(1).astype(int)
        last_date = df["date"].max().date()
        # Provisional records (prov=1) may be revised; definitive = prov=0
        definitive = df[df["prov"] == 0]["date"].max().date() if (df["prov"] == 0).any() else None
        # Reliable end: today minus 30 days (provisional is good enough),
        # capped by the newest record actually present in the file.
        reliable_end = today - timedelta(days=30)
        return {
            "status": "ok",
            "last_date": str(last_date),
            "definitive_end": str(definitive) if definitive else None,
            "reliable_end": str(min(reliable_end, last_date)),
            "n_records": len(df),
            "note": "Using provisional values (prov=1); definitive lag ~6 months",
        }
    except Exception as exc:
        logger.warning("SIDC parse error: %s", exc)
        return {"status": "parse_error", "error": str(exc), "reliable_end": None}
# ---------------------------------------------------------------------------
# NMDB download + check
# ---------------------------------------------------------------------------
def download_nmdb_oos(
    station_ids: list[str],
    nmdb_dir: Path,
    oos_start_year: int,
    oos_end_year: int,
    sleep_between: float = 0.3,
) -> dict[str, list[int]]:
    """
    Fetch every missing NMDB station-year file in the inclusive range
    [oos_start_year, oos_end_year].

    Station-years already cached as non-empty files are skipped; download
    failures are logged and do not abort the loop.  A short pause of
    *sleep_between* seconds follows each successful fetch to be polite to
    the NMDB server.  Returns station_id -> list of years newly downloaded.
    """
    fetched: dict[str, list[int]] = {sid: [] for sid in station_ids}
    years = range(oos_start_year, oos_end_year + 1)
    for sid in station_ids:
        for yr in years:
            target = nmdb_dir / f"{sid}{yr}.csv"
            if target.exists() and target.stat().st_size > 0:
                logger.debug("skip %s %d (exists)", sid, yr)
                continue
            try:
                download_station_year(sid, yr, nmdb_dir)
            except Exception as exc:
                logger.warning("NMDB %s %d: %s", sid, yr, exc)
            else:
                fetched[sid].append(yr)
                time.sleep(sleep_between)
    return fetched
def check_nmdb_stations(
    station_ids: list[str],
    nmdb_dir: Path,
    oos_start: str,
    today: date,
) -> dict[str, dict]:
    """
    For each station, determine its coverage fraction in the OOS window and
    the most recent date with valid daily data.

    Parameters
    ----------
    station_ids : NMDB station codes
    nmdb_dir    : directory with cached hourly station-year CSV files
    oos_start   : ISO date string, start of the out-of-sample window
    today       : reference date (upper bound of the window)

    Returns station_id -> info dict with keys ``status``, ``coverage_oos``,
    ``last_date``, ``gap_days`` and — when any OOS data exist — ``flag_gap``
    (True when the newest data is more than GAP_WARN_DAYS old).
    """
    oos_start_ts = pd.Timestamp(oos_start)
    oos_end_ts = pd.Timestamp(today.isoformat())
    start_year = int(oos_start[:4])
    end_year = today.year
    station_info: dict[str, dict] = {}
    for station in station_ids:
        hourly = load_station(station, start_year, end_year, nmdb_dir)
        if hourly.empty:
            station_info[station] = {
                "status": "no_data",
                "coverage_oos": 0.0,
                "last_date": None,
                "gap_days": None,
            }
            continue
        hourly_oos = hourly.loc[oos_start:]
        if hourly_oos.empty:
            station_info[station] = {
                "status": "no_oos_data",
                "coverage_oos": 0.0,
                "last_date": None,
                "gap_days": None,
            }
            continue
        daily_df = resample_daily(hourly_oos, station, coverage_threshold=COVERAGE_THRESHOLD)
        daily = daily_df[station]
        # Denominator spans the full calendar window up to *today*, so recent
        # processing lag slightly lowers apparent coverage (intentional).
        n_total = (oos_end_ts - oos_start_ts).days + 1
        valid = daily.dropna()
        n_valid = int(valid.size)
        coverage = n_valid / n_total
        last_valid = valid.index.max().date() if not valid.empty else None
        gap_days = (today - last_valid).days if last_valid else None
        station_info[station] = {
            "status": "ok" if coverage >= MIN_COVERAGE_FRACTION else "low_coverage",
            "coverage_oos": round(coverage, 4),
            "last_date": str(last_valid) if last_valid else None,
            "gap_days": gap_days,
            "flag_gap": gap_days > GAP_WARN_DAYS if gap_days is not None else True,
        }
        logger.info(
            "NMDB %-6s coverage=%.1f%% last=%s gap=%s d",
            station, 100 * coverage,
            last_valid or "N/A",
            # Bug fix: `gap_days or "N/A"` printed "N/A" for a real 0-day gap.
            gap_days if gap_days is not None else "N/A",
        )
    return station_info
def nmdb_reliable_end(station_info: dict[str, dict], today: date) -> date:
    """
    NMDB reliable end: the 25th-percentile ``last_date`` among stations with
    good coverage, minus 30 days for processing delay.

    (Docstring corrected: the code uses the conservative 25th percentile,
    not the median as previously stated.)

    Falls back to ``today - 90 days`` when no station qualifies.
    """
    last_dates = [
        date.fromisoformat(info["last_date"])
        for info in station_info.values()
        if info.get("coverage_oos", 0) >= MIN_COVERAGE_FRACTION and info.get("last_date")
    ]
    if not last_dates:
        return today - timedelta(days=90)
    # 25th percentile of the sorted dates (len//4 is always >= 0, so the
    # former max(0, ...) guard was redundant).
    last_dates.sort()
    return last_dates[len(last_dates) // 4] - timedelta(days=30)
# ---------------------------------------------------------------------------
# USGS download + check
# ---------------------------------------------------------------------------
def download_usgs_oos(
    usgs_dir: Path,
    oos_start_year: int,
    oos_end_year: int,
    min_magnitude: float = 4.5,
) -> None:
    """Fetch any USGS yearly catalogue files missing from *usgs_dir*.

    Years already cached as non-empty ``usgs-<year>.csv`` files are
    skipped; download failures are logged as warnings and the loop
    continues with the next year.
    """
    for yr in range(oos_start_year, oos_end_year + 1):
        target = usgs_dir / f"usgs-{yr}.csv"
        cached = target.exists() and target.stat().st_size > 0
        if cached:
            logger.debug("USGS %d: skip (exists)", yr)
            continue
        try:
            usgs_download_year(yr, usgs_dir, min_magnitude=min_magnitude)
        except Exception as exc:
            logger.warning("USGS %d: %s", yr, exc)
        else:
            logger.info("USGS %d: downloaded", yr)
def check_usgs(usgs_dir: Path, today: date, oos_start: str) -> dict:
    """Report which yearly USGS catalogue files are present, the total
    event count, and the catalogue's reliable end date (today minus a
    45-day safety margin for catalogue completeness)."""
    first_year = int(oos_start[:4])
    years_present: list[int] = []
    event_count = 0
    for yr in range(first_year, today.year + 1):
        csv_path = usgs_dir / f"usgs-{yr}.csv"
        if not (csv_path.exists() and csv_path.stat().st_size > 0):
            continue
        years_present.append(yr)
        try:
            event_count += len(pd.read_csv(csv_path, usecols=["time", "mag"]))
        except Exception:
            pass  # unreadable/odd file still counts as present, just uncounted
    return {
        "status": "ok" if years_present else "missing",
        "years_present": years_present,
        "total_events": event_count,
        "reliable_end": str(today - timedelta(days=45)),
        "note": "Catalogue stability: complete within ~30 days; using today-45 days",
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse ``sys.argv``."""
    data_raw = PROJECT_ROOT / "data" / "raw"
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--no-download", action="store_true",
                        help="Skip download attempts; check existing files only")
    parser.add_argument("--min-mag", type=float, default=4.5,
                        help="Minimum magnitude for USGS download (default 4.5)")
    parser.add_argument("--nmdb-dir", type=Path, default=data_raw / "nmdb")
    parser.add_argument("--usgs-dir", type=Path, default=data_raw / "usgs")
    parser.add_argument("--sidc-dir", type=Path, default=data_raw / "sidc")
    parser.add_argument("--config", type=Path, default=PROJECT_ROOT / "config" / "stations.yaml")
    parser.add_argument("--output-dir", type=Path, default=PROJECT_ROOT / "results")
    return parser.parse_args()
def run(args: argparse.Namespace) -> dict:
    """Execute the full availability workflow and write the reports.

    Steps: (1) download missing NMDB/USGS/SIDC data unless ``--no-download``
    was given, (2) compute each source's reliable end date, (3) print a
    console summary, (4) write JSON and text reports into
    ``args.output_dir``.  Returns the JSON-serializable payload dict.
    """
    args.output_dir.mkdir(parents=True, exist_ok=True)
    today = date.today()
    # Station list = keys of the `stations` map in the YAML config.
    with open(args.config) as fh:
        cfg = yaml.safe_load(fh)
    station_ids = list(cfg["stations"].keys())
    oos_start_year = int(OOS_START[:4])
    # ------------------------------------------------------------------ #
    # 1. Download missing data                                           #
    # ------------------------------------------------------------------ #
    if not args.no_download:
        logger.info("Downloading missing NMDB OOS data (%d-%d) …", oos_start_year, today.year)
        download_nmdb_oos(station_ids, args.nmdb_dir, oos_start_year, today.year)
        logger.info("Downloading missing USGS OOS data (%d-%d) …", oos_start_year, today.year)
        download_usgs_oos(args.usgs_dir, oos_start_year, today.year, min_magnitude=args.min_mag)
        logger.info("Downloading SIDC sunspot data …")
        download_sidc(args.sidc_dir)
    else:
        logger.info("--no-download: skipping download, checking existing files only")
    # ------------------------------------------------------------------ #
    # 2. Check each source                                               #
    # ------------------------------------------------------------------ #
    logger.info("Checking NMDB station coverage …")
    nmdb_info = check_nmdb_stations(station_ids, args.nmdb_dir, OOS_START, today)
    # Stations with enough valid daily bins to use in the OOS analysis.
    good_stations = [
        sid for sid, info in nmdb_info.items()
        if info.get("coverage_oos", 0) >= MIN_COVERAGE_FRACTION
    ]
    # Stations that have some data but a long trailing gap (likely offline).
    flagged = [
        sid for sid, info in nmdb_info.items()
        if info.get("flag_gap") and info.get("coverage_oos", 0) > 0
    ]
    nmdb_end = nmdb_reliable_end(nmdb_info, today)
    usgs_info = check_usgs(args.usgs_dir, today, OOS_START)
    sidc_info = check_sidc(args.sidc_dir, today)
    usgs_end = date.fromisoformat(usgs_info["reliable_end"])
    # SIDC may have no usable file; fall back to a conservative 90-day lag.
    sidc_end = date.fromisoformat(sidc_info["reliable_end"]) if sidc_info.get("reliable_end") else today - timedelta(days=90)
    # Common window ends at the earliest per-source reliable date.
    common_end = min(nmdb_end, usgs_end, sidc_end)
    constraining = {
        "NMDB": nmdb_end,
        "USGS": usgs_end,
        "SIDC": sidc_end,
    }
    # Name of the bottleneck source (earliest reliable end wins).
    constrained_by = min(constraining, key=constraining.get)
    # ------------------------------------------------------------------ #
    # 3. Print summary                                                   #
    # ------------------------------------------------------------------ #
    print()
    print("=" * 72)
    print(" OUT-OF-SAMPLE DATA AVAILABILITY")
    print(f" Run date: {today}")
    print("=" * 72)
    print(f"\n OOS window start: {OOS_START}")
    print(f" NMDB reliable end: {nmdb_end} ({len(good_stations)} stations ≥{MIN_COVERAGE_FRACTION*100:.0f}% coverage)")
    print(f" USGS reliable end: {usgs_end}")
    print(f" SIDC reliable end: {sidc_end}")
    print(f"\n *** Common reliable end: {common_end} (constrained by {constrained_by}) ***")
    # NOTE(review): no separator between the window's start and end dates
    # here — a glyph (likely an arrow or dash) appears lost; confirm intent.
    print(f"\n OOS window: {OOS_START}{common_end}")
    print(f" Duration: {(date.fromisoformat(str(common_end)) - date.fromisoformat(OOS_START)).days} days")
    print()
    print(f" NMDB stations with ≥{MIN_COVERAGE_FRACTION*100:.0f}% OOS coverage ({len(good_stations)}):")
    for sid in sorted(good_stations):
        info = nmdb_info[sid]
        flag = " *** GAP > 30d ***" if info.get("flag_gap") else ""
        print(f" {sid:<8} coverage={info['coverage_oos']*100:5.1f}% last={info['last_date']}{flag}")
    if flagged:
        print(f"\n Stations with >30-day gap (may be offline): {', '.join(sorted(flagged))}")
    print("=" * 72)
    print()
    # ------------------------------------------------------------------ #
    # 4. Save JSON and text report                                       #
    # ------------------------------------------------------------------ #
    payload = {
        "run_date": str(today),
        "oos_start": OOS_START,
        "oos_end": str(common_end),
        "constrained_by": constrained_by,
        "nmdb_reliable_end": str(nmdb_end),
        "usgs_reliable_end": str(usgs_end),
        "sidc_reliable_end": str(sidc_end),
        "good_stations_oos": sorted(good_stations),
        "flagged_stations": sorted(flagged),
        # Drop the internal flag_gap bool from the per-station JSON detail.
        "nmdb_station_detail": {
            sid: {k: v for k, v in info.items() if k != "flag_gap"}
            for sid, info in nmdb_info.items()
        },
        "usgs_detail": usgs_info,
        "sidc_detail": sidc_info,
    }
    json_path = args.output_dir / "data_availability.json"
    # default=str stringifies dates (and anything else non-JSON-native).
    json_path.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")
    logger.info("JSON saved: %s", json_path)
    txt_lines = [
        "OUT-OF-SAMPLE DATA AVAILABILITY REPORT",
        f"Run date: {today}",
        f"OOS window: {OOS_START}{common_end} (constrained by {constrained_by})",
        "",
        f"NMDB stations with ≥{MIN_COVERAGE_FRACTION*100:.0f}% OOS coverage: {len(good_stations)}",
        *[
            f" {sid:<8} coverage={nmdb_info[sid]['coverage_oos']*100:5.1f}% last={nmdb_info[sid]['last_date']}"
            for sid in sorted(good_stations)
        ],
        "",
        f"USGS: years available = {usgs_info['years_present']} events = {usgs_info['total_events']:,}",
        f"SIDC: last_date = {sidc_info.get('last_date')} (provisional note: {sidc_info.get('note', '')})",
    ]
    txt_path = args.output_dir / "data_availability.txt"
    txt_path.write_text("\n".join(txt_lines), encoding="utf-8")
    logger.info("Text report saved: %s", txt_path)
    return payload
if __name__ == "__main__":
    # Script entry point: parse CLI flags, then run the availability check.
    run(_parse_args())