feat: add webhook rate limiting and stats tracking #85
gauthierdmn wants to merge 1 commit into main from
Conversation
Add in-memory sliding window rate limiter to protect webhook endpoints from replay attacks and retry storms. Includes per-platform statistics tracking with disk persistence and a /stats endpoint for observability. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The PR adds a rate limiter, middleware, and stats tracking for webhook endpoints. There are several significant issues: the X-Forwarded-For parsing extracts the wrong IP (last instead of first), WebhookStats.record_request() is never called so stats are always empty, pickle.load from a world-writable temp directory is a security risk, and the reset() method can raise KeyError on unknown IPs.
| forwarded_for: str = request.headers.get("X-Forwarded-For", "") |
| if forwarded_for: |
| return forwarded_for.split(",")[-1].strip() |
Bug: X-Forwarded-For format is client, proxy1, proxy2. Using [-1] returns the last proxy's IP, not the original client IP. This means all clients behind the same proxy share one rate-limit bucket, and the actual client IP is never rate-limited individually. Use [0] to get the original client IP (or the rightmost untrusted entry if you have a list of trusted proxies).
| return forwarded_for.split(",")[-1].strip() |
| return forwarded_for.split(",")[0].strip() |
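A sketch of the trusted-proxy variant mentioned above. The `client_ip_from_forwarded_for` name and the `trusted_proxies` parameter are illustrative, not part of this PR:

```python
def client_ip_from_forwarded_for(header: str, trusted_proxies: set[str]) -> str:
    """Return the rightmost X-Forwarded-For entry that is NOT a trusted proxy.

    The header format is "client, proxy1, proxy2": entries to the left can be
    spoofed by the client, so walk from the right and skip known proxies.
    """
    entries = [part.strip() for part in header.split(",") if part.strip()]
    for ip in reversed(entries):
        if ip not in trusted_proxies:
            return ip
    # Every entry was a trusted proxy; fall back to the leftmost one.
    return entries[0] if entries else ""
```

With no trusted-proxy list this degrades to the simple `[0]` fix from the suggestion.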
| self._total_time: dict[str, float] = defaultdict(float) |
| self._load() |
| def record_request( |
Dead code: record_request() is never called anywhere in the codebase. WebhookStats is instantiated in create_app and its get_summary() is exposed via /stats, but no handler ever calls record_request(), so the stats endpoint will always return an empty dict. You likely need to call this from the webhook handler or the middleware.
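One way to wire it up, sketched with a minimal stand-in class mirroring the fields `get_summary()` reads; the `record_request` signature shown here is assumed from those fields and may differ from the PR:

```python
import time
from collections import defaultdict


class WebhookStats:
    """Minimal stand-in for the PR's WebhookStats (persistence omitted)."""

    def __init__(self) -> None:
        self._counters: dict[str, int] = defaultdict(int)
        self._errors: dict[str, int] = defaultdict(int)
        self._total_time: dict[str, float] = defaultdict(float)

    def record_request(self, platform: str, duration: float, error: bool = False) -> None:
        self._counters[platform] += 1
        self._total_time[platform] += duration
        if error:
            self._errors[platform] += 1


async def handle_webhook(platform: str, stats: WebhookStats) -> None:
    """Webhook handler sketch that records a stat for every request."""
    start = time.monotonic()
    try:
        ...  # process the webhook payload here
    except Exception:
        stats.record_request(platform, time.monotonic() - start, error=True)
        raise
    else:
        stats.record_request(platform, time.monotonic() - start)
```

Recording in the handler (or in the middleware, which sees every request) is what makes `/stats` return non-empty data.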
| with open(STATS_FILE, "rb") as file: |
| data = pickle.load(file) |
Security: pickle.load from a file in the system temp directory (/tmp/webhook_stats.pkl) is a code execution risk. Any local user or co-tenant can place a crafted pickle file at that path, and it will be deserialized when the server starts. Consider using a safer format like JSON, or at minimum use an application-owned directory with restrictive permissions.
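A JSON-based replacement sketch. The file path and helper names are illustrative; in practice the path should live in an application-owned directory with restrictive permissions:

```python
import json
from pathlib import Path

# Illustrative default; point this at an app-owned directory, not /tmp.
STATS_FILE = Path("webhook_stats.json")


def load_stats(path: Path = STATS_FILE) -> dict:
    """Load stats from JSON; a missing or corrupt file just resets stats."""
    try:
        return json.loads(path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_stats(data: dict, path: Path = STATS_FILE) -> None:
    # Unlike pickle, loading JSON never executes code from the file.
    path.write_text(json.dumps(data))
```

The failure mode also improves: a tampered or truncated file yields empty stats instead of arbitrary code execution.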
| client_ip (str): The client's IP address. |
| """ |
| del self._requests[client_ip] |
Bug: del self._requests[client_ip] raises KeyError if the IP has not been seen before. Even though _requests is a defaultdict, del on a missing key still raises. Use self._requests.pop(client_ip, None) instead.
| del self._requests[client_ip] |
| self._requests.pop(client_ip, None) |
| bypass_token: str = request.headers.get("X-Rate-Limit-Bypass", "") |
| if bypass_token and bypass_token == RATE_LIMIT_BYPASS_TOKEN: |
Security: when RATE_LIMIT_BYPASS_TOKEN is empty (env var not set), the `bypass_token and` guard does prevent bypass, since an empty header is falsy and a non-empty header never equals "". However, comparing secrets with == is susceptible to timing attacks. Use hmac.compare_digest() for a constant-time comparison.
| if bypass_token and bypass_token == RATE_LIMIT_BYPASS_TOKEN: |
| if bypass_token and RATE_LIMIT_BYPASS_TOKEN and hmac.compare_digest(bypass_token, RATE_LIMIT_BYPASS_TOKEN): |
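Extracted as a helper for clarity (the `bypass_allowed` name is illustrative). Note that both emptiness checks matter: `hmac.compare_digest("", "")` returns True, so an unset env var would otherwise match a missing header:

```python
import hmac


def bypass_allowed(bypass_token: str, configured_token: str) -> bool:
    """Constant-time bypass-token check; rejects when either side is empty."""
    return (
        bool(bypass_token)
        and bool(configured_token)
        and hmac.compare_digest(bypass_token, configured_token)
    )
```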
| def build_rate_limit_middleware( |
| limiter: WebhookRateLimiter, |
| ) -> Any: |
Per coding guidelines, avoid Any as a return type. The return type of build_rate_limit_middleware should be the actual middleware type. aiohttp's @web.middleware returns a callable with a known signature.
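One option is a `Callable` alias for the middleware type. Sketched here with stand-in `Request`/`StreamResponse` classes so the typing is self-contained; in the PR these would be aiohttp's `web.Request` and `web.StreamResponse`, and the function would carry the `@web.middleware` decorator:

```python
from collections.abc import Awaitable, Callable


class Request: ...         # stand-in for aiohttp's web.Request
class StreamResponse: ...  # stand-in for aiohttp's web.StreamResponse


Handler = Callable[[Request], Awaitable[StreamResponse]]
Middleware = Callable[[Request, Handler], Awaitable[StreamResponse]]


def build_rate_limit_middleware(limiter: object) -> Middleware:
    """Return a typed middleware instead of Any."""
    async def rate_limit_middleware(request: Request, handler: Handler) -> StreamResponse:
        # ... rate-limit check would go here ...
        return await handler(request)
    return rate_limit_middleware
```

This also gives the unused `_Handler` alias flagged below a purpose, rather than deleting it.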
| RATE_LIMIT_BYPASS_TOKEN: str = os.environ.get("RATE_LIMIT_BYPASS_TOKEN", "") |
| _Handler = Callable[[web.Request], Awaitable[web.StreamResponse]] |
Unused type alias: _Handler is defined but never referenced anywhere in this file.
| app["stats"] = stats |
| app.router.add_get(path="/health", handler=_handle_health) |
| app.router.add_get(path="/stats", handler=_handle_stats) |
The /health and /stats endpoints are behind the rate-limit middleware. Health-check probes from load balancers will consume rate-limit tokens and could eventually be blocked (429). Consider exempting these paths in the middleware, or moving them to a separate sub-application without the middleware.
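A minimal exemption check the middleware could apply before consulting the limiter (the path set and helper name are illustrative):

```python
# Infrastructure endpoints that should never consume rate-limit tokens.
EXEMPT_PATHS: frozenset[str] = frozenset({"/health", "/stats"})


def is_rate_limit_exempt(path: str) -> bool:
    """Skip rate limiting for load-balancer probes and observability routes."""
    return path in EXEMPT_PATHS
```

In the middleware this would short-circuit to `await handler(request)` when `is_rate_limit_exempt(request.path)` is true.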
The PR adds webhook rate limiting, middleware, and stats tracking. Several important issues were already flagged by existing reviewers. Additional concerns include an unbounded memory leak in the rate limiter (cleanup is never scheduled), the stats endpoint being exposed without authentication, and missing test coverage for the middleware and stats modules.
| for ip in expired_keys: |
| del self._requests[ip] |
| logger.debug("Rate limiter cleanup: removed %d expired IPs", len(expired_keys)) |
Memory leak: cleanup() is defined but never called from any production code path. Without a periodic background task invoking it, _requests will grow unboundedly as new IPs arrive and old entries are never pruned (they are only pruned per-IP when that same IP calls is_allowed again). Consider registering an on_startup/on_cleanup hook on the aiohttp app that schedules a periodic asyncio task to call limiter.cleanup().
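The core loop can be sketched with plain asyncio (the interval and function name are illustrative; in the PR this task would be created in an `on_startup` hook and cancelled in the matching `on_cleanup` hook):

```python
import asyncio


async def periodic_cleanup(limiter, interval: float = 60.0) -> None:
    """Prune expired rate-limiter entries forever; cancel this task on shutdown."""
    while True:
        await asyncio.sleep(interval)
        limiter.cleanup()

# aiohttp wiring sketch (assumed, not from this PR):
#   async def start_cleanup(app):
#       app["cleanup_task"] = asyncio.get_running_loop().create_task(
#           periodic_cleanup(app["limiter"]))
#   app.on_startup.append(start_cleanup)
```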
| "total_errors": self._errors[platform], |
| "error_rate": self._errors[platform] / count, |
| "avg_response_time_ms": round(avg_time * 1000, 2), |
| "last_updated": str(datetime.now()), |
datetime.now() returns a naive datetime with no timezone info, making last_updated ambiguous. Use datetime.now(timezone.utc) (or datetime.now(UTC) with the UTC alias from the datetime module on Python 3.11+) for an unambiguous timestamp, and prefer isoformat() over str() for a standard representation.
| "last_updated": str(datetime.now()), |
| "last_updated": datetime.now(timezone.utc).isoformat(), |
| stats: WebhookStats = request.app["stats"] |
| return web.json_response(stats.get_summary()) |
The /stats endpoint exposes internal operational data (per-platform request counts, error rates, processing times) to any caller with no authentication or authorization check. This could leak information about traffic patterns and platform usage. Consider protecting it behind an auth check or restricting it to internal/admin routes.
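A minimal bearer-token check the /stats handler could perform before returning the summary (the helper name and header scheme are illustrative; aiohttp request plumbing omitted):

```python
import hmac


def stats_authorized(authorization_header: str, expected_token: str) -> bool:
    """Accept only 'Bearer <token>' matching the configured secret.

    Rejects when no token is configured, so the endpoint fails closed.
    """
    if not expected_token or not authorization_header.startswith("Bearer "):
        return False
    presented = authorization_header[len("Bearer "):]
    return hmac.compare_digest(presented, expected_token)
```

The handler would return a 401/403 response when this check fails.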
| for platform in self._counters: |
| count: int = self._counters[platform] |
| avg_time: float = self._total_time[platform] / count |
Potential ZeroDivisionError: if _counters is somehow loaded from a corrupted pickle file where a platform has a count of 0, self._total_time[platform] / count and self._errors[platform] / count will raise. Add a guard for count == 0.
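A guarded version of the per-platform summary math, extracted as a standalone function for illustration (the `summarize_platform` name is not from the PR; field names follow the quoted code):

```python
def summarize_platform(count: int, total_time: float, errors: int) -> dict:
    """Compute per-platform averages, tolerating a zero count from bad data."""
    if count <= 0:
        # Corrupted persisted state: report zeros rather than raising.
        return {"total_requests": 0, "error_rate": 0.0, "avg_response_time_ms": 0.0}
    return {
        "total_requests": count,
        "error_rate": errors / count,
        "avg_response_time_ms": round(total_time / count * 1000, 2),
    }
```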
| Returns: |
| int: Remaining requests in the current window. |
| """ |
get_remaining() accesses self._requests[client_ip] on a defaultdict, which silently inserts an empty list for never-seen IPs. This pollutes the internal dict with empty entries for any IP that is queried. Consider using .get(client_ip, []) instead.
| """ |
| timestamps: list[float] = self._requests.get(client_ip, []) |
| start_time: float = time.time() |
| if not limiter.is_allowed(client_ip): |
| remaining: int = limiter.get_remaining(client_ip) |
When is_allowed returns False, get_remaining is called but is_allowed already pruned expired entries and did not append the current timestamp. So get_remaining will always return 0 (or a small non-negative value). The header is technically correct but the extra call is unnecessary — you could hardcode 0 or omit the header for 429 responses.