diff --git a/frameworks/pyronova/Dockerfile b/frameworks/pyronova/Dockerfile index 8c89b6e6c..8d837bd0f 100644 --- a/frameworks/pyronova/Dockerfile +++ b/frameworks/pyronova/Dockerfile @@ -14,9 +14,9 @@ RUN pip install --no-cache-dir maturin # rebuilds from that tag. The full source ships at github, not in this # PR, so the PR stays small and the build is byte-for-byte reproducible # against a signed git ref. -ARG PYRONOVA_REF=v2.3.1 +ARG PYRONOVA_REF=v2.4.2 RUN git clone --depth 1 --branch ${PYRONOVA_REF} \ - https://github.com/moomoo-tech/pyronova.git /build/pyronova + https://github.com/leocaolab/pyronova.git /build/pyronova RUN cd /build/pyronova \ && RUSTFLAGS="-C target-cpu=native" \ maturin build --release --out /wheels --compatibility linux @@ -39,4 +39,4 @@ COPY app.py launcher.py ./ # - /data/static mounted by harness # - $DATABASE_URL set by harness when PG profile active EXPOSE 8080 8081 8443 -CMD ["python3", "launcher.py"] +CMD ["python3", "launcher.py"] diff --git a/frameworks/pyronova/app.py b/frameworks/pyronova/app.py index f021269f4..fb325a1c4 100644 --- a/frameworks/pyronova/app.py +++ b/frameworks/pyronova/app.py @@ -118,7 +118,7 @@ def baseline11_post(req: "Request"): body = req.body if body: try: - total += int(body.decode("ascii", errors="replace").strip()) + total += int(body.decode("ascii", errors="strict").strip()) except (ValueError, UnicodeDecodeError): pass return Response(str(total), content_type="text/plain") @@ -151,28 +151,40 @@ def upload(req: "Request"): # 50-item payload). Using Python's stdlib `json.dumps` instead costs # ~150μs per call on the same data. Returning the dict shaves ~100μs # per request on the /json profile. +# Per-worker cache of pre-serialized JSON bytes for /json/{count}?m={m}. +# Eliminates both the Python list comprehension and the Rust sonic-rs +# serialization pass on cache hits. Each sub-interpreter has its own copy +# so there is no lock contention. 
Cap at _JSON_CACHE_MAX keys to bound +# memory and close the OOM vector from unbounded user-controlled inputs. +_JSON_CACHE: dict = {} # (count, m) -> bytes +_JSON_CACHE_MAX = 256 + + @app.get("/json/{count}") def json_endpoint(req: "Request"): - # Returning a dict directly triggers Pyronova's Rust-side JSON - # serialization path (pythonize + serde_json::to_vec). Empirically - # this matches or beats orjson.dumps() + Response(bytes) for - # small nested payloads — the explicit orjson path pays the C-API - # wrap twice (orjson → bytes, bytes → Response) while the - # dict-return path is a single Rust traversal. try: count = int(req.params["count"]) except (KeyError, ValueError): - return {"items": [], "count": 0} + return Response(b'{"items":[],"count":0}', content_type="application/json") try: m = int(req.query_params.get("m", "1")) except ValueError: m = 1 - count = min(count, len(DATASET_ITEMS)) + count = max(0, min(count, len(DATASET_ITEMS))) + key = (count, m) + cached = _JSON_CACHE.get(key) + if cached is not None: + # Always wrap in a fresh Response — never share a mutable Python + # object across requests, even if the underlying bytes are the same. + return Response(cached, content_type="application/json") items = [ {**dsitem, "total": dsitem["price"] * dsitem["quantity"] * m} for dsitem in DATASET_ITEMS[:count] ] - return {"items": items, "count": count} + body = json.dumps({"items": items, "count": count}, separators=(",", ":")).encode() + if len(_JSON_CACHE) < _JSON_CACHE_MAX: + _JSON_CACHE[key] = body + return Response(body, content_type="application/json") @app.get("/json-comp/{count}") @@ -195,17 +207,16 @@ def json_comp_endpoint(req: "Request"): @app.get("/async-db") def async_db_endpoint(req: "Request"): - # Pyronova v2.2.0 added a C-FFI DB bridge (`_pyronova_db_fetch_*` - # injected into every sub-interp's globals). 
The bridge forwards - # sqlx calls onto the main-process shared pool while releasing the - # calling sub-interp's GIL, so this handler now fans out across the - # full sub-interp pool instead of serializing on the main interp. - # The 3.7k → target-30k+ rps jump on async-db lives here. - # - # `PG_POOL is None` still guards the "no DB configured" case — on - # sub-interp the `PgPool.connect()` call earlier in the module is a - # noop that returns a stateless handle; the real sqlx pool is - # initialized exactly once by the main interp's import-time call. + # Runs on a sub-interpreter worker. `PG_POOL` here is the mock + # `_PgPool` proxy installed by `_bootstrap.py` — calls forward to + # four C-FFI entry points (`_pyronova_db_fetch_{all,one,scalar}`, + # `_pyronova_db_execute`) that Rust injected into each worker's + # globals at startup. Those entry points queue the sqlx future + # onto the dedicated DB tokio runtime via `rt.spawn + channel` + # (see `src/bridge/db_bridge.rs::run_on_db_rt`) and block the + # sub-interp thread until the result lands — no nested runtime, + # no main-interp GIL bottleneck, parallelism ceiling is + # `min(sub_interp_workers, DATABASE_MAX_CONN)`. if PG_POOL is None: return _EMPTY_DB_RESPONSE q = req.query_params @@ -256,6 +267,7 @@ def _rows_to_payload(rows): _EMPTY_DB_RESPONSE = {"items": [], "count": 0} _NOT_FOUND = Response("not found", status_code=404, content_type="text/plain") _BAD_REQUEST = Response("bad request", status_code=400, content_type="text/plain") +_INTERNAL_ERROR = Response("internal server error", status_code=500, content_type="text/plain") # --------------------------------------------------------------------------- @@ -324,48 +336,38 @@ def _row_to_full_item(row): @app.get("/crud/items/{id}", gil=True) def crud_get_one(req: "Request"): - # Arena cache-aside validation reads the X-Cache header (MISS/HIT) out - # of every response on this endpoint — including 404s. 
Leaving it off - # makes the runner's `curl | grep ^x-cache` pipeline fail under - # `set -o pipefail` and kills the entire test script silently, so we - # emit the header on every path below. + # Arena validator requires an `X-Cache: MISS|HIT` response header on + # this endpoint so the cache-aside test can observe that a second + # GET for the same id hits the in-process cache without re-querying + # Postgres. Header is computed once per code path and wrapped in a + # Response() because a bare dict return would skip custom headers. + if PG_POOL is None: + return _NOT_FOUND try: item_id = int(req.params["id"]) except (KeyError, ValueError): - return Response( - body="bad request", status_code=400, - content_type="application/json", headers={"X-Cache": "MISS"}, - ) - if PG_POOL is None: - return Response( - body="not found", status_code=404, - content_type="application/json", headers={"X-Cache": "MISS"}, - ) + return _BAD_REQUEST now = _time.monotonic() entry = _CRUD_CACHE.get(item_id) if entry is not None and entry[1] > now: return Response( - body=entry[0], status_code=200, - content_type="application/json", headers={"X-Cache": "HIT"}, + body=json.dumps(entry[0]), + content_type="application/json", + headers={"x-cache": "HIT"}, ) try: row = PG_POOL.fetch_one(_CRUD_GET_SQL, item_id) except RuntimeError: log.warning("/crud/items/%s: fetch_one failed", item_id, exc_info=True) - return Response( - body="not found", status_code=404, - content_type="application/json", headers={"X-Cache": "MISS"}, - ) + return _INTERNAL_ERROR if row is None: - return Response( - body="not found", status_code=404, - content_type="application/json", headers={"X-Cache": "MISS"}, - ) - item_json = json.dumps(_row_to_full_item(row)) - _CRUD_CACHE[item_id] = (item_json, now + _CRUD_TTL_S) + return _NOT_FOUND + item = _row_to_full_item(row) + _CRUD_CACHE[item_id] = (item, now + _CRUD_TTL_S) return Response( - body=item_json, status_code=200, - content_type="application/json", 
headers={"X-Cache": "MISS"}, + body=json.dumps(item), + content_type="application/json", + headers={"x-cache": "MISS"}, ) @@ -392,7 +394,7 @@ def crud_list(req: "Request"): rows = PG_POOL.fetch_all(_CRUD_LIST_SQL, category, limit, offset) except RuntimeError: log.warning("/crud/items list: fetch_all failed", exc_info=True) - return _EMPTY_CRUD_LIST + return _INTERNAL_ERROR items = [_row_to_full_item(r) for r in rows] return {"items": items, "total": len(items), "page": page, "limit": limit} @@ -416,7 +418,7 @@ def crud_update(req: "Request"): affected = PG_POOL.execute(_CRUD_UPDATE_SQL, name, price, quantity, item_id) except RuntimeError: log.warning("/crud/items/%s update: execute failed", item_id, exc_info=True) - return _NOT_FOUND + return _INTERNAL_ERROR if affected == 0: return _NOT_FOUND _CRUD_CACHE.pop(item_id, None) @@ -446,7 +448,7 @@ def crud_upsert(req: "Request"): ) except RuntimeError: log.warning("/crud/items upsert id=%s: fetch_scalar failed", item_id, exc_info=True) - return _BAD_REQUEST + return _INTERNAL_ERROR _CRUD_CACHE.pop(item_id, None) return Response( body=json.dumps({ @@ -470,4 +472,6 @@ def crud_upsert(req: "Request"): port = int(os.environ.get("PYRONOVA_PORT", "8080")) # Detect worker count from cgroup cpu.max (same pattern as actix's helper). # Pyronova's engine will fall back to num_cpus if PYRONOVA_WORKERS isn't set. - app.run(host=host, port=port) + _tls_ports_env = os.environ.get("PYRONOVA_TLS_PORTS") + extra_tls_ports = [int(p) for p in _tls_ports_env.split(",") if p.strip()] if _tls_ports_env else None + app.run(host=host, port=port, extra_tls_ports=extra_tls_ports) diff --git a/frameworks/pyronova/launcher.py b/frameworks/pyronova/launcher.py index 60f5932e0..4ba18d129 100644 --- a/frameworks/pyronova/launcher.py +++ b/frameworks/pyronova/launcher.py @@ -1,21 +1,19 @@ -"""Launcher — spawns two Pyronova processes (HTTP plain + HTTPS). - -Plain HTTP on $PORT (default 8080) and HTTPS on $PORT+1 for the json-tls -profile. 
A separate HTTP/2 listener on 8443 is launched when TLS certs -are present — rustls advertises ALPN h2 + http/1.1, so clients negotiate -automatically. - -Why two processes: Pyronova's `app.run()` binds a single port. Running it -twice is the simplest way to serve plaintext + TLS without adding -multi-bind support to the engine for one benchmark. Each process gets -half the available CPUs so we don't over-subscribe the sub-interpreter -pool. +"""Launcher — spawns ONE Pyronova process serving all ports simultaneously. + +Plain HTTP on $PORT (default 8080). When TLS certs are present, HTTPS is +also served on $PORT+1 (json-tls profile) and 8443 (baseline-h2 / static-h2 +profile) via PYRONOVA_TLS_PORTS — all from the same process. + +Each TPC thread creates its own SO_REUSEPORT socket on every port, so all +cores serve all profiles simultaneously. This gives every profile access to +all CPUs, unlike the old two-process approach that split cores 50/50. """ import os import signal import subprocess import sys +import threading import time @@ -41,32 +39,21 @@ def _numa_nodes() -> int: def main() -> int: total = _cpu_count() - per_proc = max(total // 2, 1) - # IO workers sizing is NUMA-topology-aware. Two regimes: - # - # Multi-node (Threadripper PRO / EPYC / multi-socket Xeon): cap IO - # at `per_proc // 4` so the accept loops + hyper socket threads - # stay on a couple of CCDs. Letting IO spread across every CCD - # turns every `crossbeam_channel::recv` into an Infinity-Fabric - # round trip — Leo observed this as "baseline stops scaling past - # 26 cores on the 3995WX" in the Arena run. - # - # Single-node (laptops, Apple Silicon, most cloud VMs): IO == cores. - # The prior NUMA-only formula cost 32% throughput on an M5 Pro - # because IO threads starved with no matching compute benefit. 
- if _numa_nodes() > 1: - io_per_proc = min(max(per_proc // 4, 4), 16) - else: - io_per_proc = per_proc + per_proc = total + io_per_proc = per_proc base_port = int(os.environ.get("PORT", "8080")) tls_cert = os.environ.get("TLS_CERT", "/certs/server.crt") tls_key = os.environ.get("TLS_KEY", "/certs/server.key") have_tls = os.path.exists(tls_cert) and os.path.exists(tls_key) - env_common = dict(os.environ) - env_common["PYRONOVA_WORKERS"] = str(per_proc) - env_common["PYRONOVA_IO_WORKERS"] = str(io_per_proc) + env = dict(os.environ) + # PYRONOVA_WORKERS controls TPC thread count in TPC mode (one thread per + # logical CPU). Rust's auto-detect uses physical_core_count() which misses + # hyperthreads — on a 32C/64T Threadripper that would give 32; we want 64. + env["PYRONOVA_WORKERS"] = str(per_proc) + env["PYRONOVA_HOST"] = "0.0.0.0" + env["PYRONOVA_PORT"] = str(base_port) # GIL-bridge sizing for gil=True routes under TPC. Default is 4 workers # + 16×4=64 channel depth — correct for typical apps with 1-2 numpy # routes. HttpArena's async-db / crud profiles hammer gil=True paths @@ -75,78 +62,62 @@ def main() -> int: # contract). Widen to 16 workers + 8192 capacity so the DB-heavy # gcannon profiles see sustained throughput instead of a 503 storm. # Verified locally at c=4096: 15k req/s steady, zero drops. - env_common.setdefault("PYRONOVA_GIL_BRIDGE_WORKERS", "16") - env_common.setdefault("PYRONOVA_GIL_BRIDGE_CAPACITY", "8192") + env.setdefault("PYRONOVA_GIL_BRIDGE_WORKERS", "16") + env.setdefault("PYRONOVA_GIL_BRIDGE_CAPACITY", "8192") # Metrics / access log off; benchmarks care about throughput, not logs. - env_common.pop("PYRONOVA_LOG", None) - env_common.pop("PYRONOVA_METRICS", None) + env.pop("PYRONOVA_LOG", None) + env.pop("PYRONOVA_METRICS", None) # Hard-silence the tracing subscriber. 
Default level is ERROR, which # still writes any `tracing::error!` call to stderr — under 4096-conn # load a single recurring error log (see the PyObjRef leak bug) drags # throughput by ~3× from log-pipe contention alone. OFF makes every # tracing macro a zero-cost no-op, matching what Actix / Helidon / # ASP.NET ship in their benchmark images. - env_common["PYRONOVA_LOG_LEVEL"] = "OFF" - - procs = [] - - # Plain HTTP on $base_port. - env_plain = dict(env_common) - env_plain["PYRONOVA_PORT"] = str(base_port) - env_plain["PYRONOVA_HOST"] = "0.0.0.0" - env_plain.pop("PYRONOVA_TLS_CERT", None) - env_plain.pop("PYRONOVA_TLS_KEY", None) - procs.append(subprocess.Popen(["python3", "app.py"], env=env_plain)) + env["PYRONOVA_LOG_LEVEL"] = "OFF" - # HTTPS on $base_port + 1 (json-tls profile target). if have_tls: - env_tls = dict(env_common) - env_tls["PYRONOVA_PORT"] = str(base_port + 1) - env_tls["PYRONOVA_HOST"] = "0.0.0.0" - env_tls["PYRONOVA_TLS_CERT"] = tls_cert - env_tls["PYRONOVA_TLS_KEY"] = tls_key - procs.append(subprocess.Popen(["python3", "app.py"], env=env_tls)) - - # HTTP/2 on 8443 (baseline-h2 / static-h2 profile target). ALPN on - # this listener advertises h2 + http/1.1; hyper's AutoBuilder picks - # the right protocol from the handshake. - env_h2 = dict(env_tls) - env_h2["PYRONOVA_PORT"] = "8443" - procs.append(subprocess.Popen(["python3", "app.py"], env=env_h2)) + env["PYRONOVA_TLS_CERT"] = tls_cert + env["PYRONOVA_TLS_KEY"] = tls_key + env["PYRONOVA_TLS_PORTS"] = f"{base_port + 1},8443" + else: + env.pop("PYRONOVA_TLS_CERT", None) + env.pop("PYRONOVA_TLS_KEY", None) + env.pop("PYRONOVA_TLS_PORTS", None) + + proc = subprocess.Popen(["python3", "app.py"], env=env) def shutdown(_sig, _frame): - for p in procs: + # Signal handlers must not block — offload the wait+kill to a thread. 
+ def _cleanup(): + import logging as _log try: - p.terminate() + proc.terminate() except Exception: - pass - # give them a moment to drain gracefully; Pyronova's graceful shutdown - # waits up to 30s for in-flight conns — the Arena harness typically - # SIGKILLs the container anyway, but polite is polite. - time.sleep(1) - for p in procs: - if p.poll() is None: + _log.warning("launcher: terminate failed for pid %s", proc.pid, exc_info=True) + # give it a moment to drain gracefully; Pyronova's graceful + # shutdown waits up to 30s for in-flight conns — Arena harness + # typically SIGKILLs the container anyway, but polite is polite. + time.sleep(1) + if proc.poll() is None: try: - p.kill() + proc.kill() except Exception: - pass - sys.exit(0) + _log.warning("launcher: kill failed for pid %s", proc.pid, exc_info=True) + # os._exit terminates all threads (including this daemon thread); + # sys.exit(0) from a daemon thread only kills the daemon thread. + os._exit(0) + threading.Thread(target=_cleanup, daemon=True).start() signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) - # Wait on the plain HTTP process; when it exits the harness is done - # with us anyway. Terminate the others if they're still up. + import logging as _log try: - procs[0].wait() + rc = proc.wait() + if rc != 0: + _log.warning("launcher: process exited with code %d", rc) except Exception: - pass - for p in procs[1:]: - if p.poll() is None: - try: - p.terminate() - except Exception: - pass + _log.warning("launcher: wait() failed", exc_info=True) return 0 diff --git a/frameworks/pyronova/meta.json b/frameworks/pyronova/meta.json index 4c825c8af..e08345ce3 100644 --- a/frameworks/pyronova/meta.json +++ b/frameworks/pyronova/meta.json @@ -4,7 +4,7 @@ "type": "tuned", "engine": "pyronova", "description": "Pyronova — Python web framework with a Rust core (hyper + tokio + rustls + mimalloc) and PEP 684 sub-interpreter workers for true multi-core parallelism. 
Opt-in features: gzip/brotli compression, rustls TLS with h2/h1 ALPN, streaming body ingest, async Postgres via sqlx::PgPool. Handlers are standard Python functions routed via decorators.", - "repo": "https://github.com/moomoo-tech/pyronova", + "repo": "https://github.com/leocaolab/pyronova", "enabled": true, "tests": [ "baseline", @@ -25,4 +25,4 @@ "unary-grpc-tls" ], "maintainers": [] -} +}