All checks were successful
Build & publish Docker images / Build & push all images (push) Successful in 5s
72 lines
1.8 KiB
Python
72 lines
1.8 KiB
Python
import os
|
|
import time
|
|
import hashlib
|
|
import platform
|
|
|
|
import requests
|
|
|
|
# Path of the file where the egg's generated ID is stored so that it can be
# read back on later runs (see load_or_create_id).
ID_FILE = "/data/egg_id"
|
|
|
|
|
|
def generate_trial(n_bits=200):
    """Return the number of 1-bits among ``n_bits`` random bits.

    The bits are drawn from ``os.urandom``. With the default of 200 bits the
    result is an integer in the range 0..200.

    Args:
        n_bits: How many random bits to draw and sum. Defaults to 200.

    Returns:
        The count of bits that were 1, between 0 and ``n_bits`` inclusive.

    Raises:
        ValueError: If ``n_bits`` is negative.
    """
    if n_bits < 0:
        raise ValueError("n_bits must be non-negative")

    # Whole bytes needed, plus one more if n_bits is not a multiple of 8.
    n_bytes, extra_bits = divmod(n_bits, 8)
    if extra_bits:
        n_bytes += 1

    value = int.from_bytes(os.urandom(n_bytes), "big")
    if extra_bits:
        # Drop the surplus low bits so exactly n_bits random bits remain.
        value >>= 8 - extra_bits

    return bin(value).count("1")
|
|
|
|
|
|
def load_or_create_id():
    """Return this egg's ID, reading it from ``ID_FILE`` or creating a new one.

    If ``ID_FILE`` exists and contains a non-empty value (after stripping
    whitespace), that value is returned. Otherwise a new ID is generated as
    the first 16 hex characters of the SHA-256 digest of 32 random bytes, and
    an attempt is made to write it to ``ID_FILE``.

    Returns:
        The stored ID when one is available, otherwise the newly generated
        ID. Failing to persist the new ID is not fatal: a warning is printed
        and the ID is still returned.
    """
    try:
        with open(ID_FILE, encoding="utf-8") as f:
            egg_id = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # Missing, unreadable or corrupt file: fall through and regenerate.
        egg_id = ""
    if egg_id:
        return egg_id

    egg_id = hashlib.sha256(os.urandom(32)).hexdigest()[:16]
    try:
        # Create the directory that actually contains ID_FILE, so the two
        # cannot drift apart if the path constant is ever changed.
        id_dir = os.path.dirname(ID_FILE)
        if id_dir:
            os.makedirs(id_dir, exist_ok=True)
        with open(ID_FILE, "w", encoding="utf-8") as f:
            f.write(egg_id)
    except OSError as exc:
        # Best effort only: keep running with an in-memory ID, but say so,
        # because the ID will be different after the next restart.
        print(f"Warning: could not persist egg id to {ID_FILE}: {exc}")
    return egg_id
|
|
|
|
|
|
def main():
    """Send one trial per second to the collection server, forever.

    Configuration is read from environment variables:
        SERVER_URL: Base URL of the server; defaults to
            "http://localhost:8000". Trailing slashes are stripped.
        EGG_ID: ID to report. When unset or empty, the value from
            load_or_create_id() is used instead.

    Each iteration POSTs a JSON object with the keys "egg_id", "timestamp"
    (integer Unix time) and "trial" to ``<SERVER_URL>/api/data`` using a
    5-second timeout. A response with status 200 resets the consecutive-error
    counter; any other status or any exception increments it. Only the 1st,
    11th, 21st, ... consecutive error is printed, to avoid flooding the log.

    This function never returns.
    """
    server_url = os.environ.get("SERVER_URL", "http://localhost:8000").rstrip("/")
    egg_id = os.environ.get("EGG_ID") or load_or_create_id()

    print(f"GCP Egg id={egg_id} server={server_url} platform={platform.system()}")

    session = requests.Session()
    errors = 0

    while True:
        # Pace the loop with the monotonic clock: a wall-clock step backwards
        # would otherwise inflate the sleep below by the size of the step.
        loop_start = time.monotonic()
        # The reported timestamp still has to be wall-clock (Unix) time.
        timestamp = int(time.time())
        trial = generate_trial()

        try:
            resp = session.post(
                f"{server_url}/api/data",
                json={"egg_id": egg_id, "timestamp": timestamp, "trial": trial},
                timeout=5,
            )
        except Exception as exc:
            # Top-level boundary of a long-running loop: report and carry on.
            errors += 1
            if errors % 10 == 1:
                print(f"Send error (#{errors}): {exc}")
        else:
            if resp.status_code == 200:
                errors = 0
            else:
                errors += 1
                if errors % 10 == 1:
                    print(f"Server returned {resp.status_code} (error #{errors})")

        # Sleep for whatever is left of this one-second slot, if anything.
        elapsed = time.monotonic() - loop_start
        time.sleep(max(0.0, 1.0 - elapsed))
|
|
|
|
|
|
# Start the sender loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|