
feat: add webhook rate limiting and stats tracking #85

Open
gauthierdmn wants to merge 1 commit into main from feat/webhook-rate-limiter

Conversation

@gauthierdmn
Owner

Add in-memory sliding window rate limiter to protect webhook endpoints from replay attacks and retry storms. Includes per-platform statistics tracking with disk persistence and a /stats endpoint for observability.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@nominalbot (bot) left a comment


The PR adds a rate limiter, middleware, and stats tracking for webhook endpoints. There are several significant issues: the X-Forwarded-For parsing extracts the wrong IP (last instead of first), WebhookStats.record_request() is never called so stats are always empty, pickle.load from a world-writable temp directory is a security risk, and the reset() method can raise KeyError on unknown IPs.

```python
forwarded_for: str = request.headers.get("X-Forwarded-For", "")

if forwarded_for:
    return forwarded_for.split(",")[-1].strip()
```

Bug: X-Forwarded-For format is client, proxy1, proxy2. Using [-1] returns the last proxy's IP, not the original client IP. This means all clients behind the same proxy share one rate-limit bucket, and the actual client IP is never rate-limited individually. Use [0] to get the original client IP (or the rightmost untrusted entry if you have a list of trusted proxies).

Suggested change
```diff
- return forwarded_for.split(",")[-1].strip()
+ return forwarded_for.split(",")[0].strip()
```
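For the trusted-proxy variant mentioned above, here is a sketch; the `TRUSTED_PROXIES` set and the `client_ip_from_xff` name are illustrative, not from the PR:

```python
# Sketch: pick the client IP from X-Forwarded-For by walking right to left
# past known trusted proxies, returning the rightmost untrusted hop.
TRUSTED_PROXIES = {"10.0.0.1", "10.0.0.2"}  # assumed proxy fleet addresses

def client_ip_from_xff(forwarded_for: str) -> str:
    hops = [part.strip() for part in forwarded_for.split(",") if part.strip()]
    # Entries appended by our own proxies are trustworthy; anything to their
    # left is client-controlled, so stop at the first untrusted hop.
    for hop in reversed(hops):
        if hop not in TRUSTED_PROXIES:
            return hop
    return hops[0] if hops else ""
```

This avoids trusting a client-forged leftmost entry while still not bucketing all clients under one proxy IP.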

```python
        self._total_time: dict[str, float] = defaultdict(float)
        self._load()

    def record_request(
```

Dead code: record_request() is never called anywhere in the codebase. WebhookStats is instantiated in create_app and its get_summary() is exposed via /stats, but no handler ever calls record_request(), so the stats endpoint will always return an empty dict. You likely need to call this from the webhook handler or the middleware.
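One possible wiring, sketched with a minimal stand-in for WebhookStats; the record_request signature used here (platform, duration, error) is a guess and should be adapted to the real API:

```python
import time

class WebhookStats:  # minimal stand-in for the PR's class
    def __init__(self) -> None:
        self.calls: list[tuple[str, float, bool]] = []

    def record_request(self, platform: str, duration: float, error: bool) -> None:
        self.calls.append((platform, duration, error))

def handle_webhook(stats: WebhookStats, platform: str) -> None:
    # Record one data point per webhook call, even when processing raises.
    start = time.monotonic()
    error = False
    try:
        pass  # ... process the webhook payload ...
    except Exception:
        error = True
        raise
    finally:
        stats.record_request(platform, time.monotonic() - start, error)
```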

Comment on lines +113 to +114

```python
with open(STATS_FILE, "rb") as file:
    data = pickle.load(file)
```

Security: pickle.load from a file in the system temp directory (/tmp/webhook_stats.pkl) is a code execution risk. Any local user or co-tenant can place a crafted pickle file at that path, and it will be deserialized when the server starts. Consider using a safer format like JSON, or at minimum use an application-owned directory with restrictive permissions.
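A sketch of the safer alternative, using JSON in an application-owned directory created with 0700 permissions; the paths and function names are illustrative:

```python
import json
import os
from pathlib import Path

STATS_DIR = Path.home() / ".webhook-stats"   # app-owned, not world-writable
STATS_FILE = STATS_DIR / "stats.json"

def save_stats(counters: dict[str, int]) -> None:
    STATS_DIR.mkdir(mode=0o700, exist_ok=True)
    tmp = STATS_FILE.with_suffix(".tmp")
    tmp.write_text(json.dumps(counters))
    os.replace(tmp, STATS_FILE)  # atomic rename on POSIX

def load_stats() -> dict[str, int]:
    # Corrupted or missing files degrade to empty stats instead of
    # executing attacker-controlled bytes, as pickle.load would.
    try:
        return json.loads(STATS_FILE.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
```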

```python
            client_ip (str): The client's IP address.
        """

        del self._requests[client_ip]
```

Bug: del self._requests[client_ip] raises KeyError if the IP has not been seen before. Even though _requests is a defaultdict, del on a missing key still raises. Use self._requests.pop(client_ip, None) instead.

Suggested change
```diff
- del self._requests[client_ip]
+ self._requests.pop(client_ip, None)
```


```python
bypass_token: str = request.headers.get("X-Rate-Limit-Bypass", "")

if bypass_token and bypass_token == RATE_LIMIT_BYPASS_TOKEN:
```

Security: when RATE_LIMIT_BYPASS_TOKEN is empty (env var not set), the `bypass_token and` guard does prevent an empty-token bypass. However, the `==` comparison is susceptible to timing attacks. Use `hmac.compare_digest()` for a constant-time comparison of secrets, and guard explicitly against an unset token.

Suggested change
```diff
- if bypass_token and bypass_token == RATE_LIMIT_BYPASS_TOKEN:
+ if bypass_token and RATE_LIMIT_BYPASS_TOKEN and hmac.compare_digest(bypass_token, RATE_LIMIT_BYPASS_TOKEN):
```


```python
def build_rate_limit_middleware(
    limiter: WebhookRateLimiter,
) -> Any:
```

Per coding guidelines, avoid Any as a return type. The return type of build_rate_limit_middleware should be the actual middleware type. aiohttp's @web.middleware returns a callable with a known signature.
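One way to spell the concrete type, shown with stand-in classes so the snippet is self-contained; in the real file, use aiohttp's web.Request and web.StreamResponse directly:

```python
from collections.abc import Awaitable, Callable

class Request: ...         # stand-in for aiohttp's web.Request
class StreamResponse: ...  # stand-in for aiohttp's web.StreamResponse

Handler = Callable[[Request], Awaitable[StreamResponse]]
Middleware = Callable[[Request, Handler], Awaitable[StreamResponse]]

def build_rate_limit_middleware(limiter: object) -> Middleware:
    async def middleware(request: Request, handler: Handler) -> StreamResponse:
        # ... rate-limit check using `limiter` would go here ...
        return await handler(request)
    return middleware
```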


```python
RATE_LIMIT_BYPASS_TOKEN: str = os.environ.get("RATE_LIMIT_BYPASS_TOKEN", "")

_Handler = Callable[[web.Request], Awaitable[web.StreamResponse]]
```

Unused type alias: _Handler is defined but never referenced anywhere in this file.

```python
app["stats"] = stats

app.router.add_get(path="/health", handler=_handle_health)
app.router.add_get(path="/stats", handler=_handle_stats)
```

The /health and /stats endpoints are behind the rate-limit middleware. Health-check probes from load balancers will consume rate-limit tokens and could eventually be blocked (429). Consider exempting these paths in the middleware, or moving them to a separate sub-application without the middleware.
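A minimal exemption check the middleware could apply before any rate limiting; `EXEMPT_PATHS` is an assumed name:

```python
# Infrastructure endpoints that should never consume rate-limit tokens.
EXEMPT_PATHS = frozenset({"/health", "/stats"})

def should_rate_limit(path: str) -> bool:
    return path not in EXEMPT_PATHS
```

The middleware would call this with `request.path` and fall through to the handler immediately when it returns False.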


@nominalbot (bot) left a comment


The PR adds webhook rate limiting, middleware, and stats tracking. Several important issues were already flagged by existing reviewers. Additional concerns include an unbounded memory leak in the rate limiter (cleanup is never scheduled), the stats endpoint being exposed without authentication, and missing test coverage for the middleware and stats modules.

```python
for ip in expired_keys:
    del self._requests[ip]

logger.debug("Rate limiter cleanup: removed %d expired IPs", len(expired_keys))
```

Memory leak: cleanup() is defined but never called from any production code path. Without a periodic background task invoking it, _requests will grow unboundedly as new IPs arrive and old entries are never pruned (they are only pruned per-IP when that same IP calls is_allowed again). Consider registering an on_startup/on_cleanup hook on the aiohttp app that schedules a periodic asyncio task to call limiter.cleanup().
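The shape of such a periodic task, written in plain asyncio so it stays self-contained; in aiohttp it would be created in an on_startup hook and cancelled in on_cleanup. It assumes only that the limiter exposes cleanup():

```python
import asyncio

async def run_periodic_cleanup(limiter, interval: float) -> None:
    # Prune expired IPs on a fixed cadence until the task is cancelled.
    while True:
        await asyncio.sleep(interval)
        limiter.cleanup()
```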

```python
"total_errors": self._errors[platform],
"error_rate": self._errors[platform] / count,
"avg_response_time_ms": round(avg_time * 1000, 2),
"last_updated": str(datetime.now()),
```

datetime.now() returns a naive datetime with no timezone info, making last_updated ambiguous. Since this code imports the datetime class (not the module), use datetime.now(tz=timezone.utc), with `from datetime import timezone`, for an unambiguous timestamp.

Suggested change
```diff
- "last_updated": str(datetime.now()),
+ "last_updated": datetime.now(tz=timezone.utc).isoformat(),
```


```python
stats: WebhookStats = request.app["stats"]

return web.json_response(stats.get_summary())
```

The /stats endpoint exposes internal operational data (per-platform request counts, error rates, processing times) to any caller with no authentication or authorization check. This could leak information about traffic patterns and platform usage. Consider protecting it behind an auth check or restricting it to internal/admin routes.
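A minimal bearer-token gate the /stats handler could call first; the header format and token source are assumptions:

```python
import hmac

def stats_authorized(auth_header: str, configured_token: str) -> bool:
    # An unset token locks the endpoint rather than leaving it open.
    if not configured_token or not auth_header.startswith("Bearer "):
        return False
    presented = auth_header.removeprefix("Bearer ")
    # Constant-time comparison, same reasoning as the bypass-token check.
    return hmac.compare_digest(presented.encode(), configured_token.encode())
```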


```python
for platform in self._counters:
    count: int = self._counters[platform]
    avg_time: float = self._total_time[platform] / count
```

Potential ZeroDivisionError: if _counters is somehow loaded from a corrupted pickle file where a platform has a count of 0, self._total_time[platform] / count and self._errors[platform] / count will raise. Add a guard for count == 0.
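A guarded version of the per-platform computation; the function name is a sketch, and the keys mirror the summary dict shown earlier:

```python
def platform_summary(count: int, errors: int, total_time: float) -> dict:
    # A zero or negative count (e.g. loaded from a corrupted stats file)
    # yields zeroed rates instead of raising ZeroDivisionError.
    if count <= 0:
        return {"total_requests": 0, "error_rate": 0.0, "avg_response_time_ms": 0.0}
    return {
        "total_requests": count,
        "error_rate": errors / count,
        "avg_response_time_ms": round(total_time / count * 1000, 2),
    }
```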


```python
    Returns:
        int: Remaining requests in the current window.
    """
```

get_remaining() accesses self._requests[client_ip] on a defaultdict, which silently inserts an empty list for never-seen IPs. This pollutes the internal dict with empty entries for any IP that is queried. Consider using .get(client_ip, []) instead.

Suggested change
```diff
- timestamps: list[float] = self._requests[client_ip]
+ timestamps: list[float] = self._requests.get(client_ip, [])
```

```python
start_time: float = time.time()

if not limiter.is_allowed(client_ip):
    remaining: int = limiter.get_remaining(client_ip)
```

When is_allowed returns False, get_remaining is called but is_allowed already pruned expired entries and did not append the current timestamp. So get_remaining will always return 0 (or a small non-negative value). The header is technically correct but the extra call is unnecessary — you could hardcode 0 or omit the header for 429 responses.
