""" GCP Server — collects random trials from distributed eggs, runs Stouffer Z network variance analysis every 60 seconds, and serves the dot website. """ import hashlib import json as _json import os import sqlite3 import threading import time import urllib.request from collections import defaultdict from contextlib import contextmanager import numpy as np from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from scipy import stats DB_PATH = os.environ.get("DB_PATH", "/data/gcp.db") ANALYSIS_INTERVAL = 60 # seconds between analysis runs DATA_OFFSET = 60 # analyse data at least 60 s old (propagation buffer) WINDOW_SECONDS = 3600 # 1-hour analysis window (matches GCP primary window) KEEP_SECONDS = 48 * 3600 # purge raw trials older than 48 h MIN_EGGS = 2 # need at least 2 eggs to compute network variance MIN_SAMPLES = 10 # need at least 10 time-steps in window # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- def init_db(): with db() as con: con.executescript(""" CREATE TABLE IF NOT EXISTS trials ( id INTEGER PRIMARY KEY AUTOINCREMENT, egg_id TEXT NOT NULL, timestamp INTEGER NOT NULL, trial INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_trials_ts ON trials(timestamp); CREATE TABLE IF NOT EXISTS analysis ( timestamp INTEGER PRIMARY KEY, index_value REAL NOT NULL, n_eggs INTEGER NOT NULL, n_samples INTEGER NOT NULL, window_seconds INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS egg_locations ( egg_id TEXT PRIMARY KEY, country TEXT, country_code TEXT, lat REAL, lon REAL, located_at INTEGER ); """) @contextmanager def db(): con = sqlite3.connect(DB_PATH) con.row_factory = sqlite3.Row try: yield con con.commit() finally: con.close() # --------------------------------------------------------------------------- # Geo-location (server-side, anonymised to 1° grid ≈ 111 km) # --------------------------------------------------------------------------- _geo_pending: set[str] = set() _geo_lock = threading.Lock() def _jitter(egg_id: str) -> tuple[float, float]: """Deterministic ±0.35° offset so eggs in the same cell don't stack.""" h = int(hashlib.md5(egg_id.encode()).hexdigest()[:8], 16) return ((h & 0xFFFF) / 0xFFFF - 0.5) * 0.7, ((h >> 16 & 0xFFFF) / 0xFFFF - 0.5) * 0.7 def _fetch_geo(egg_id: str, ip: str): try: url = f"http://ip-api.com/json/{ip}?fields=status,country,countryCode,lat,lon" with urllib.request.urlopen(url, timeout=6) as resp: data = _json.loads(resp.read()) if data.get("status") != "success": return lat = round(float(data["lat"])) + _jitter(egg_id)[0] lon = round(float(data["lon"])) + _jitter(egg_id)[1] with db() as con: con.execute( "INSERT OR REPLACE INTO egg_locations " "(egg_id, country, country_code, lat, lon, located_at) " "VALUES (?, ?, ?, ?, ?, ?)", (egg_id, data["country"], data["countryCode"], round(lat, 2), round(lon, 2), int(time.time())), ) except Exception as exc: print(f"[geo] {egg_id} {ip}: {exc}") finally: with _geo_lock: _geo_pending.discard(egg_id) def maybe_geolocate(egg_id: str, ip: str): if not ip or ip in ("127.0.0.1", "::1"): return with _geo_lock: if egg_id in _geo_pending: return with db() as con: if con.execute( "SELECT 1 FROM egg_locations WHERE egg_id = ?", (egg_id,) ).fetchone(): return with _geo_lock: _geo_pending.add(egg_id) threading.Thread(target=_fetch_geo, args=(egg_id, ip), daemon=True).start() # --------------------------------------------------------------------------- # Statistical analysis # --------------------------------------------------------------------------- def compute_index(rows): """ Compute the GCP network variance index from raw trial rows. Each egg produces one 200-bit trial per second: trial = count of 1-bits. Under H0: trial ~ Binomial(200, 0.5), mean=100, var=50. Steps: 1. Normalise each trial to a Z-score. 2. For each second, combine eggs via Stouffer Z. 3. Sum squared Stouffer Zs → network variance statistic ~ χ²(T). 4. Index = lower-tail CDF × 100. High index (>95) = small variance = coherence = blue. Low index (<5) = large variance = noise spike = red. """ # Build {timestamp: {egg_id: trial}} dict by_ts = defaultdict(dict) for row in rows: by_ts[row["timestamp"]][row["egg_id"]] = row["trial"] stouffer_sq = [] for ts in sorted(by_ts): egg_vals = list(by_ts[ts].values()) if len(egg_vals) < MIN_EGGS: continue z_scores = [(v - 100) / np.sqrt(50) for v in egg_vals] n = len(z_scores) stouffer = sum(z_scores) / np.sqrt(n) stouffer_sq.append(stouffer ** 2) if len(stouffer_sq) < MIN_SAMPLES: return None, 0, 0 T = len(stouffer_sq) net_var = float(np.sum(stouffer_sq)) index_value = float(stats.chi2.cdf(net_var, df=T)) * 100.0 n_eggs = len({row["egg_id"] for row in rows}) return index_value, n_eggs, T def analysis_loop(): """Background thread: run analysis every ANALYSIS_INTERVAL seconds.""" # Wait a bit on startup so the DB is definitely initialised. time.sleep(10) while True: try: now = int(time.time()) end = now - DATA_OFFSET start = end - WINDOW_SECONDS with db() as con: rows = con.execute( "SELECT egg_id, timestamp, trial FROM trials " "WHERE timestamp BETWEEN ? AND ? ORDER BY timestamp", (start, end), ).fetchall() index_value, n_eggs, n_samples = compute_index(rows) if index_value is not None: with db() as con: con.execute( "INSERT OR REPLACE INTO analysis " "(timestamp, index_value, n_eggs, n_samples, window_seconds) " "VALUES (?, ?, ?, ?, ?)", (now, index_value, n_eggs, n_samples, WINDOW_SECONDS), ) print( f"[analysis] index={index_value:.1f}% eggs={n_eggs} samples={n_samples}" ) # Purge old raw data with db() as con: con.execute( "DELETE FROM trials WHERE timestamp < ?", (now - KEEP_SECONDS,) ) except Exception as exc: print(f"[analysis] error: {exc}") time.sleep(ANALYSIS_INTERVAL) # --------------------------------------------------------------------------- # FastAPI app # --------------------------------------------------------------------------- app = FastAPI(title="Global Consciousness Project") class TrialData(BaseModel): egg_id: str = Field(..., min_length=1, max_length=64) timestamp: int trial: int = Field(..., ge=0, le=200) @app.on_event("startup") def startup(): os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) init_db() threading.Thread(target=analysis_loop, daemon=True, name="analysis").start() @app.post("/api/data") def receive_trial(request: Request, data: TrialData): now = int(time.time()) if abs(data.timestamp - now) > 300: raise HTTPException(status_code=422, detail="timestamp out of range") with db() as con: con.execute( "INSERT INTO trials (egg_id, timestamp, trial) VALUES (?, ?, ?)", (data.egg_id, data.timestamp, data.trial), ) client_ip = ( request.headers.get("X-Forwarded-For", "").split(",")[0].strip() or (request.client.host if request.client else "") ) maybe_geolocate(data.egg_id, client_ip) return {"status": "ok"} @app.get("/api/status") def get_status(): with db() as con: row = con.execute( "SELECT * FROM analysis ORDER BY timestamp DESC LIMIT 1" ).fetchone() if row: return { "index": round(row["index_value"], 2), "n_eggs": row["n_eggs"], "n_samples": row["n_samples"], "timestamp": row["timestamp"], } return {"index": 50.0, "n_eggs": 0, "n_samples": 0, "timestamp": 0} @app.get("/api/history") def get_history(limit: int = 60): with db() as con: rows = con.execute( "SELECT timestamp, index_value, n_eggs FROM analysis " "ORDER BY timestamp DESC LIMIT ?", (min(limit, 1440),), ).fetchall() return [ {"timestamp": r["timestamp"], "index": round(r["index_value"], 2), "n_eggs": r["n_eggs"]} for r in rows ] @app.get("/api/map") def get_map(): """Anonymised egg positions for the world map (active in last 5 min).""" cutoff = int(time.time()) - 300 with db() as con: rows = con.execute( """ SELECT el.egg_id, el.country, el.country_code, el.lat, el.lon, MAX(t.timestamp) AS last_active FROM egg_locations el JOIN trials t ON el.egg_id = t.egg_id WHERE t.timestamp > ? GROUP BY el.egg_id """, (cutoff,), ).fetchall() return [ { "country": r["country"], "country_code": r["country_code"], "lat": r["lat"], "lon": r["lon"], "last_active": r["last_active"], } for r in rows ] @app.get("/api/eggs") def get_active_eggs(): """Return eggs that have submitted data in the last 2 minutes.""" cutoff = int(time.time()) - 120 with db() as con: rows = con.execute( "SELECT DISTINCT egg_id, MAX(timestamp) as last_seen FROM trials " "WHERE timestamp > ? GROUP BY egg_id ORDER BY last_seen DESC", (cutoff,), ).fetchall() return [{"egg_id": r["egg_id"], "last_seen": r["last_seen"]} for r in rows] # Serve static files last so API routes take priority app.mount("/", StaticFiles(directory="/app/static", html=True), name="static")