Files
GCP-Dot/server/main.py
Hexadual ee2249c5f8
Some checks failed
Build & publish Docker images / Egg image (push) Has been cancelled
Build & publish Docker images / Server image (push) Has been cancelled
Initial commit — GCP Dot self-hosted clone
Server (FastAPI + SQLite) runs Stouffer Z network variance analysis.
Egg container uses os.urandom for hardware-entropy 200-bit trials.
Gitea Actions workflow auto-builds and publishes both Docker images.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 01:21:22 -05:00

232 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
GCP Server — collects random trials from distributed eggs, runs Stouffer Z
network variance analysis every 60 seconds, and serves the dot website.
"""
import os
import sqlite3
import threading
import time
from collections import defaultdict
from contextlib import contextmanager
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from scipy import stats
DB_PATH = os.environ.get("DB_PATH", "/data/gcp.db")
ANALYSIS_INTERVAL = 60 # seconds between analysis runs
DATA_OFFSET = 60 # analyse data at least 60 s old (propagation buffer)
WINDOW_SECONDS = 3600 # 1-hour analysis window (matches GCP primary window)
KEEP_SECONDS = 48 * 3600 # purge raw trials older than 48 h
MIN_EGGS = 2 # need at least 2 eggs to compute network variance
MIN_SAMPLES = 10 # need at least 10 time-steps in window
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def init_db():
with db() as con:
con.executescript("""
CREATE TABLE IF NOT EXISTS trials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
egg_id TEXT NOT NULL,
timestamp INTEGER NOT NULL,
trial INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_trials_ts ON trials(timestamp);
CREATE TABLE IF NOT EXISTS analysis (
timestamp INTEGER PRIMARY KEY,
index_value REAL NOT NULL,
n_eggs INTEGER NOT NULL,
n_samples INTEGER NOT NULL,
window_seconds INTEGER NOT NULL
);
""")
@contextmanager
def db():
con = sqlite3.connect(DB_PATH)
con.row_factory = sqlite3.Row
try:
yield con
con.commit()
finally:
con.close()
# ---------------------------------------------------------------------------
# Statistical analysis
# ---------------------------------------------------------------------------
def compute_index(rows):
"""
Compute the GCP network variance index from raw trial rows.
Each egg produces one 200-bit trial per second: trial = count of 1-bits.
Under H0: trial ~ Binomial(200, 0.5), mean=100, var=50.
Steps:
1. Normalise each trial to a Z-score.
2. For each second, combine eggs via Stouffer Z.
3. Sum squared Stouffer Zs → network variance statistic ~ χ²(T).
4. Index = lower-tail CDF × 100.
High index (>95) = small variance = coherence = blue.
Low index (<5) = large variance = noise spike = red.
"""
# Build {timestamp: {egg_id: trial}} dict
by_ts = defaultdict(dict)
for row in rows:
by_ts[row["timestamp"]][row["egg_id"]] = row["trial"]
stouffer_sq = []
for ts in sorted(by_ts):
egg_vals = list(by_ts[ts].values())
if len(egg_vals) < MIN_EGGS:
continue
z_scores = [(v - 100) / np.sqrt(50) for v in egg_vals]
n = len(z_scores)
stouffer = sum(z_scores) / np.sqrt(n)
stouffer_sq.append(stouffer ** 2)
if len(stouffer_sq) < MIN_SAMPLES:
return None, 0, 0
T = len(stouffer_sq)
net_var = float(np.sum(stouffer_sq))
index_value = float(stats.chi2.cdf(net_var, df=T)) * 100.0
n_eggs = len({row["egg_id"] for row in rows})
return index_value, n_eggs, T
def analysis_loop():
"""Background thread: run analysis every ANALYSIS_INTERVAL seconds."""
# Wait a bit on startup so the DB is definitely initialised.
time.sleep(10)
while True:
try:
now = int(time.time())
end = now - DATA_OFFSET
start = end - WINDOW_SECONDS
with db() as con:
rows = con.execute(
"SELECT egg_id, timestamp, trial FROM trials "
"WHERE timestamp BETWEEN ? AND ? ORDER BY timestamp",
(start, end),
).fetchall()
index_value, n_eggs, n_samples = compute_index(rows)
if index_value is not None:
with db() as con:
con.execute(
"INSERT OR REPLACE INTO analysis "
"(timestamp, index_value, n_eggs, n_samples, window_seconds) "
"VALUES (?, ?, ?, ?, ?)",
(now, index_value, n_eggs, n_samples, WINDOW_SECONDS),
)
print(
f"[analysis] index={index_value:.1f}% eggs={n_eggs} samples={n_samples}"
)
# Purge old raw data
with db() as con:
con.execute(
"DELETE FROM trials WHERE timestamp < ?", (now - KEEP_SECONDS,)
)
except Exception as exc:
print(f"[analysis] error: {exc}")
time.sleep(ANALYSIS_INTERVAL)
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="Global Consciousness Project")
class TrialData(BaseModel):
egg_id: str = Field(..., min_length=1, max_length=64)
timestamp: int
trial: int = Field(..., ge=0, le=200)
@app.on_event("startup")
def startup():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
init_db()
threading.Thread(target=analysis_loop, daemon=True, name="analysis").start()
@app.post("/api/data")
def receive_trial(data: TrialData):
now = int(time.time())
# Reject data more than 5 minutes out of sync
if abs(data.timestamp - now) > 300:
raise HTTPException(status_code=422, detail="timestamp out of range")
with db() as con:
con.execute(
"INSERT INTO trials (egg_id, timestamp, trial) VALUES (?, ?, ?)",
(data.egg_id, data.timestamp, data.trial),
)
return {"status": "ok"}
@app.get("/api/status")
def get_status():
with db() as con:
row = con.execute(
"SELECT * FROM analysis ORDER BY timestamp DESC LIMIT 1"
).fetchone()
if row:
return {
"index": round(row["index_value"], 2),
"n_eggs": row["n_eggs"],
"n_samples": row["n_samples"],
"timestamp": row["timestamp"],
}
return {"index": 50.0, "n_eggs": 0, "n_samples": 0, "timestamp": 0}
@app.get("/api/history")
def get_history(limit: int = 60):
with db() as con:
rows = con.execute(
"SELECT timestamp, index_value, n_eggs FROM analysis "
"ORDER BY timestamp DESC LIMIT ?",
(min(limit, 1440),),
).fetchall()
return [
{"timestamp": r["timestamp"], "index": round(r["index_value"], 2), "n_eggs": r["n_eggs"]}
for r in rows
]
@app.get("/api/eggs")
def get_active_eggs():
"""Return eggs that have submitted data in the last 2 minutes."""
cutoff = int(time.time()) - 120
with db() as con:
rows = con.execute(
"SELECT DISTINCT egg_id, MAX(timestamp) as last_seen FROM trials "
"WHERE timestamp > ? GROUP BY egg_id ORDER BY last_seen DESC",
(cutoff,),
).fetchall()
return [{"egg_id": r["egg_id"], "last_seen": r["last_seen"]} for r in rows]
# Serve static files last so API routes take priority
app.mount("/", StaticFiles(directory="/app/static", html=True), name="static")