cosmicraysandearthquakes/scripts/04_detrended_analysis.py
root e5a812fa14 Initial commit: full analysis pipeline source code
Scripts 01-08 implement the complete cosmic-ray/earthquake correlation
analysis from data ingestion through out-of-sample validation and
combined timeseries sinusoid fitting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 02:45:10 +02:00


#!/usr/bin/env python3
"""
scripts/04_detrended_analysis.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Does the Homola et al. 2023 CR–seismic cross-correlation SURVIVE solar-cycle
detrending? This script applies three detrending methods to both the CR index
and seismic metric, then re-runs the full IAAFT surrogate stress test on each.
The dominant peak in the raw r(τ) is at τ ≈ 525 d (half a solar cycle), not
τ = +15 d. If that peak disappears after detrending, the result is consistent
with a spurious solar-cycle confound rather than a genuine CR–seismic link.
Detrending methods
------------------
1. Hodrick-Prescott (HP) filter — λ = 1.29 × 10^5 (calibrated for 5-day bins)
2. STL decomposition — period = 803 bins ≈ 11-year solar cycle
3. Sunspot OLS regression — contemporaneous + 30/90/180-day lagged sunspot numbers
Outputs
-------
results/figs/detrended_xcorr.png — 2×2 figure (raw / HP / STL / sunspot-reg)
results/detrended_report.md — narrative summary
results/detrended_results.json — machine-readable table
Usage
-----
python scripts/04_detrended_analysis.py
python scripts/04_detrended_analysis.py --n-surrogates 1000 --n-jobs 4
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import numpy as np
import pandas as pd
import scipy.stats
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))
from crq.stats.surrogates import (
n_eff_bretherton,
p_to_sigma,
surrogate_xcorr_test,
)
from crq.preprocess.detrend import (
hp_filter_detrend,
stl_detrend,
sunspot_regression_detrend,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logger = logging.getLogger("crq.detrend_analysis")
# ---------------------------------------------------------------------------
# Import shared data from 02_homola_replication.py
# ---------------------------------------------------------------------------
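# NOTE: the module name "02_homola_replication" starts with a digit, so it cannot be
# imported with a normal `import` statement; importlib is used to load it by file path.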
def _import_homola():
spec = importlib.util.spec_from_file_location(
"homola_replication",
PROJECT_ROOT / "scripts" / "02_homola_replication.py",
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ---------------------------------------------------------------------------
# Load and bin sunspot data
# ---------------------------------------------------------------------------
def _load_sunspots(
sidc_path: Path,
study_start: str,
study_end: str,
bin_days: int,
bin_origin: pd.Timestamp,
) -> pd.Series | None:
"""
Load SIDC daily sunspot numbers, bin to *bin_days*-day bins, and return the binned Series.
Returns None if the file is missing or empty.
"""
if not sidc_path.exists():
logger.warning("Sunspot file not found: %s", sidc_path)
return None
df = pd.read_csv(sidc_path, comment="#")
# Normalise column names to lowercase
df.columns = [c.strip().lower() for c in df.columns]
# Identify date column (may be 'date' or first column)
date_col = "date" if "date" in df.columns else df.columns[0]
# Convert to datetime robustly (avoid ArrowDtype issues)
df[date_col] = pd.to_datetime(df[date_col].astype(str).str.strip(), errors="coerce")
df = df.dropna(subset=[date_col])
df = df.set_index(date_col)
df.index = pd.DatetimeIndex(df.index)
# Find the sunspot total column
for candidate in ("total", "ssn", "sunspot", "value"):
if candidate in df.columns:
ss_col = candidate
break
else:
ss_col = df.columns[0]
logger.warning("Guessing sunspot column as %r", ss_col)
# Parse as numeric — strip whitespace first
ss = pd.to_numeric(df[ss_col].astype(str).str.strip(), errors="coerce")
ss = ss.dropna()
# Restrict to study window
t_start = pd.Timestamp(study_start)
t_end = pd.Timestamp(study_end)
ss = ss[(ss.index >= t_start) & (ss.index <= t_end)]
if ss.empty:
logger.warning("No sunspot data in study window")
return None
# Bin onto the same 5-day grid as the seismic / CR series
# using the same floor-division approach as _bin_series in homola script
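# Worked example (assuming the defaults): with bin_days=5 and bin_origin=1976-01-01,
# a reading on 1976-01-08 lies 7 days from the origin, 7 // 5 = 1, so it is averaged
# into the bin dated 1976-01-06.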
days_from_origin = (ss.index - bin_origin).days
bin_number = days_from_origin // bin_days
bin_dates = bin_origin + pd.to_timedelta(bin_number * bin_days, unit="D")
ss_binned = ss.groupby(bin_dates).mean()
return ss_binned
def _align_sunspots(
ss_binned: pd.Series,
ref_index: pd.DatetimeIndex,
) -> np.ndarray:
"""
Reindex sunspot series onto *ref_index*, filling interior gaps by linear
interpolation and edge gaps by forward/backward fill. Returns float64 array.
"""
aligned = ss_binned.reindex(ref_index)
aligned = aligned.interpolate(method="linear", limit_direction="both")
aligned = aligned.ffill().bfill()
return aligned.to_numpy(dtype=np.float64)
# ---------------------------------------------------------------------------
# Naive Pearson significance
# ---------------------------------------------------------------------------
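# "Naive" two-sided Pearson p-value: t = r * sqrt((n - 2) / (1 - r^2)) referred to a
# Student-t distribution with n - 2 degrees of freedom. It treats every bin as an
# independent sample, i.e. it ignores autocorrelation entirely.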
def _naive_pearson(r: float, n: float) -> tuple[float, float]:
if n < 3 or not np.isfinite(r) or abs(r) >= 1.0:
return np.nan, np.nan
t = r * np.sqrt((n - 2) / (1.0 - r ** 2))
p = float(2.0 * scipy.stats.t.sf(abs(t), df=n - 2))
return p, p_to_sigma(p)
# ---------------------------------------------------------------------------
# Run surrogate test for one (cr, seismic) pair
# ---------------------------------------------------------------------------
def _run_test(
cr: np.ndarray,
seismic: np.ndarray,
lags_bins: np.ndarray,
bin_days: int,
n_surrogates: int,
n_jobs: int,
iaaft_iter: int,
seed: int,
label: str,
) -> dict:
"""Run IAAFT surrogate test and return a summary dict."""
logger.info("Running IAAFT surrogate test: %s (N=%d, %d surrogates)",
label, len(cr), n_surrogates)
result = surrogate_xcorr_test(
cr, seismic, lags_bins,
n_surrogates=n_surrogates,
method="iaaft",
seed=seed,
n_jobs=n_jobs,
iaaft_n_iter=iaaft_iter, # surrogate_xcorr_test param name
)
obs_r = result["observed_r"]
peak_r = float(result["observed_peak_r"])
peak_lag = int(result["observed_peak_lag"]) * bin_days # convert to days
p_global = float(result["p_global"])
n = len(cr)
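# Bretherton et al. (1999)-style effective sample size; n_eff_bretherton is assumed to
# shrink N by roughly (1 - r1x*r1y) / (1 + r1x*r1y), where r1x, r1y are the lag-1
# autocorrelations of the two series (see crq.stats.surrogates for the exact form).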
n_eff = n_eff_bretherton(cr, seismic)
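# searchsorted on the sorted integer lag grid returns the index of the entry equal to
# 15 // bin_days (3 bins = 15 days at the default 5-day binning).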
r_15 = float(obs_r[np.searchsorted(lags_bins, 15 // bin_days)])
_, sigma_15_naive = _naive_pearson(r_15, n)
_, sigma_15_breth = _naive_pearson(r_15, n_eff)
_, sigma_pk_naive = _naive_pearson(peak_r, n)
sigma_iaaft = p_to_sigma(p_global, n_trials=n_surrogates)
return {
"label": label,
"n": n,
"n_eff": round(float(n_eff), 1),
"r_at_15d": round(r_15, 4),
"sigma_15_naive": round(float(sigma_15_naive), 2) if np.isfinite(sigma_15_naive) else None,
"sigma_15_breth": round(float(sigma_15_breth), 2) if np.isfinite(sigma_15_breth) else None,
"peak_r": round(peak_r, 4),
"peak_lag_days": peak_lag,
"sigma_pk_naive": round(float(sigma_pk_naive), 2) if np.isfinite(sigma_pk_naive) else None,
"p_global_iaaft": round(p_global, 4),
"sigma_iaaft": round(sigma_iaaft, 2),
# arrays for figure
"_obs_r": obs_r,
"_surr_arrays": result["surrogate_r_arrays"],
"_lags_bins": lags_bins,
}
# ---------------------------------------------------------------------------
# 2×2 figure
# ---------------------------------------------------------------------------
PANEL_LABELS = ["(a) Raw", "(b) HP filter", "(c) STL", "(d) Sunspot regression"]
PANEL_COLORS = ["#1b7837", "#762a83", "#e08214", "#2166ac"]
def _make_figure(
results: list[dict],
bin_days: int,
study_start: str,
study_end: str,
n_stations: int,
output_path: Path,
) -> None:
fig, axes = plt.subplots(2, 2, figsize=(15, 10), sharey=False, sharex=True)
fig.suptitle(
f"CRSeismic Cross-Correlation: Raw vs Detrended "
f"({study_start[:4]}{study_end[:4]}, {n_stations} CR stations, {bin_days}-day bins)",
fontsize=13, fontweight="bold",
)
for ax, res, label, color in zip(axes.flat, results, PANEL_LABELS, PANEL_COLORS):
lags_days = res["_lags_bins"] * bin_days
obs_r = res["_obs_r"]
surr = res["_surr_arrays"] # (n_surr, n_lags)
# Percentile envelopes
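# Two-sided surrogate envelopes: the central 95% band spans the 2.5th–97.5th
# percentiles and the 99% band the 0.5th–99.5th percentiles at each lag.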
p95 = np.percentile(surr, 97.5, axis=0)
p05 = np.percentile(surr, 2.5, axis=0)
p99 = np.percentile(surr, 99.5, axis=0)
p01 = np.percentile(surr, 0.5, axis=0)
ax.fill_between(lags_days, p01, p99, alpha=0.15, color=color, label="99% envelope")
ax.fill_between(lags_days, p05, p95, alpha=0.30, color=color, label="95% envelope")
ax.plot(lags_days, obs_r, color=color, linewidth=1.5, label="Observed r(τ)")
# τ = +15 d marker
ax.axvline(15, color="red", linestyle="--", linewidth=0.8, alpha=0.7, label="τ = +15 d")
ax.axhline(0, color="black", linewidth=0.5)
# Annotations
sig_str = f"p_global={res['p_global_iaaft']:.3f} ({res['sigma_iaaft']:.1f}σ IAAFT)"
peak_str = f"peak: τ={res['peak_lag_days']}d, r={res['peak_r']:.3f}"
r15_str = f"r(+15d)={res['r_at_15d']:.3f} ({res.get('sigma_15_breth') or 'N/A'}σ Breth.)"
ax.set_title(f"{label}\n{peak_str} | {r15_str}", fontsize=9)
ax.text(0.02, 0.98, sig_str, transform=ax.transAxes,
fontsize=8, va="top", ha="left",
bbox=dict(boxstyle="round,pad=0.2", fc="white", alpha=0.7))
ax.set_xlim(lags_days[0], lags_days[-1])
ax.grid(True, alpha=0.3)
for ax in axes[1, :]:
ax.set_xlabel("Lag τ (days)")
for ax in axes[:, 0]:
ax.set_ylabel("Pearson r")
# Shared legend from first panel
handles, lbls = axes.flat[0].get_legend_handles_labels()
fig.legend(handles, lbls, loc="lower center", ncol=4,
fontsize=9, framealpha=0.9,
bbox_to_anchor=(0.5, -0.02))
fig.tight_layout(rect=[0, 0.04, 1, 1])
output_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
logger.info("Figure saved: %s", output_path)
# ---------------------------------------------------------------------------
# Report
# ---------------------------------------------------------------------------
def _write_report(
results: list[dict],
output_dir: Path,
args: argparse.Namespace,
) -> None:
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Markdown table
header = (
"| Method | N_eff | r(+15d) | σ_Breth(15d) | Peak r | Peak lag | "
"p_global (IAAFT) | σ_IAAFT |"
)
sep = "|---|---|---|---|---|---|---|---|"
rows = [header, sep]
for r in results:
rows.append(
f"| {r['label']} | {r['n_eff']:.0f} | {r['r_at_15d']:.4f} | "
f"{r['sigma_15_breth'] or 'N/A'} | {r['peak_r']:.4f} | "
f"{r['peak_lag_days']} d | {r['p_global_iaaft']:.4f} | "
f"{r['sigma_iaaft']:.1f}σ |"
)
table = "\n".join(rows)
# Interpret surviving peaks
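# p_global < 0.05 flags variants whose cross-correlation remains nominally significant
# at the 95% level after detrending (roughly a 2σ exceedance).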
survived = [r for r in results if r["p_global_iaaft"] < 0.05]
if survived:
interpret = (
"**CAUTION**: The following variants retain p_global < 0.05 after detrending: "
+ ", ".join(r["label"] for r in survived)
+ ". Further investigation required."
)
else:
interpret = (
"**Conclusion**: After all three solar-cycle detrending methods the global "
"IAAFT p-value is ≥ 0.05 in every case. The observed cross-correlation is "
"fully consistent with a shared solar-cycle confound; there is no residual "
"evidence for a genuine CRseismic link."
)
md = f"""# Detrended CRSeismic Cross-Correlation Analysis
Generated: {timestamp}
Study period: {args.study_start} to {args.study_end}
Bin size: {args.bin_days} days
Surrogates: {args.n_surrogates} IAAFT
Lag range: {args.lag_min} to {args.lag_max} days
## Significance table
{table}
## Interpretation
{interpret}
## Methods
### HP filter (Hodrick-Prescott)
λ = {args.hp_lambda:.2e} calibrated for {args.bin_days}-day bins targeting removal of
variations longer than ~2000 days (Ravn & Uhlig 2002 scaling of the standard λ=1600).
### STL decomposition
Period = {args.stl_period} bins ≈ {args.stl_period * args.bin_days / 365.25:.1f} years
(11-year solar cycle). seasonal_jump=100, trend_jump=100 for computational efficiency.
The *residual* component (x − trend − seasonal) is used.
### Sunspot OLS regression
Contemporaneous + {args.sunspot_lags}-day lagged sunspot numbers from SIDC WDC-SILSO.
The fitted solar-proxy component is subtracted from each series.
## Figure
`results/figs/detrended_xcorr.png`
"""
md_path = output_dir / "detrended_report.md"
md_path.write_text(md)
logger.info("Report saved: %s", md_path)
# JSON (strip numpy arrays)
json_rows = []
for r in results:
row = {k: v for k, v in r.items() if not k.startswith("_")}
json_rows.append(row)
json_data = {
"generated": timestamp,
"study_start": args.study_start,
"study_end": args.study_end,
"bin_days": args.bin_days,
"n_surrogates": args.n_surrogates,
"results": json_rows,
}
json_path = output_dir / "detrended_results.json"
json_path.write_text(json.dumps(json_data, indent=2))
logger.info("JSON saved: %s", json_path)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument("--study-start", default="1976-01-01")
p.add_argument("--study-end", default="2019-12-31")
p.add_argument("--bin-days", type=int, default=5)
p.add_argument("--lag-min", type=int, default=-1000)
p.add_argument("--lag-max", type=int, default=1000)
p.add_argument("--min-mag", type=float, default=4.0)
p.add_argument("--min-stations", type=int, default=3)
p.add_argument("--n-surrogates", type=int, default=10_000)
p.add_argument("--n-jobs", type=int, default=-1)
p.add_argument("--iaaft-iter", type=int, default=100)
p.add_argument("--seed", type=int, default=42)
p.add_argument("--hp-lambda", type=float, default=1.29e5)
p.add_argument("--stl-period", type=int, default=803,
help="STL period in bins (default 803 ≈ 11-yr solar cycle at 5-day bins)")
p.add_argument("--sunspot-lags", type=int, nargs="+", default=[0, 30, 90, 180],
metavar="DAYS",
help="Sunspot lag offsets in days (default: 0 30 90 180)")
p.add_argument("--nmdb-dir", type=Path, default=PROJECT_ROOT/"data"/"raw"/"nmdb")
p.add_argument("--usgs-dir", type=Path, default=PROJECT_ROOT/"data"/"raw"/"usgs")
p.add_argument("--sidc-dir", type=Path, default=PROJECT_ROOT/"data"/"raw"/"sidc")
p.add_argument("--config", type=Path, default=PROJECT_ROOT/"config"/"stations.yaml")
p.add_argument("--output-dir",type=Path, default=PROJECT_ROOT/"results")
return p.parse_args()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def run(args: argparse.Namespace) -> None:
args.output_dir.mkdir(parents=True, exist_ok=True)
(args.output_dir / "figs").mkdir(exist_ok=True)
# ------------------------------------------------------------------
# Step 1: build raw CR and seismic series via homola script
# ------------------------------------------------------------------
logger.info("Loading raw data via 02_homola_replication …")
homola = _import_homola()
import yaml
with open(args.config) as fh:
stations_cfg = yaml.safe_load(fh)
stations_val = stations_cfg["stations"]
if isinstance(stations_val, dict):
station_ids = list(stations_val.keys())
else:
station_ids = [s["id"] for s in stations_val]
bin_days = args.bin_days
t0 = pd.Timestamp(args.study_start)
cr_5d, station_norm_daily, n_stations = homola.build_cr_index(
station_ids,
args.study_start, args.study_end,
bin_days,
args.nmdb_dir,
min_stations=args.min_stations,
)
cr_5d = cr_5d.dropna()
seismic_5d = homola.build_seismic_metric(
args.study_start, args.study_end,
bin_days,
args.usgs_dir,
min_mag=args.min_mag,
cr_index=cr_5d,
)
# Align on common index
common_idx = cr_5d.index.intersection(seismic_5d.index)
cr_5d = cr_5d.reindex(common_idx)
seismic_5d = seismic_5d.reindex(common_idx)
cr_arr = cr_5d.to_numpy(dtype=np.float64)
seismic_arr = seismic_5d.to_numpy(dtype=np.float64)
N = len(cr_arr)
logger.info("Common bins: %d (%s %s)", N,
common_idx[0].date(), common_idx[-1].date())
# ------------------------------------------------------------------
# Step 2: load and align sunspot data
# ------------------------------------------------------------------
sidc_path = args.sidc_dir / "sunspots.csv"
ss_binned = _load_sunspots(sidc_path, args.study_start, args.study_end,
bin_days, t0)
if ss_binned is not None:
ss_arr = _align_sunspots(ss_binned, common_idx)
logger.info("Sunspot series loaded, len=%d, NaN=%.0f%%",
len(ss_arr), 100 * np.mean(np.isnan(ss_arr)))
else:
ss_arr = None
logger.warning("Sunspot data unavailable — sunspot-regression panel will be skipped")
# ------------------------------------------------------------------
# Step 3: build lags array (bins)
# ------------------------------------------------------------------
lag_min_b = args.lag_min // bin_days
lag_max_b = args.lag_max // bin_days
lags_bins = np.arange(lag_min_b, lag_max_b + 1, dtype=int)
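# With the defaults (lag_min=-1000, lag_max=1000, bin_days=5) this gives integer lags
# from -200 to +200 bins inclusive, i.e. 401 cross-correlation lags at 5-day spacing.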
# ------------------------------------------------------------------
# Step 4: detrend
# ------------------------------------------------------------------
logger.info("Applying detrending methods …")
# HP filter
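# λ is taken from the CLI (default 1.29e5); per the generated report it follows the
# Ravn & Uhlig (2002) frequency scaling of the standard quarterly λ = 1600, tuned so
# variations longer than ~2000 days are absorbed into the trend component.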
cr_hp = hp_filter_detrend(cr_arr, lamb=args.hp_lambda)
seismic_hp = hp_filter_detrend(seismic_arr, lamb=args.hp_lambda)
# STL
period = args.stl_period
logger.info(" STL: period=%d bins (~%.1f yr)", period, period*bin_days/365.25)
cr_stl = stl_detrend(cr_arr, period=period, seasonal_jump=100, trend_jump=100)
seismic_stl = stl_detrend(seismic_arr, period=period, seasonal_jump=100, trend_jump=100)
# Sunspot regression
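# sunspot_regression_detrend is assumed to build an OLS design matrix from the sunspot
# series shifted by each entry of lag_days (converted to bins via bin_days) and to
# return the residual after subtracting the fitted solar-proxy component.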
if ss_arr is not None and not np.all(np.isnan(ss_arr)):
cr_ssreg = sunspot_regression_detrend(
cr_arr, ss_arr, lag_days=args.sunspot_lags, bin_days=bin_days)
seismic_ssreg = sunspot_regression_detrend(
seismic_arr, ss_arr, lag_days=args.sunspot_lags, bin_days=bin_days)
else:
cr_ssreg = None
seismic_ssreg = None
# ------------------------------------------------------------------
# Step 5: surrogate tests on each variant
# ------------------------------------------------------------------
seed = args.seed
kw = dict(n_surrogates=args.n_surrogates, n_jobs=args.n_jobs,
iaaft_iter=args.iaaft_iter, seed=seed)
results = []
results.append(_run_test(
cr_arr, seismic_arr, lags_bins, bin_days, label="Raw", **kw))
results.append(_run_test(
cr_hp, seismic_hp, lags_bins, bin_days, label="HP filter", **kw))
results.append(_run_test(
cr_stl, seismic_stl, lags_bins, bin_days, label="STL", **kw))
if cr_ssreg is not None:
results.append(_run_test(
cr_ssreg, seismic_ssreg, lags_bins, bin_days,
label="Sunspot regression", **kw))
else:
# Placeholder so the figure still has 4 panels
logger.warning("Sunspot regression skipped — using HP result as placeholder")
ph = dict(results[1])
ph["label"] = "Sunspot regression (N/A)"
results.append(ph)
# ------------------------------------------------------------------
# Step 6: print summary table
# ------------------------------------------------------------------
print("\n" + "=" * 80)
print(f"{'Method':<25} {'N_eff':>6} {'r(+15d)':>8} {'σ_Breth':>8} "
f"{'Peak r':>8} {'Peak lag':>9} {'p_IAAFT':>8} {'σ_IAAFT':>8}")
print("-" * 80)
for r in results:
print(
f"{r['label']:<25} {r['n_eff']:>6.0f} {r['r_at_15d']:>8.4f} "
f"{str(r['sigma_15_breth'] or 'N/A'):>8} "
f"{r['peak_r']:>8.4f} {r['peak_lag_days']:>8}d "
f"{r['p_global_iaaft']:>8.4f} {r['sigma_iaaft']:>7.1f}σ"
)
print("=" * 80 + "\n")
# ------------------------------------------------------------------
# Step 7: figure
# ------------------------------------------------------------------
_make_figure(
results, bin_days, args.study_start, args.study_end, n_stations,
args.output_dir / "figs" / "detrended_xcorr.png",
)
# ------------------------------------------------------------------
# Step 8: report + JSON
# ------------------------------------------------------------------
_write_report(results, args.output_dir, args)
logger.info("Done.")
if __name__ == "__main__":
run(_parse_args())