-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloadgen.py
More file actions
127 lines (107 loc) · 4 KB
/
loadgen.py
File metadata and controls
127 lines (107 loc) · 4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
edgepilot loadgen
drives traffic at a cloudflare zone so recon-agent analytics have something to report
not abusive - ~30 req/sec by default, all paths, mix of user agents
"""
import argparse
import asyncio
import os
import random
import signal
import time
from itertools import cycle
import aiohttp
# Request targets appended to the zone's base URL; each request picks one at
# random. The set mixes ordinary pages, API routes, and a few paths that look
# like typical probe targets (e.g. "/.env", "/wp-login.php").
PATHS = [
    "/",
    "/index.html",
    "/login",
    "/api/v1/users",
    "/admin",
    "/wp-login.php",
    "/.env",
    "/api/v2/orders",
    "/search?q=test",
    "/health",
    "/robots.txt",
    "/static/app.js",
]
# Mix of benign and bot-like User-Agent strings. Cloudflare often tags the
# latter as threats, which gives the analytics some variety to report on.
# Order matters only in that each worker cycles through this list in sequence.
USER_AGENTS = [
    # desktop-browser style agents
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_0) AppleWebKit/605.1.15",
    # command-line tools and HTTP client libraries
    "curl/8.4.0",
    "python-requests/2.31.0",
    "Go-http-client/1.1",
    # scanner / crawler style agents
    "sqlmap/1.7.11",
    "Nikto/2.5.0",
    "masscan/1.3.2",
    "Mozilla/5.0 (compatible; Baiduspider/2.0)",
    "ZmEu",
    # agents that identify themselves as AI / data-collection crawlers
    "Mozilla/5.0 (compatible; GPTBot/1.0; +https://openai.com/gptbot)",
    "ClaudeBot/1.0",
    "Mozilla/5.0 (compatible; PerplexityBot/1.0; +https://perplexity.ai/perplexitybot)",
    "Mozilla/5.0 (compatible; ChatGPT-User/1.0; +https://openai.com/bot)",
    "Bytespider",
    "CCBot/2.0",
]
async def hit(session, url, ua):
    """Issue a single GET request and report its outcome.

    Args:
        session: aiohttp client session used to send the request.
        url: Absolute URL to fetch.
        ua: Value sent in the ``User-Agent`` header.

    Returns:
        The HTTP status code once the response body has been read in full,
        or ``0`` if the request failed for any reason (connection error,
        timeout, ...). Callers treat ``0`` as "error".
    """
    request_headers = {"User-Agent": ua}
    try:
        # Cap the whole request at 5 seconds.
        budget = aiohttp.ClientTimeout(total=5)
        pending = session.get(url, headers=request_headers, timeout=budget)
        async with pending as response:
            # Read the body so the full response is actually transferred.
            await response.read()
            status = response.status
    except Exception:
        # Deliberately best-effort: any failure is reported as status 0.
        return 0
    return status
async def worker(name, session, base, rate, stats, stop):
interval = 1.0 / rate
ua_cycle = cycle(USER_AGENTS)
while not stop.is_set():
path = random.choice(PATHS)
ua = next(ua_cycle)
status = await hit(session, base + path, ua)
stats["sent"] += 1
if status == 0:
stats["errors"] += 1
await asyncio.sleep(interval)
async def heartbeat(stats, stop):
    """Print running totals and the recent request rate every 5 seconds.

    Args:
        stats: Shared dict with integer ``"sent"`` and ``"errors"`` counters;
            only read here, never modified.
        stop: ``asyncio.Event``; the loop ends once it is set (checked before
            each 5-second sleep).
    """
    # Use the monotonic clock for elapsed time: wall-clock time can jump
    # (NTP sync, manual changes) and would distort or even negate the rate.
    last = time.monotonic()
    last_sent = 0
    while not stop.is_set():
        await asyncio.sleep(5)
        now = time.monotonic()
        elapsed = now - last
        # Rate over the window since the previous report, not since start.
        rps = (stats["sent"] - last_sent) / elapsed if elapsed > 0 else 0.0
        print(f" sent={stats['sent']:>6} errors={stats['errors']:>4} rps={rps:.1f}")
        last = now
        last_sent = stats["sent"]
async def main():
    """Parse CLI options, run the load-generating workers, and print totals.

    Options:
        --zone: Host to target; defaults to ``$CLOUDFLARE_ZONE_NAME`` or
            ``stackql.xyz``.
        --rps: Total target requests per second, split evenly over workers.
        --concurrency: Number of worker tasks (also the connection limit).
        --duration: Seconds to run; ``0`` means run until interrupted.

    The run ends when the duration elapses or when SIGINT/SIGTERM sets the
    shared stop event. Invalid option values terminate the program through
    ``argparse``'s usual error exit.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--zone", default=os.environ.get("CLOUDFLARE_ZONE_NAME", "stackql.xyz"))
    p.add_argument("--rps", type=float, default=30.0, help="total target requests per second")
    p.add_argument("--concurrency", type=int, default=10)
    p.add_argument("--duration", type=int, default=0, help="seconds (0 = run until ctrl-c)")
    args = p.parse_args()

    # Reject values that would otherwise fail later in confusing ways: a zero
    # concurrency divides by zero below, and a non-positive rate makes every
    # worker task die silently while the main task keeps waiting.
    if args.concurrency < 1:
        p.error("--concurrency must be at least 1")
    if args.rps <= 0:
        p.error("--rps must be greater than 0")
    if args.duration < 0:
        p.error("--duration must not be negative")

    base = f"https://{args.zone}"
    per_worker_rps = args.rps / args.concurrency
    stats = {"sent": 0, "errors": 0}
    stop = asyncio.Event()

    # We are inside a coroutine, so ask for the running loop explicitly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except NotImplementedError:
            pass  # windows

    print(f"loadgen -> {base}")
    print(f"target {args.rps} req/sec across {args.concurrency} workers ({per_worker_rps:.2f} rps each)")
    if args.duration:
        print(f"running for {args.duration}s\n")
    else:
        print("running until ctrl-c\n")

    # NOTE(review): ssl=False turns off TLS certificate verification. Kept
    # as-is because it may be intentional for this tool - confirm.
    connector = aiohttp.TCPConnector(limit=args.concurrency, ssl=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        workers = [
            asyncio.create_task(worker(f"w{i}", session, base, per_worker_rps, stats, stop))
            for i in range(args.concurrency)
        ]
        hb = asyncio.create_task(heartbeat(stats, stop))

        if args.duration:
            try:
                await asyncio.wait_for(stop.wait(), timeout=args.duration)
            except asyncio.TimeoutError:
                # Duration elapsed without a signal: stop on our own.
                stop.set()
        else:
            await stop.wait()

        # Cancel rather than wait so tasks sleeping or mid-request end
        # promptly; gather() collects the resulting cancellations.
        for w in workers:
            w.cancel()
        hb.cancel()
        await asyncio.gather(*workers, hb, return_exceptions=True)

    print(f"\ndone. sent={stats['sent']} errors={stats['errors']}")
# Script entry point: run the async main() to completion.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Where main() could not install signal handlers (its
        # NotImplementedError branch), ctrl-c arrives here instead of
        # setting the stop event; exit quietly rather than with a traceback.
        pass