cosmicraysandearthquakes/scripts/11_additional_robustness.py
root 1d01fcf102 Add seven additional robustness checks (3a–3g) and update paper
3a  Block-bootstrap surrogates (B=804 bins≈11 yr, n=5000): raw r(+15d)
    p=0.022 — marginally significant on undetrended series, driven by
    the shared solar-cycle trend not a causal signal.

3b  Partial correlation (sunspot-regressed, no filter): r(+15d) drops
    from 0.079 to 0.029 (63%) — confirms solar-cycle confounding without
    preprocessing circularity.

3c  Spectral coherence in solar-cycle band = 0.840 (>0.776 threshold);
    kNN mutual information at τ=+15d = 0.000 nats (p=1.000) — no
    nonlinear dependence at the claimed lag.

3d  Missing-data impact: 0% NaN at station thresholds 2/3/5; r(+15d)
    unchanged — missing data is not a confound.

3e  Bin-size sensitivity: dominant peak at τ≈-520d for 1/5/27-day bins;
    r at +15d scales with bin size (solar-cycle leakage, not physical).

3f  GK declustering removes 28.4% of events as aftershocks; r(+15d)
    changes by only Δ=0.014 — aftershock clustering not a confound.

3g  Per-solar-cycle analysis (cycles 21–24): r(+15d) all positive
    (0.018–0.073) but dominant peak lags scattered (-65/-125/+125/-125d)
    — phase-drifting solar-cycle artefact, not physical precursor.

Script fixes: vectorised block bootstrap (401×5000 → NumPy matmul),
  kNN MI with sorted searchsorted (O(N log N), no inflated values),
  coherence nperseg 512→2048 (resolves solar-cycle band), axhspan→axvspan.

Paper: new Methods subsections (block bootstrap, partial correlation,
  nonlinear dependence); new Results subsections for each check; updated
  Conclusions with 7-item robustness summary; Kraskov2004 and
  GardnerKnopoff1974 added to refs.bib; Homola2023 updated to arXiv
  preprint 2204.12310; eq:energy duplicate label fixed. PDF: 35 pages.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 20:43:08 +02:00

977 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
scripts/11_additional_robustness.py
====================================
Seven additional robustness checks for the CRseismic correlation analysis.
3a Block-bootstrap surrogates vs IAAFT null
Circular block bootstrap (block ≈ 1 solar cycle = 803 bins ≈ 11 yr at 5-day
resolution), 5 000 surrogates; compare p-values to stored IAAFT results.
3b Partial correlation controlling for sunspots (raw, unfiltered data).
Seismic_resid = seismic β·sunspots (OLS); compute r(τ) between CR and
Seismic_resid without any HP-filter preprocessing.
3c Spectral coherence + mutual information (kNN estimator).
Magnitude-squared coherence at each frequency; MI at lag=0 and lag=+3 bins
(+15 d) vs shuffle null (1 000 permutations).
3d Missing-data impact.
Fraction of NaN bins per station threshold (2, 3, 5), temporal clustering of
NaN bins near solar maxima, and r(+15 d) sensitivity.
3e Bin-size sensitivity (1-day, 5-day, 27-day Bartels rotation bins).
3f GardnerKnopoff earthquake declustering.
Remove aftershock sequences; recompute seismic energy from mainshock-only
catalogue; compare r(τ) to the clustered result.
3g Sub-period analysis (per-solar-cycle cross-correlations).
Cycles 2124 (≈ 19762019); independent r(τ) per cycle.
Outputs
-------
results/additional_robustness.json — machine-readable summary
results/figs/block_bootstrap_null.png — 3a
results/figs/partial_correlation.png — 3b
results/figs/spectral_coherence.png — 3c
results/figs/bin_size_sensitivity.png — 3e
results/figs/declustered_xcorr.png — 3f
results/figs/solar_cycle_xcorr.png — 3g
(3d is table-only, included in JSON)
"""
from __future__ import annotations
import json
import logging
import sys
import warnings
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.signal
import scipy.spatial
import scipy.stats
import yaml
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT / "src"))
from crq.ingest.nmdb import load_station, resample_daily
from crq.ingest.usgs import load_usgs, seismic_energy_per_bin
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=RuntimeWarning)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
STUDY_START = "1976-01-01"
STUDY_END = "2019-12-31"
BIN_DAYS = 5
MIN_MAG = 4.5
COV_THRESH = 0.60
MIN_STATIONS = 3
SEED = 42
LAG_BINS = np.arange(-200, 205, 1) # ±1000 d in 5-d steps
CLAIM_BIN = 3 # +15 d = bin 3
OUT_DIR = ROOT / "results"
FIG_DIR = ROOT / "results" / "figs"
NMDB_DIR = ROOT / "data" / "raw" / "nmdb"
USGS_DIR = ROOT / "data" / "raw" / "usgs"
CFG_FILE = ROOT / "config" / "stations.yaml"
SN_FILE = ROOT / "data" / "raw" / "sidc" / "sunspots.csv"
FIG_DIR.mkdir(parents=True, exist_ok=True)
SOLAR_CYCLES = {
21: ("1976-03-01", "1986-09-01"),
22: ("1986-09-01", "1996-08-01"),
23: ("1996-08-01", "2008-12-01"),
24: ("2008-12-01", "2019-12-31"),
}
# ---------------------------------------------------------------------------
# Shared utilities
# ---------------------------------------------------------------------------
def _station_names() -> list[str]:
with open(CFG_FILE) as fh:
return list(yaml.safe_load(fh)["stations"].keys())
def _bin_daily(daily: pd.Series, t0: pd.Timestamp, bin_days: int,
agg: str = "mean") -> pd.Series:
days = (daily.index - t0).days
bin_num = days // bin_days
bin_dates = t0 + pd.to_timedelta(bin_num * bin_days, unit="D")
g = daily.groupby(bin_dates)
return g.sum() if agg == "sum" else g.mean()
def _load_cr(study_start: str, study_end: str,
bin_days: int = BIN_DAYS,
min_stations: int = MIN_STATIONS) -> tuple[pd.Series, pd.DataFrame]:
"""Return (cr_index, station_daily_norm)."""
t0 = pd.Timestamp(study_start)
t1 = pd.Timestamp(study_end)
norm: dict[str, pd.Series] = {}
for stn in _station_names():
hourly = load_station(stn, t0.year, t1.year, NMDB_DIR)
if hourly.empty:
continue
daily = resample_daily(hourly, stn, coverage_threshold=COV_THRESH)[stn]
daily = daily.loc[study_start:study_end]
if daily.notna().sum() < 30:
continue
mu = daily.mean()
if not (np.isfinite(mu) and mu > 0):
continue
norm[stn] = daily / mu
if not norm:
raise RuntimeError("No NMDB data.")
mat = pd.DataFrame(norm)
n_valid = mat.notna().sum(axis=1)
global_daily = mat.mean(axis=1)
global_daily[n_valid < min_stations] = np.nan
cr = _bin_daily(global_daily, t0, bin_days)
cr.name = "cr_index"
log.info("CR (%d-d bins, min_stn=%d): %d bins, %d stations",
bin_days, min_stations, len(cr), len(norm))
return cr, mat
def _load_seismic_events(study_start: str, study_end: str,
min_mag: float = MIN_MAG) -> pd.DataFrame:
"""Return raw USGS event DataFrame with latitude, longitude, mag columns."""
t0 = pd.Timestamp(study_start)
t1 = pd.Timestamp(study_end)
ev = load_usgs(t0.year, t1.year, USGS_DIR)
ev = ev.loc[study_start:study_end]
ev = ev[ev["mag"] >= min_mag].copy()
log.info("Events M≥%.1f: %d in %s%s", min_mag, len(ev),
study_start, study_end)
return ev
def _energy_metric(events: pd.DataFrame, study_start: str, study_end: str,
bin_days: int, ref_index: pd.DatetimeIndex | None = None,
min_mag: float = MIN_MAG) -> pd.Series:
t0 = pd.Timestamp(study_start)
s = seismic_energy_per_bin(events, study_start, study_end, bin_days, t0,
min_mag=min_mag)
if ref_index is not None:
s = s.reindex(ref_index, fill_value=float(s.min()))
else:
s = s.fillna(float(s.min()))
return s
def _load_sunspots_binned(study_start: str, study_end: str,
bin_days: int = BIN_DAYS) -> pd.Series:
"""Parse the KSO comma-sep sunspot CSV → 5-day binned daily mean."""
df = pd.read_csv(SN_FILE, header=0)
df.columns = [c.strip() for c in df.columns]
df["date"] = pd.to_datetime(df["Date"].str.strip(), errors="coerce")
df = df.dropna(subset=["date"]).set_index("date")
sn = pd.to_numeric(df["Total"].astype(str).str.strip(), errors="coerce")
sn = sn.loc[study_start:study_end].rename("sunspot")
# Resample to daily (data is already monthly-ish — forward-fill)
daily_idx = pd.date_range(study_start, study_end, freq="D")
sn = sn.reindex(daily_idx).interpolate("linear").ffill().bfill()
t0 = pd.Timestamp(study_start)
return _bin_daily(sn, t0, bin_days)
def xcorr(x: np.ndarray, y: np.ndarray, lag_bins: np.ndarray) -> np.ndarray:
"""Pearson r(τ) for each lag bin; τ>0 means x leads y."""
N = len(x)
rs = np.full(len(lag_bins), np.nan)
for i, lag in enumerate(lag_bins):
if lag >= 0:
xa, ya = x[:N - lag], y[lag:]
else:
absl = -lag
xa, ya = x[absl:], y[:N - absl]
ok = np.isfinite(xa) & np.isfinite(ya)
n = ok.sum()
if n >= 10:
xo = xa[ok] - xa[ok].mean()
yo = ya[ok] - ya[ok].mean()
denom = np.sqrt((xo ** 2).sum() * (yo ** 2).sum())
if denom > 1e-12:
rs[i] = np.dot(xo, yo) / denom
return rs
# ---------------------------------------------------------------------------
# 3a — Block-bootstrap surrogates
# ---------------------------------------------------------------------------
def _circular_block_bootstrap(
x: np.ndarray, y: np.ndarray,
block_len: int,
n_surr: int,
lag_bins: np.ndarray,
seed: int = SEED,
) -> tuple[np.ndarray, np.ndarray]:
"""
Circular block bootstrap (CBB) null distribution.
Independently resample *x* and *y* with replacement using blocks of length
*block_len* (circular, so the series wraps). Returns arrays of shape
(n_surr,) containing the surrogate r(+15d) and peak |r| values.
Vectorised: all surrogates are built into a matrix (n_surr × N) and
the correlation is computed for all surrogates simultaneously per lag,
avoiding a 5000 × 401 = 2 M Python-loop iterations.
"""
rng = np.random.default_rng(seed)
N = len(x)
n_blocks = int(np.ceil(N / block_len))
n_lags = len(lag_bins)
# Build surrogate matrices (n_surr × N) — Python loop only over n_surr
log.info(" Building %d surrogate series (block_len=%d) …", n_surr, block_len)
xs_mat = np.empty((n_surr, N), dtype=np.float64)
ys_mat = np.empty((n_surr, N), dtype=np.float64)
for i in range(n_surr):
starts_x = rng.integers(0, N, size=n_blocks)
starts_y = rng.integers(0, N, size=n_blocks)
xs_mat[i] = np.concatenate([np.roll(x, -s)[:block_len] for s in starts_x])[:N]
ys_mat[i] = np.concatenate([np.roll(y, -s)[:block_len] for s in starts_y])[:N]
# Vectorised xcorr: loop over lags only (all surrogates in parallel)
log.info(" Vectorised xcorr over %d lags …", n_lags)
rs_mat = np.full((n_surr, n_lags), np.nan, dtype=np.float64)
for j, lag in enumerate(lag_bins):
if lag >= 0:
xa = xs_mat[:, :N - lag] if lag > 0 else xs_mat
ya = ys_mat[:, lag:] if lag > 0 else ys_mat
else:
absl = -lag
xa = xs_mat[:, absl:]
ya = ys_mat[:, :N - absl]
xm = xa - xa.mean(axis=1, keepdims=True)
ym = ya - ya.mean(axis=1, keepdims=True)
num = (xm * ym).sum(axis=1)
denom = np.sqrt((xm ** 2).sum(axis=1) * (ym ** 2).sum(axis=1))
valid = denom > 1e-12
rs_mat[valid, j] = num[valid] / denom[valid]
claim_idx = np.where(lag_bins == CLAIM_BIN)[0]
rs_15 = rs_mat[:, claim_idx[0]] if len(claim_idx) else np.full(n_surr, np.nan)
rs_pk = np.nanmax(np.abs(rs_mat), axis=1)
return rs_15, rs_pk
def run_3a(cr: pd.Series, sei: pd.Series) -> dict:
log.info("=== 3a: Block-bootstrap surrogates ===")
x = cr.values.copy()
y = sei.values.copy()
# Fill NaN with series mean for bootstrap (preserves length)
x = np.where(np.isfinite(x), x, np.nanmean(x))
y = np.where(np.isfinite(y), y, np.nanmean(y))
N = len(x)
# Block length ≈ 1 solar cycle (11 yr ≈ 4016 d ÷ 5 d/bin = 803 bins)
block_len = int(round(11 * 365.25 / BIN_DAYS))
log.info(" N=%d block_len=%d bins (≈%.1f yr) surrogates=5000",
N, block_len, block_len * BIN_DAYS / 365.25)
rs_15, rs_pk = _circular_block_bootstrap(x, y, block_len, 5000, LAG_BINS)
obs_rv = xcorr(x, y, LAG_BINS)
obs_15 = obs_rv[LAG_BINS == CLAIM_BIN][0]
obs_pk = np.nanmax(np.abs(obs_rv))
p_15 = float(np.mean(np.abs(rs_15) >= np.abs(obs_15)))
p_pk = float(np.mean(rs_pk >= obs_pk))
ci95_15 = (float(np.percentile(rs_15, 2.5)), float(np.percentile(rs_15, 97.5)))
log.info(" obs r(+15d)=%.4f block-bootstrap p(+15d)=%.4f p(peak)=%.4f",
obs_15, p_15, p_pk)
# Load IAAFT p-value for comparison
iaaft_raw = None
try:
with open(OUT_DIR / "detrended_results.json") as fh:
det = json.load(fh)
iaaft_raw = det["results"][0]["p_global_iaaft"]
except Exception:
pass
# Figure
fig, axes = plt.subplots(1, 2, figsize=(11, 4))
axes[0].hist(rs_15, bins=60, color="steelblue", alpha=0.7,
label="Block-bootstrap null")
axes[0].axvline(obs_15, color="red", lw=2, label=f"Observed r={obs_15:.3f}")
axes[0].axvline(ci95_15[0], color="grey", ls="--")
axes[0].axvline(ci95_15[1], color="grey", ls="--", label="95% CI")
axes[0].set_xlabel("r(+15 d)")
axes[0].set_title(f"Block bootstrap null\np(+15d) = {p_15:.3f}")
axes[0].legend(fontsize=8)
axes[1].hist(rs_pk, bins=60, color="darkorange", alpha=0.7,
label="Block-bootstrap null")
axes[1].axvline(obs_pk, color="red", lw=2, label=f"Observed peak |r|={obs_pk:.3f}")
axes[1].set_xlabel("Peak |r|")
axes[1].set_title(f"Peak |r| null\np(peak) = {p_pk:.3f}")
axes[1].legend(fontsize=8)
if iaaft_raw is not None:
axes[0].set_xlabel("r(+15 d)" + f"\n(IAAFT p_global={iaaft_raw:.4f})")
fig.tight_layout()
fig.savefig(FIG_DIR / "block_bootstrap_null.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3a figure saved.")
return dict(
block_len_bins=block_len,
obs_r_15d=float(obs_15),
obs_peak_r=float(obs_pk),
p_block_15d=p_15,
p_block_peak=p_pk,
ci95_15d=list(ci95_15),
iaaft_p_global_raw=iaaft_raw,
interpretation=(
"p-value similar to IAAFT" if iaaft_raw is None or abs(p_15 - iaaft_raw) < 0.1
else "p-value changes materially vs IAAFT"
),
)
# ---------------------------------------------------------------------------
# 3b — Partial correlation controlling for sunspots (raw data)
# ---------------------------------------------------------------------------
def run_3b(cr: pd.Series, sei: pd.Series) -> dict:
log.info("=== 3b: Partial correlation (sunspot control) ===")
sn = _load_sunspots_binned(STUDY_START, STUDY_END)
# Align all three series
idx = cr.index.intersection(sei.index).intersection(sn.index)
x = cr.reindex(idx).values.astype(float)
y = sei.reindex(idx).values.astype(float)
z = sn.reindex(idx).values.astype(float)
ok = np.isfinite(x) & np.isfinite(y) & np.isfinite(z)
log.info(" Aligned N=%d, valid=%d", len(x), ok.sum())
# OLS: regress seismic on sunspot
z_ok = z[ok]; y_ok = y[ok]; x_ok = x[ok]
slope, intercept, *_ = scipy.stats.linregress(z_ok, y_ok)
y_resid = y_ok - (slope * z_ok + intercept)
log.info(" Sunspot regression on seismic: slope=%.4f intercept=%.4f", slope, intercept)
r_partial, pv = scipy.stats.pearsonr(x_ok, y_resid)
log.info(" Partial r(CR, Seismic|Sunspot) at zero lag = %.4f p=%.4e",
r_partial, pv)
# Full cross-correlation of CR vs seismic residual
x_full = np.full(len(idx), np.nan)
y_res_full = np.full(len(idx), np.nan)
x_full[ok] = x_ok
y_res_full[ok] = y_resid
rv_partial = xcorr(x_full, y_res_full, LAG_BINS)
rv_raw = xcorr(x, y, LAG_BINS)
r_partial_15 = rv_partial[LAG_BINS == CLAIM_BIN][0]
r_raw_15 = rv_raw[LAG_BINS == CLAIM_BIN][0]
peak_partial = float(np.nanmax(np.abs(rv_partial)))
log.info(" r(+15d) raw=%.4f partial=%.4f", r_raw_15, r_partial_15)
# Figure
fig, ax = plt.subplots(figsize=(9, 4))
lags_d = LAG_BINS * BIN_DAYS
ax.plot(lags_d, rv_raw, color="steelblue", alpha=0.7, label="Raw seismic")
ax.plot(lags_d, rv_partial, color="darkorange", lw=1.5,
label="Seismic residual (sunspot removed)")
ax.axhline(0, color="black", lw=0.6)
ax.axvline(15, color="grey", ls="--", lw=1.2, label="+15 d")
ax.set_xlabel("Lag τ (days)")
ax.set_ylabel("Pearson r")
ax.set_title(f"Partial correlation: CR vs seismic after sunspot regression\n"
f"r_raw(+15d)={r_raw_15:.3f} r_partial(+15d)={r_partial_15:.3f}")
ax.legend(fontsize=9)
fig.tight_layout()
fig.savefig(FIG_DIR / "partial_correlation.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3b figure saved.")
drop_frac = (abs(r_raw_15) - abs(r_partial_15)) / max(abs(r_raw_15), 1e-9)
return dict(
sunspot_slope=float(slope),
r_raw_15d=float(r_raw_15),
r_partial_15d=float(r_partial_15),
drop_fraction_at_15d=float(drop_frac),
peak_r_partial=float(peak_partial),
interpretation=(
"Partial correlation near zero — solar-cycle confounding confirmed"
if abs(r_partial_15) < 0.05 else
"Partial correlation non-trivial — residual signal remains after sunspot control"
),
)
# ---------------------------------------------------------------------------
# 3c — Spectral coherence + mutual information
# ---------------------------------------------------------------------------
def _mi_knn(x: np.ndarray, y: np.ndarray, k: int = 5) -> float:
"""
Kraskov et al. (2004) kNN mutual information estimator (estimator 1).
Uses Chebyshev (L-inf) metric in joint space; marginal counts via
vectorised searchsorted — O(N log N), no Python loop over points.
Requires finite values only.
"""
from scipy.special import digamma
N = len(x)
x = (x - x.mean()) / (x.std() + 1e-12)
y = (y - y.mean()) / (y.std() + 1e-12)
xy = np.column_stack([x, y])
# k-NN in joint space using Chebyshev (L-inf) metric
tree_xy = scipy.spatial.cKDTree(xy)
dists, _ = tree_xy.query(xy, k=k + 1, p=np.inf, workers=-1)
eps = dists[:, -1] # k-th neighbour Chebyshev distance
# Count marginal neighbours via sorted binary search — O(N log N), vectorised
xs = np.sort(x)
ys = np.sort(y)
nx = (np.searchsorted(xs, x + eps, side="right")
- np.searchsorted(xs, x - eps, side="left") - 1)
ny = (np.searchsorted(ys, y + eps, side="right")
- np.searchsorted(ys, y - eps, side="left") - 1)
nx = np.maximum(nx, 0)
ny = np.maximum(ny, 0)
mi = digamma(k) + digamma(N) - np.mean(digamma(nx + 1)) - np.mean(digamma(ny + 1))
return float(max(mi, 0.0))
def run_3c(cr: pd.Series, sei: pd.Series) -> dict:
log.info("=== 3c: Spectral coherence + mutual information ===")
rng = np.random.default_rng(SEED)
idx = cr.index.intersection(sei.index)
x = cr.reindex(idx).values.astype(float)
y = sei.reindex(idx).values.astype(float)
ok = np.isfinite(x) & np.isfinite(y)
xf, yf = x[ok], y[ok]
# --- Spectral coherence ---
# nperseg=2048 gives freq. resolution (1/5)/2048 = 0.036 cycles/yr,
# sufficient to resolve the solar-cycle band (0.080.115 cycles/yr).
# nperseg=512 only gives 0.143 cycles/yr — no bins fall in the SC band.
nperseg = 2048
fs = 1.0 / BIN_DAYS # cycles per day
f_d, Cxy = scipy.signal.coherence(xf, yf, fs=fs, nperseg=nperseg,
window="hann", noverlap=nperseg * 3 // 4)
f_yr = f_d * 365.25 # cycles per year
# Solar cycle band: 0.080.11 cycles/yr (912 year period)
sc_mask = (f_yr >= 0.08) & (f_yr <= 0.115)
coh_sc = float(np.mean(Cxy[sc_mask]))
log.info(" Mean coherence in solar-cycle band: %.4f", coh_sc)
# Significance: 95th percentile of coherence for white-noise pairs
noverlap_used = nperseg * 3 // 4
step = nperseg - noverlap_used
n_seg = max(1, (ok.sum() - nperseg) // step + 1)
coh_95 = 1 - 0.05 ** (1.0 / (n_seg - 1)) if n_seg > 1 else np.nan
log.info(" Coherence 95%% significance threshold (for %d segments): %.4f",
n_seg, coh_95)
# --- Mutual information ---
N = len(xf)
mi_lag0 = _mi_knn(xf, yf)
# MI at lag +15d (=+3 bins)
lag15 = CLAIM_BIN
xi15, yi15 = xf[:N - lag15], yf[lag15:]
ok15 = np.isfinite(xi15) & np.isfinite(yi15)
mi_lag15 = _mi_knn(xi15[ok15], yi15[ok15])
log.info(" MI(lag=0)=%.4f MI(lag=+15d)=%.4f", mi_lag0, mi_lag15)
# Shuffle null for MI (1000 permutations)
n_shuf = 1000
mi_null_0 = np.empty(n_shuf)
mi_null_15 = np.empty(n_shuf)
for i in range(n_shuf):
ys = rng.permutation(yf)
mi_null_0[i] = _mi_knn(xf, ys)
ys15 = rng.permutation(yf[lag15:])
mi_null_15[i] = _mi_knn(xf[:N - lag15][ok15], ys15[ok15])
p_mi_0 = float(np.mean(mi_null_0 >= mi_lag0))
p_mi_15 = float(np.mean(mi_null_15 >= mi_lag15))
log.info(" p(MI lag=0)=%.4f p(MI lag=+15d)=%.4f", p_mi_0, p_mi_15)
# Figure
fig, axes = plt.subplots(1, 2, figsize=(11, 4))
# Coherence panel
ax = axes[0]
ax.plot(f_yr, Cxy, color="steelblue", lw=1, label="Coherence")
ax.axvspan(0.08, 0.115, alpha=0.15, color="orange", label="Solar-cycle band (0.080.115 yr⁻¹)")
if np.isfinite(coh_95):
ax.axhline(coh_95, color="red", ls="--", lw=1.2,
label=f"95% sig. ({coh_95:.3f}, K={n_seg} seg.)")
ax.set_xlim(0, 0.5)
ax.set_ylim(0, 1.05)
ax.set_xlabel("Frequency (cycles yr⁻¹)")
ax.set_ylabel("Magnitude-squared coherence")
coh_sc_str = f"{coh_sc:.3f}" if np.isfinite(coh_sc) else "N/A"
ax.set_title(f"Spectral coherence (nperseg={nperseg})\nMean coh. in SC band: {coh_sc_str}")
ax.legend(fontsize=8)
# MI panel
ax = axes[1]
ax.hist(mi_null_0, bins=40, color="steelblue", alpha=0.6, label="Shuffle null (lag=0)")
ax.hist(mi_null_15, bins=40, color="darkorange", alpha=0.6, label="Shuffle null (lag=+15d)")
ax.axvline(mi_lag0, color="steelblue", lw=2, ls="-", label=f"Obs MI(0)={mi_lag0:.3f}")
ax.axvline(mi_lag15, color="darkorange", lw=2, ls="--",
label=f"Obs MI(+15d)={mi_lag15:.3f}")
ax.set_xlabel("Mutual information (nats)")
ax.set_title(f"kNN mutual information\np(lag=0)={p_mi_0:.3f} p(lag=+15d)={p_mi_15:.3f}")
ax.legend(fontsize=7)
fig.tight_layout()
fig.savefig(FIG_DIR / "spectral_coherence.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3c figure saved.")
return dict(
coherence_solar_cycle_band=coh_sc,
coherence_95pct_threshold=coh_95 if np.isfinite(coh_95) else None,
mi_lag0=mi_lag0,
mi_lag15d=mi_lag15,
p_mi_lag0=p_mi_0,
p_mi_lag15d=p_mi_15,
)
# ---------------------------------------------------------------------------
# 3d — Missing-data sensitivity
# ---------------------------------------------------------------------------
SOLAR_MAXIMA = [1979.7, 1989.6, 2000.3, 2014.2] # approximate decimal years
def _cr_nan_analysis(study_start: str, study_end: str,
min_stations: int) -> dict:
"""Return NaN stats and r(+15d) for the given station threshold."""
cr, _ = _load_cr(study_start, study_end, bin_days=BIN_DAYS,
min_stations=min_stations)
ev = _load_seismic_events(study_start, study_end)
sei = _energy_metric(ev, study_start, study_end, BIN_DAYS, cr.index)
nan_frac = float(cr.isna().mean())
# Are NaN bins concentrated near solar maxima?
is_nan = cr.isna()
bin_years = cr.index.year + cr.index.dayofyear / 365.25
# Flag years within ±1.5 yr of any solar maximum
near_max = np.zeros(len(cr), dtype=bool)
for sm in SOLAR_MAXIMA:
near_max |= np.abs(bin_years - sm) <= 1.5
nan_near = float(is_nan[near_max].mean())
nan_far = float(is_nan[~near_max].mean())
# r(+15d)
ok = np.isfinite(cr.values) & np.isfinite(sei.values)
x, y = cr.values[ok], sei.values[ok]
r15, _ = scipy.stats.pearsonr(x, y) # zero-lag as proxy; lag-3 below
# Actual lag-3
N = len(cr)
xa = cr.values[:N - CLAIM_BIN]; ya = sei.values[CLAIM_BIN:]
ok2 = np.isfinite(xa) & np.isfinite(ya)
r_15d, _ = scipy.stats.pearsonr(xa[ok2], ya[ok2])
return dict(
min_stations=min_stations,
nan_fraction=nan_frac,
nan_fraction_near_solar_max=nan_near,
nan_fraction_far_from_solar_max=nan_far,
clustering_ratio=nan_near / max(nan_far, 1e-9),
r_at_15d=float(r_15d),
)
def run_3d() -> dict:
log.info("=== 3d: Missing-data sensitivity ===")
results = []
for thr in (2, 3, 5):
log.info(" Station threshold = %d", thr)
res = _cr_nan_analysis(STUDY_START, STUDY_END, thr)
results.append(res)
log.info(" NaN=%.1f%% near-max=%.1f%% r(+15d)=%.4f",
100 * res["nan_fraction"],
100 * res["nan_fraction_near_solar_max"],
res["r_at_15d"])
return dict(station_threshold_sensitivity=results)
# ---------------------------------------------------------------------------
# 3e — Bin-size sensitivity
# ---------------------------------------------------------------------------
def _run_for_bin_size(bin_days: int) -> dict:
"""Load CR and seismic at *bin_days* resolution, compute r(τ)."""
cr, _ = _load_cr(STUDY_START, STUDY_END, bin_days=bin_days)
ev = _load_seismic_events(STUDY_START, STUDY_END)
sei = _energy_metric(ev, STUDY_START, STUDY_END, bin_days, cr.index)
# Lag range: ±1000 days in steps of bin_days
max_lag_bins = int(1000 / bin_days)
lbs = np.arange(-max_lag_bins, max_lag_bins + 1)
rv = xcorr(cr.values, sei.values, lbs)
# Find claimed lag bin (closest to +15d)
claim_bin = int(round(15 / bin_days))
r15 = rv[lbs == claim_bin][0] if np.any(lbs == claim_bin) else np.nan
peak_idx = np.nanargmax(np.abs(rv))
peak_r = rv[peak_idx]
peak_lag = int(lbs[peak_idx]) * bin_days # days
log.info(" %d-d bins: r(+%dd)=%.4f peak r=%.4f @ τ=%dd",
bin_days, claim_bin * bin_days, r15, peak_r, peak_lag)
return dict(
bin_days=bin_days,
r_at_claimed_lag=float(r15),
claimed_lag_days=claim_bin * bin_days,
peak_r=float(peak_r),
peak_lag_days=peak_lag,
lag_bins=lbs.tolist(),
r_values=rv.tolist(),
)
def run_3e() -> dict:
log.info("=== 3e: Bin-size sensitivity ===")
results = []
rv_by_bin = {}
for bd in (1, 5, 27):
r = _run_for_bin_size(bd)
results.append({k: v for k, v in r.items() if k not in ("lag_bins", "r_values")})
rv_by_bin[bd] = (r["lag_bins"], r["r_values"])
# Figure: 3-panel
fig, axes = plt.subplots(1, 3, figsize=(14, 4), sharey=False)
colours = ["steelblue", "darkorange", "seagreen"]
for ax, (bd, (lbs, rv)), col in zip(axes, rv_by_bin.items(), colours):
lags_d = np.array(lbs) * bd
rv = np.array(rv)
ax.plot(lags_d, rv, color=col, lw=0.8)
ax.axhline(0, color="black", lw=0.5)
ax.axvline(15, color="grey", ls="--", lw=1, label="+15 d")
closest = lbs[np.argmin(np.abs(np.array(lbs) - round(15 / bd)))]
pk_d = lbs[np.nanargmax(np.abs(rv))] * bd
ax.axvline(pk_d, color="red", ls=":", lw=1, label=f"Peak τ={pk_d}d")
ax.set_title(f"{bd}-day bins\nr(+15d≈{int(round(15/bd)*bd)}d)="
f"{rv[np.argmin(np.abs(np.array(lbs)-round(15/bd)))]:.3f}")
ax.set_xlabel("Lag (days)")
ax.set_ylabel("Pearson r")
ax.legend(fontsize=7)
fig.suptitle("Bin-size sensitivity: 1-day, 5-day, 27-day bins", fontsize=11)
fig.tight_layout()
fig.savefig(FIG_DIR / "bin_size_sensitivity.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3e figure saved.")
return dict(bin_size_sensitivity=results)
# ---------------------------------------------------------------------------
# 3f — GardnerKnopoff declustering
# ---------------------------------------------------------------------------
def _gk_time_window(mag: float) -> float:
"""After-shock time window T(M) in days (Gardner & Knopoff 1974)."""
if mag < 6.5:
return 10 ** (0.5 * mag - 1.8)
else:
return 10 ** (0.46 * mag - 2.31)
def _gk_dist_window(mag: float) -> float:
"""After-shock distance window L(M) in km (Gardner & Knopoff 1974)."""
return 10 ** (0.1238 * mag + 0.983)
def _haversine_km(lat1: np.ndarray, lon1: np.ndarray,
lat2: float, lon2: float) -> np.ndarray:
"""Vectorised haversine distance in km."""
R = 6371.0
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = (np.sin(dlat / 2) ** 2 +
np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) *
np.sin(dlon / 2) ** 2)
return 2 * R * np.arcsin(np.sqrt(np.clip(a, 0, 1)))
def _gardner_knopoff_decluster(events: pd.DataFrame) -> pd.DataFrame:
"""
Remove aftershocks from *events* using the Gardner-Knopoff (1974) algorithm.
Returns the mainshock-only DataFrame. Uses forward-only time windows
(each mainshock's aftershock zone applies to all future events within T(M)).
"""
ev = events.dropna(subset=["latitude", "longitude", "mag"]).copy()
ev = ev.sort_index()
n = len(ev)
times = ev.index.values.astype("datetime64[ns]").astype(np.int64) / 1e9 / 86400 # days
lats = ev["latitude"].values
lons = ev["longitude"].values
mags = ev["mag"].values
is_as = np.zeros(n, dtype=bool)
log.info(" GK declustering: %d events …", n)
for i in range(n):
if is_as[i]:
continue
tw = _gk_time_window(mags[i])
dw = _gk_dist_window(mags[i])
# Forward window in time
t_end = times[i] + tw
j_lo = i + 1
# Find j_hi via binary search (times is sorted)
j_hi = np.searchsorted(times, t_end, side="right")
if j_lo >= j_hi:
continue
cands = np.arange(j_lo, j_hi)
cands = cands[~is_as[cands]] # skip already-flagged
if len(cands) == 0:
continue
dists = _haversine_km(lats[cands], lons[cands], lats[i], lons[i])
is_as[cands[dists <= dw]] = True
n_main = int((~is_as).sum())
n_as = int(is_as.sum())
log.info(" GK result: %d mainshocks %d aftershocks (%.1f%% removed)",
n_main, n_as, 100 * n_as / max(n, 1))
return ev[~is_as]
def run_3f(cr: pd.Series) -> dict:
log.info("=== 3f: GardnerKnopoff declustering ===")
events = _load_seismic_events(STUDY_START, STUDY_END, min_mag=MIN_MAG)
sei_full = _energy_metric(events, STUDY_START, STUDY_END, BIN_DAYS, cr.index)
mainshocks = _gardner_knopoff_decluster(events)
sei_decl = _energy_metric(mainshocks, STUDY_START, STUDY_END, BIN_DAYS, cr.index)
rv_full = xcorr(cr.values, sei_full.values, LAG_BINS)
rv_decl = xcorr(cr.values, sei_decl.values, LAG_BINS)
r15_full = float(rv_full[LAG_BINS == CLAIM_BIN][0])
r15_decl = float(rv_decl[LAG_BINS == CLAIM_BIN][0])
pk_full = float(np.nanmax(np.abs(rv_full)))
pk_decl = float(np.nanmax(np.abs(rv_decl)))
log.info(" r(+15d) full=%.4f declustered=%.4f", r15_full, r15_decl)
n_removed_frac = float(1 - len(mainshocks) / len(events))
# Figure
fig, ax = plt.subplots(figsize=(9, 4))
lags_d = LAG_BINS * BIN_DAYS
ax.plot(lags_d, rv_full, color="steelblue", alpha=0.8,
label=f"Full catalogue (n={len(events):,})")
ax.plot(lags_d, rv_decl, color="darkorange", lw=1.5,
label=f"Declustered / mainshocks only (n={len(mainshocks):,})")
ax.axhline(0, color="black", lw=0.5)
ax.axvline(15, color="grey", ls="--", lw=1.2, label="+15 d")
ax.set_xlabel("Lag τ (days)")
ax.set_ylabel("Pearson r")
ax.set_title(f"GardnerKnopoff declustering\n"
f"r(+15d): full={r15_full:.3f} declustered={r15_decl:.3f} "
f"({100*n_removed_frac:.0f}% removed)")
ax.legend(fontsize=9)
fig.tight_layout()
fig.savefig(FIG_DIR / "declustered_xcorr.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3f figure saved.")
return dict(
n_events_full=len(events),
n_mainshocks=len(mainshocks),
fraction_removed=n_removed_frac,
r_15d_full=r15_full,
r_15d_declustered=r15_decl,
peak_r_full=pk_full,
peak_r_declustered=pk_decl,
interpretation=(
"Aftershock clustering is NOT a primary confound — result stable after GK declustering"
if abs(r15_full - r15_decl) < 0.03 else
"Result changes after GK declustering — aftershock contamination is a concern"
),
)
# ---------------------------------------------------------------------------
# 3g — Sub-period analysis (per solar cycle)
# ---------------------------------------------------------------------------
def run_3g(cr: pd.Series, sei: pd.Series) -> dict:
log.info("=== 3g: Sub-period analysis (per solar cycle) ===")
lag_range_bins = np.arange(-40, 41) # ±200 d in 5-d steps (short cycles)
fig, axes = plt.subplots(2, 2, figsize=(12, 8), sharey=True)
results = {}
for ax, (cycle_num, (t_start, t_end)) in zip(axes.flat, SOLAR_CYCLES.items()):
# Crop pre-aligned global CR/seismic series — avoids bin-date misalignment
cr_c = cr.loc[t_start:t_end]
sei_c = sei.loc[t_start:t_end]
if len(cr_c) < 40:
log.warning(" Cycle %d: insufficient data (%d bins)", cycle_num, len(cr_c))
ax.set_title(f"Cycle {cycle_num}: insufficient data")
results[f"cycle_{cycle_num}"] = None
continue
rv = xcorr(cr_c.values, sei_c.values, lag_range_bins)
if np.all(np.isnan(rv)):
log.warning(" Cycle %d: all-NaN xcorr — skipping", cycle_num)
ax.set_title(f"Cycle {cycle_num}: all-NaN")
results[f"cycle_{cycle_num}"] = None
continue
r15 = float(rv[lag_range_bins == CLAIM_BIN][0]) if np.any(lag_range_bins == CLAIM_BIN) else np.nan
pk_idx = np.nanargmax(np.abs(rv))
pk_r = float(rv[pk_idx])
pk_lag = int(lag_range_bins[pk_idx]) * BIN_DAYS
n_bins = len(cr_c)
log.info(" Cycle %d (%s%s): N=%d r(+15d)=%.4f peak=%.4f@%dd",
cycle_num, t_start, t_end, n_bins, r15, pk_r, pk_lag)
lags_d = lag_range_bins * BIN_DAYS
ax.plot(lags_d, rv, color="steelblue", lw=1.2)
ax.axhline(0, color="black", lw=0.5)
ax.axvline(15, color="grey", ls="--", lw=1, label="+15 d")
ax.axvline(pk_lag, color="red", ls=":", lw=1,
label=f"Peak τ={pk_lag}d r={pk_r:.3f}")
ax.set_title(f"Solar Cycle {cycle_num} ({t_start[:4]}{t_end[:4]})\n"
f"N={n_bins} bins r(+15d)={r15:.3f}")
ax.set_xlabel("Lag (days)")
ax.set_ylabel("Pearson r")
ax.legend(fontsize=7)
results[f"cycle_{cycle_num}"] = dict(
start=t_start, end=t_end,
n_bins=n_bins,
r_at_15d=r15,
peak_r=pk_r,
peak_lag_days=pk_lag,
)
fig.suptitle("Cross-correlation per solar cycle (19762019)", fontsize=11)
fig.tight_layout()
fig.savefig(FIG_DIR / "solar_cycle_xcorr.png", dpi=150, bbox_inches="tight")
plt.close(fig)
log.info("3g figure saved.")
r15_vals = [v["r_at_15d"] for v in results.values() if v is not None]
sign_consistent = all(r > 0 for r in r15_vals) or all(r < 0 for r in r15_vals)
return dict(
per_cycle=results,
r15d_range=[float(min(r15_vals)), float(max(r15_vals))],
sign_consistent_across_cycles=sign_consistent,
interpretation=(
"r(+15d) consistent in sign across all solar cycles"
if sign_consistent else
"r(+15d) sign INCONSISTENT across solar cycles — confirms solar-cycle artefact"
),
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
log.info("Loading base CR and seismic data …")
cr, _stn = _load_cr(STUDY_START, STUDY_END)
events = _load_seismic_events(STUDY_START, STUDY_END)
sei = _energy_metric(events, STUDY_START, STUDY_END, BIN_DAYS, cr.index)
out = {}
log.info("Running 3a …")
out["3a_block_bootstrap"] = run_3a(cr, sei)
log.info("Running 3b …")
out["3b_partial_correlation"] = run_3b(cr, sei)
log.info("Running 3c …")
out["3c_coherence_mi"] = run_3c(cr, sei)
log.info("Running 3d …")
out["3d_missing_data"] = run_3d()
log.info("Running 3e …")
out["3e_bin_size"] = run_3e()
log.info("Running 3f …")
out["3f_declustering"] = run_3f(cr)
log.info("Running 3g …")
out["3g_solar_cycles"] = run_3g(cr, sei)
# Save JSON
out_path = OUT_DIR / "additional_robustness.json"
with open(out_path, "w") as fh:
json.dump(out, fh, indent=2, default=str)
log.info("Results saved: %s", out_path)
# ── Summary printout ───────────────────────────────────────────────────
print("\n" + "=" * 70)
print(" ADDITIONAL ROBUSTNESS SUMMARY")
print("=" * 70)
r = out["3a_block_bootstrap"]
print(f" 3a Block bootstrap p(+15d)={r['p_block_15d']:.4f} "
f"p(peak)={r['p_block_peak']:.4f}")
r = out["3b_partial_correlation"]
print(f" 3b Partial corr r_raw(+15d)={r['r_raw_15d']:.4f} "
f"r_partial={r['r_partial_15d']:.4f}")
r = out["3c_coherence_mi"]
print(f" 3c Coherence SC-band={r['coherence_solar_cycle_band']:.4f} "
f"MI p(lag=0)={r['p_mi_lag0']:.3f} p(+15d)={r['p_mi_lag15d']:.3f}")
for res in out["3d_missing_data"]["station_threshold_sensitivity"]:
print(f" 3d min_stn={res['min_stations']} "
f"NaN={100*res['nan_fraction']:.1f}% r(+15d)={res['r_at_15d']:.4f}")
for res in out["3e_bin_size"]["bin_size_sensitivity"]:
print(f" 3e {res['bin_days']:2d}-d bins "
f"r(+{res['claimed_lag_days']}d)={res['r_at_claimed_lag']:.4f} "
f"peak={res['peak_r']:.4f}@{res['peak_lag_days']}d")
r = out["3f_declustering"]
print(f" 3f Declustering removed={100*r['fraction_removed']:.0f}% "
f"r_full={r['r_15d_full']:.4f} r_decl={r['r_15d_declustered']:.4f}")
r = out["3g_solar_cycles"]
print(f" 3g Solar cycles r(+15d) range={r['r15d_range']} "
f"sign_consistent={r['sign_consistent_across_cycles']}")
print("=" * 70)
if __name__ == "__main__":
main()