Files
GCP-Dot/server/main.py
Hexadual 62b9995cd9
Some checks failed
Build & publish Docker images / Server image (push) Failing after 15s
Build & publish Docker images / Egg image (push) Failing after 17s
Add anonymised world map of active eggs
Server geolocates each egg by IP on first contact (ip-api.com), snaps
coordinates to the nearest 1-degree grid (~111 km) and adds a small
deterministic per-egg jitter so eggs in the same cell don't overlap.
Only country + rounded lat/lon are stored; exact IPs are never saved.

Front-end uses Leaflet with CartoDB dark tiles. Markers update on every
poll cycle and are coloured to match the current coherence index.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 01:32:19 -05:00

330 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
GCP Server — collects random trials from distributed eggs, runs Stouffer Z
network variance analysis every 60 seconds, and serves the dot website.
"""
import hashlib
import json as _json
import os
import sqlite3
import threading
import time
import urllib.request
from collections import defaultdict
from contextlib import contextmanager
import numpy as np
from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from scipy import stats
# Location of the SQLite database file; overridable via the DB_PATH env var.
DB_PATH = os.getenv("DB_PATH", "/data/gcp.db")

# Seconds between analysis runs.
ANALYSIS_INTERVAL = 60

# Analyse data at least this many seconds old (propagation buffer).
DATA_OFFSET = 60

# Length of the analysis window: 1 hour (matches GCP primary window).
WINDOW_SECONDS = 60 * 60

# Raw trials older than 48 h are purged.
KEEP_SECONDS = 48 * 60 * 60

# Need at least this many eggs reporting in a second to use that second.
MIN_EGGS = 2

# Need at least this many usable time-steps in the window.
MIN_SAMPLES = 10
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def init_db():
    """Create the schema if it is not already present.

    Creates the ``trials``, ``analysis`` and ``egg_locations`` tables plus the
    timestamp index on ``trials``.  Every statement uses ``IF NOT EXISTS``, so
    calling this on an already-initialised database is harmless.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS trials (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            egg_id TEXT NOT NULL,
            timestamp INTEGER NOT NULL,
            trial INTEGER NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_trials_ts ON trials(timestamp);
        CREATE TABLE IF NOT EXISTS analysis (
            timestamp INTEGER PRIMARY KEY,
            index_value REAL NOT NULL,
            n_eggs INTEGER NOT NULL,
            n_samples INTEGER NOT NULL,
            window_seconds INTEGER NOT NULL
        );
        CREATE TABLE IF NOT EXISTS egg_locations (
            egg_id TEXT PRIMARY KEY,
            country TEXT,
            country_code TEXT,
            lat REAL,
            lon REAL,
            located_at INTEGER
        );
    """
    with db() as connection:
        connection.executescript(schema)
@contextmanager
def db():
    """Yield a fresh SQLite connection to DB_PATH with dict-style rows.

    The transaction is committed when the ``with`` body finishes normally.
    If the body raises, nothing is committed.  The connection is closed in
    either case.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    except BaseException:
        # Body failed: skip the commit and let the error propagate.
        raise
    else:
        connection.commit()
    finally:
        connection.close()
# ---------------------------------------------------------------------------
# Geo-location (server-side, anonymised to 1° grid ≈ 111 km)
# ---------------------------------------------------------------------------
_geo_pending: set[str] = set()
_geo_lock = threading.Lock()
def _jitter(egg_id: str) -> tuple[float, float]:
"""Deterministic ±0.35° offset so eggs in the same cell don't stack."""
h = int(hashlib.md5(egg_id.encode()).hexdigest()[:8], 16)
return ((h & 0xFFFF) / 0xFFFF - 0.5) * 0.7, ((h >> 16 & 0xFFFF) / 0xFFFF - 0.5) * 0.7
def _fetch_geo(egg_id: str, ip: str):
    """Look up an egg's coarse location by IP and store it (worker thread body).

    The address is validated before it is placed in the lookup URL, because it
    may originate from a client-supplied ``X-Forwarded-For`` header.  On a
    successful lookup the coordinates are snapped to the nearest whole degree,
    displaced by the egg's deterministic jitter, rounded to two decimals and
    written to ``egg_locations`` together with the country.  The IP itself is
    neither stored nor logged.

    Args:
        egg_id: Identifier of the egg being located.
        ip: Client address as text; malformed or non-routable values are
            ignored without making a network request.

    All errors are logged and swallowed (best effort).  The egg is always
    removed from ``_geo_pending`` on exit so a later request can retry.
    """
    import ipaddress  # local import: only this worker needs it

    try:
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            # Do not echo the raw value: it is untrusted and may be an IP.
            print(f"[geo] {egg_id}: ignoring malformed client address")
            return
        # Dual-stack listeners report IPv4 peers as ::ffff:a.b.c.d.
        mapped = getattr(addr, "ipv4_mapped", None)
        if mapped is not None:
            addr = mapped
        if not addr.is_global:
            # Private / link-local / reserved ranges cannot be geolocated.
            return
        url = f"http://ip-api.com/json/{addr}?fields=status,country,countryCode,lat,lon"
        with urllib.request.urlopen(url, timeout=6) as resp:
            data = _json.loads(resp.read())
        if data.get("status") != "success":
            return
        d_lat, d_lon = _jitter(egg_id)
        lat = round(float(data["lat"])) + d_lat
        lon = round(float(data["lon"])) + d_lon
        with db() as con:
            con.execute(
                "INSERT OR REPLACE INTO egg_locations "
                "(egg_id, country, country_code, lat, lon, located_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (egg_id, data["country"], data["countryCode"],
                 round(lat, 2), round(lon, 2), int(time.time())),
            )
    except Exception as exc:
        # Best effort: a failed lookup must never take the server down.
        # The IP is deliberately left out of the log line.
        print(f"[geo] {egg_id}: {exc}")
    finally:
        with _geo_lock:
            _geo_pending.discard(egg_id)
def maybe_geolocate(egg_id: str, ip: str):
    """Start a background geolocation lookup for an egg if one is needed.

    Nothing happens when ``ip`` is empty or a loopback address, when a lookup
    for this egg is already in flight, or when the egg already has a row in
    ``egg_locations``.  Otherwise a daemon thread running ``_fetch_geo`` is
    started.

    Args:
        egg_id: Identifier of the egg that just reported.
        ip: Client address as seen by the request handler.
    """
    if not ip or ip in ("127.0.0.1", "::1"):
        return

    # Check and reserve in ONE critical section so two concurrent requests
    # for the same egg cannot both decide to spawn a lookup.
    with _geo_lock:
        if egg_id in _geo_pending:
            return
        _geo_pending.add(egg_id)

    spawned = False
    try:
        with db() as con:
            already_located = con.execute(
                "SELECT 1 FROM egg_locations WHERE egg_id = ?", (egg_id,)
            ).fetchone()
        if already_located:
            return
        threading.Thread(target=_fetch_geo, args=(egg_id, ip), daemon=True).start()
        spawned = True
    finally:
        # Once the worker is running it owns the reservation and releases it
        # itself; on every other path release it here so the egg can retry.
        if not spawned:
            with _geo_lock:
                _geo_pending.discard(egg_id)
    # NOTE(review): a lookup that fails is retried on the egg's next request;
    # consider a cool-down if the geolocation service rate-limits.
# ---------------------------------------------------------------------------
# Statistical analysis
# ---------------------------------------------------------------------------
def compute_index(rows, min_eggs=None, min_samples=None):
    """
    Compute the GCP network variance index from raw trial rows.

    Each egg produces one 200-bit trial per second: trial = count of 1-bits.
    Under H0: trial ~ Binomial(200, 0.5), mean=100, var=50.

    Steps:
    1. Normalise each trial to a Z-score.
    2. For each second, combine eggs via Stouffer Z.
    3. Sum squared Stouffer Zs → network variance statistic ~ χ²(T).
    4. Index = upper-tail probability (survival function) × 100.

    High index (>95) = small variance = coherence = blue.
    Low index (<5) = large variance = noise spike = red.

    Args:
        rows: Iterable of mappings with "egg_id", "timestamp" and "trial"
            keys.  If an egg has several rows for the same timestamp, the
            last one wins.
        min_eggs: Minimum eggs that must report in a second for that second
            to be used.  Defaults to the module-level MIN_EGGS.
        min_samples: Minimum number of usable seconds.  Defaults to the
            module-level MIN_SAMPLES.

    Returns:
        (index_value, n_eggs, n_samples).  ``n_eggs`` counts every distinct
        egg present in ``rows``.  When fewer than ``min_samples`` seconds are
        usable the result is ``(None, 0, 0)``.
    """
    if min_eggs is None:
        min_eggs = MIN_EGGS
    if min_samples is None:
        min_samples = MIN_SAMPLES

    # Build {timestamp: {egg_id: trial}} and collect egg ids in a single pass.
    by_ts = defaultdict(dict)
    egg_ids = set()
    for row in rows:
        by_ts[row["timestamp"]][row["egg_id"]] = row["trial"]
        egg_ids.add(row["egg_id"])

    sigma = np.sqrt(50)  # std-dev of Binomial(200, 0.5)
    stouffer_sq = []
    for ts in sorted(by_ts):
        egg_vals = list(by_ts[ts].values())
        n = len(egg_vals)
        if n < min_eggs:
            continue
        stouffer = sum((v - 100) / sigma for v in egg_vals) / np.sqrt(n)
        stouffer_sq.append(stouffer ** 2)

    if len(stouffer_sq) < min_samples:
        return None, 0, 0

    T = len(stouffer_sq)
    net_var = float(np.sum(stouffer_sq))
    # Upper tail: a small network variance yields a probability near 1,
    # i.e. an index near 100, as the contract above requires.
    index_value = float(stats.chi2.sf(net_var, df=T)) * 100.0
    return index_value, len(egg_ids), T
def analysis_loop():
    """Background worker: analyse the latest window every ANALYSIS_INTERVAL s.

    Each cycle reads the trials between ``now - DATA_OFFSET - WINDOW_SECONDS``
    and ``now - DATA_OFFSET``, computes the index, stores it in ``analysis``
    when one is available, and deletes trials older than KEEP_SECONDS.  Any
    error in a cycle is logged and the loop carries on.  Never returns.
    """
    # Wait a bit on startup so the DB is definitely initialised.
    time.sleep(10)

    select_window = (
        "SELECT egg_id, timestamp, trial FROM trials "
        "WHERE timestamp BETWEEN ? AND ? ORDER BY timestamp"
    )
    store_result = (
        "INSERT OR REPLACE INTO analysis "
        "(timestamp, index_value, n_eggs, n_samples, window_seconds) "
        "VALUES (?, ?, ?, ?, ?)"
    )

    while True:
        try:
            now = int(time.time())
            window_end = now - DATA_OFFSET
            window_start = window_end - WINDOW_SECONDS

            with db() as con:
                rows = con.execute(
                    select_window, (window_start, window_end)
                ).fetchall()

            index_value, n_eggs, n_samples = compute_index(rows)
            if index_value is not None:
                with db() as con:
                    con.execute(
                        store_result,
                        (now, index_value, n_eggs, n_samples, WINDOW_SECONDS),
                    )
                print(
                    f"[analysis] index={index_value:.1f}% eggs={n_eggs} samples={n_samples}"
                )

            # Purge old raw data
            oldest_kept = now - KEEP_SECONDS
            with db() as con:
                con.execute(
                    "DELETE FROM trials WHERE timestamp < ?", (oldest_kept,)
                )
        except Exception as exc:
            print(f"[analysis] error: {exc}")
        time.sleep(ANALYSIS_INTERVAL)
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# The ASGI application object; routes below are registered on it.
app = FastAPI(
    title="Global Consciousness Project",
)
class TrialData(BaseModel):
    # Request body for POST /api/data: one trial reported by one egg.

    # Identifier chosen by the egg; 1 to 64 characters.
    egg_id: str = Field(..., max_length=64, min_length=1)
    # Integer time of the trial as reported by the egg; the handler compares
    # it against int(time.time()).
    timestamp: int
    # Count of 1-bits in the 200-bit trial, hence 0..200 inclusive.
    trial: int = Field(..., le=200, ge=0)
@app.on_event("startup")
def startup():
    """Prepare storage and launch the analysis worker when the app starts.

    Ensures the directory that holds DB_PATH exists, creates the schema, and
    starts ``analysis_loop`` on a daemon thread named "analysis".
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename (e.g. DB_PATH="gcp.db") has no directory component, and
    # os.makedirs("") raises FileNotFoundError, so only create when present.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    init_db()
    threading.Thread(target=analysis_loop, daemon=True, name="analysis").start()
@app.post("/api/data")
def receive_trial(request: Request, data: TrialData):
    # Accept one trial from an egg, store it, and trigger geolocation.

    # Reject reports whose clock is more than 5 minutes off the server's.
    clock_skew = abs(data.timestamp - int(time.time()))
    if clock_skew > 300:
        raise HTTPException(status_code=422, detail="timestamp out of range")

    record = (data.egg_id, data.timestamp, data.trial)
    with db() as con:
        con.execute(
            "INSERT INTO trials (egg_id, timestamp, trial) VALUES (?, ?, ?)",
            record,
        )

    # Prefer the first X-Forwarded-For entry, else the socket peer address.
    # NOTE(review): this header is client-controlled unless a trusted proxy
    # overwrites it; confirm the deployment.
    forwarded_for = request.headers.get("X-Forwarded-For", "")
    client_ip = forwarded_for.split(",")[0].strip()
    if not client_ip:
        client_ip = request.client.host if request.client else ""

    maybe_geolocate(data.egg_id, client_ip)
    return {"status": "ok"}
@app.get("/api/status")
def get_status():
    # Return the most recent analysis result, or a neutral placeholder
    # (index 50, zero eggs/samples/timestamp) when nothing has been stored.
    with db() as con:
        latest = con.execute(
            "SELECT * FROM analysis ORDER BY timestamp DESC LIMIT 1"
        ).fetchone()

    if not latest:
        return {"index": 50.0, "n_eggs": 0, "n_samples": 0, "timestamp": 0}

    payload = {"index": round(latest["index_value"], 2)}
    for column in ("n_eggs", "n_samples", "timestamp"):
        payload[column] = latest[column]
    return payload
@app.get("/api/history")
def get_history(limit: int = 60):
    # Return up to `limit` most recent analysis results, newest first.
    # `limit` is clamped to 0..1440.  The lower bound matters: SQLite treats
    # a negative LIMIT as "no limit", so an unclamped ?limit=-1 would return
    # the entire analysis table.
    bounded_limit = max(0, min(limit, 1440))
    with db() as con:
        rows = con.execute(
            "SELECT timestamp, index_value, n_eggs FROM analysis "
            "ORDER BY timestamp DESC LIMIT ?",
            (bounded_limit,),
        ).fetchall()
    return [
        {"timestamp": r["timestamp"], "index": round(r["index_value"], 2), "n_eggs": r["n_eggs"]}
        for r in rows
    ]
@app.get("/api/map")
def get_map():
    """Anonymised egg positions for the world map (active in last 5 min)."""
    # The egg_id is selected for grouping only and is left out of the
    # response on purpose.
    exposed_columns = ("country", "country_code", "lat", "lon", "last_active")
    active_since = int(time.time()) - 300
    with db() as con:
        located = con.execute(
            """
            SELECT el.egg_id, el.country, el.country_code, el.lat, el.lon,
                   MAX(t.timestamp) AS last_active
            FROM egg_locations el
            JOIN trials t ON el.egg_id = t.egg_id
            WHERE t.timestamp > ?
            GROUP BY el.egg_id
            """,
            (active_since,),
        ).fetchall()

    markers = []
    for entry in located:
        markers.append({column: entry[column] for column in exposed_columns})
    return markers
@app.get("/api/eggs")
def get_active_eggs():
    """Return eggs that have submitted data in the last 2 minutes."""
    active_since = int(time.time()) - 120
    query = (
        "SELECT DISTINCT egg_id, MAX(timestamp) as last_seen FROM trials "
        "WHERE timestamp > ? GROUP BY egg_id ORDER BY last_seen DESC"
    )
    with db() as con:
        recent = con.execute(query, (active_since,)).fetchall()

    # Most recently seen egg first, as ordered by the query.
    eggs = []
    for entry in recent:
        eggs.append({"egg_id": entry["egg_id"], "last_seen": entry["last_seen"]})
    return eggs
# Serve static files last so API routes take priority: this mount sits at
# "/" and must come after every @app route registration above.
app.mount(
    "/",
    StaticFiles(directory="/app/static", html=True),
    name="static",
)