Skip to content
This repository was archived by the owner on Mar 1, 2022. It is now read-only.

Commit b0753e4

Browse files
committed
nasa loadgenerator
1 parent eee2ef1 commit b0753e4

6 files changed

+399
-10
lines changed

.envrc

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
layout python
22
unset HTTP_PROXY http_proxy
3-
export LOCUST_STATSD_HOST=172.17.0.8 \
3+
. ~/.rancher-credentials
4+
export LOCUST_STATSD_HOST=172.17.0.10 \
45
LOCUST_USERS=10 \
56
LOCUST_METRICS_EXPORT="measurements" \
67
LOCUST_MEASUREMENT_NAME="measurement" \

locustfile.py

+78-4
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from locust import HttpLocust, TaskSet, task, events, runners
1515
import requests
1616
import statsd
17+
import pandas as pd
1718

1819
from loadgenerator import project, csrf
1920
import metrics
21+
import logparser
2022

2123
host = os.environ.get("LOCUST_STATSD_HOST", "localhost")
2224
port = os.environ.get("LOCUST_STATSD_PORT", "8125")
@@ -25,29 +27,33 @@
2527
MEASUREMENT_NAME = os.environ.get("LOCUST_MEASUREMENT_NAME", "measurement")
2628
MEASUREMENT_DESCRIPTION = os.environ.get("LOCUST_MEASUREMENT_DESCRIPTION", "linear increase")
2729
DURATION = int(os.environ.get("LOCUST_DURATION", "20"))
30+
print(DURATION)
2831
USERS = int(os.environ.get("LOCUST_USERS", '10'))
2932
HATCH_RATE = float(os.environ.get("LOCUST_HATCH_RATE", "1"))
30-
LOAD_TYPE = os.environ.get("LOCUST_LOAD_TYPE", "constant") # linear, constant, random
33+
LOAD_TYPE = os.environ.get("LOCUST_LOAD_TYPE", "constant") # linear, constant, random, nasa, worldcup
3134
SPAWN_WAIT_MEAN = int(os.environ.get("LOCUST_SPAWN_WAIT_MEAN", "10"))
3235
SPAWN_WAIT_STD = int(os.environ.get("LOCUST_SPAWN_WAIT_STD", "4"))
3336
USER_MEAN = int(os.environ.get("LOCUST_USER_MEAN", "40"))
3437
USER_STD = int(os.environ.get("LOCUST_USER_STD", "5"))
3538
WAIT_MEAN = int(os.environ.get("LOCUST_WAIT_MEAN", "10"))
3639
WAIT_STD = int(os.environ.get("LOCUST_WAIT_STD", "4"))
40+
WEB_LOGS_PATH = os.environ.get("LOCUST_LOG_PATH", "logs") # path to nasa/worldcup logs
3741

3842
def wait(self):
3943
gevent.sleep(random.normalvariate(WAIT_MEAN, WAIT_STD))
4044
TaskSet.wait = wait
4145

4246
def login(l):
4347
resp = l.client.get("/login")
48+
print(resp)
4449
l.csrf_token = csrf.find_in_page(resp.content)
4550
data = {
4651
"_csrf": l.csrf_token,
4752
"email": l.email,
4853
"password": "password"
4954
}
5055
r = l.client.post("/login", data)
56+
print(r)
5157
assert r.json().get("redir", None) == "/project"
5258

5359
def create_delete_project(l):
@@ -140,6 +146,32 @@ def print_color(text):
140146
reset=CSI+"m"
141147
print((CSI+"31;40m%s"+CSI+"0m") % text)
142148

149+
150+
def replay_log_measure(df):
151+
def wait(self):
152+
from remote_pdb import RemotePdb
153+
RemotePdb('127.0.0.1', 4444).set_trace()
154+
gevent.sleep(random.normalvariate(WAIT_MEAN, WAIT_STD))
155+
TaskSet.wait = wait
156+
157+
runner = runners.locust_runner
158+
locust = runner.locust_classes[0]
159+
started_at = datetime.utcnow()
160+
for client in df.groupby(["client_id", "session_id"]):
161+
pass
162+
while True:
163+
if (datetime.utcnow() - started_at).seconds > DURATION:
164+
break
165+
def start_locust(_):
166+
try:
167+
l = locust()
168+
#l.requests =
169+
l.run()
170+
except gevent.GreenletExit:
171+
pass
172+
runner.locusts.spawn(start_locust, locust)
173+
174+
143175
def random_measure():
144176
runner = runners.locust_runner
145177
locust = runner.locust_classes[0]
@@ -158,8 +190,10 @@ def start_locust(_):
158190
started_at = datetime.utcnow()
159191

160192
while True:
161-
if (datetime.utcnow() - started_at).seconds > DURATION:
193+
seconds = (datetime.utcnow() - started_at).seconds
194+
if seconds > DURATION:
162195
break
196+
print("%d seconds left!" % (DURATION - seconds))
163197
new_user = -1
164198
while new_user < 0:
165199
new_user = int(random.normalvariate(USER_MEAN, USER_STD))
@@ -176,14 +210,48 @@ def start_locust(_):
176210
if diff > 0:
177211
for l in random.sample(locusts, diff):
178212
if new_user >= len(runner.locusts): break
179-
runner.locusts.killone(l)
213+
try:
214+
runner.locusts.killone(l)
215+
except Exception as e:
216+
print("failed to kill locust: %s" % e)
180217
print("stop user: now: %d" % len(runner.locusts))
181218
STATSD.gauge("user", len(runner.locusts))
182219
wait = random.normalvariate(SPAWN_WAIT_MEAN, SPAWN_WAIT_STD)
183220
print_color("cooldown for %f" % wait)
184221
time.sleep(wait)
185222
stop_measure(started_at)
186223

224+
def read_log(type, limit=1000):
    """
    Load a web-server trace and keep only the page-like requests.

    :param type: "nasa" selects `logparser.read_nasa`; any other value is
        treated as "worldcup" and selects `logparser.read_worldcup`.
    :param limit: forwarded to the reader as its `limit` keyword argument
        (defaults to the previously hard-coded 1000).
    :return: the rows whose `type` column is HTML, DYNAMIC or DIRECTORY; for
        "worldcup" the rows are additionally restricted to region "Paris",
        server 4.
    """
    if type == "nasa":
        reader = logparser.read_nasa
    else:  # "worldcup"
        reader = logparser.read_worldcup
    df = reader(WEB_LOGS_PATH, limit=limit)
    # Only requests for documents are kept; images, audio, etc. are dropped.
    wanted = df["type"].isin(["HTML", "DYNAMIC", "DIRECTORY"])
    if type == "worldcup":
        wanted = wanted & (df.region == "Paris") & (df.server == 4)
    return df[wanted]
234+
235+
def session_number(v):
    """
    Assign a running session number to the requests of one client.

    A new session starts whenever the gap to the previous request is larger
    than 10 minutes. The rows of `v` are used in the order given, so the
    caller is expected to pass them sorted by timestamp.

    :param v: DataFrame with `client_id` and `timestamp` columns.
    :return: DataFrame with `client_id`, `timestamp` and `session_id` columns.
    """
    # The first row has no predecessor, so its gap is NaT. Fill it with a real
    # zero-length Timedelta: an integer 0 is not a valid timedelta fill value
    # and would break the comparison below.
    gaps = v.timestamp.diff(1).fillna(pd.Timedelta(0))
    # Every gap above the threshold bumps the counter for all following rows.
    sessions = (gaps > pd.Timedelta(minutes=10)).cumsum()
    data = dict(client_id=v.client_id, timestamp=v.timestamp,
                session_id=sessions.values)
    return pd.DataFrame(data)
242+
243+
def started_at(v):
    """
    Add a `started_at` column holding the first timestamp of the group.

    :param v: DataFrame with `client_id`, `timestamp` and `session_id` columns.
    :return: DataFrame with those three columns plus `started_at`, where every
        row carries the timestamp of the group's first row.
    """
    columns = dict(client_id=v.client_id,
                   timestamp=v.timestamp,
                   session_id=v.session_id)
    # Repeat the first timestamp once per row of the group.
    first_seen = v.timestamp.iloc[0]
    columns["started_at"] = [first_seen] * len(v.timestamp)
    return pd.DataFrame(columns)
247+
248+
def group_log_by_sessions(df):
    """
    Split a request log into per-client sessions.

    The log is sorted by timestamp, every client's requests are numbered into
    sessions with `session_number`, and each (client, session) group is then
    annotated with its start time via `started_at`.

    :param df: DataFrame with at least `client_id` and `timestamp` columns.
    :return: the result of applying `started_at` to every (client, session) group.
    """
    ordered = df.sort_values("timestamp")
    # sort=False keeps the clients in order of first appearance.
    clients = ordered.groupby(ordered.client_id, sort=False)
    with_session = clients.apply(session_number)
    keys = [with_session.client_id, with_session.session_id]
    sessions = with_session.groupby(keys)
    return sessions.apply(started_at)
254+
187255
def measure():
188256
RequestStats()
189257
time.sleep(5)
@@ -196,7 +264,13 @@ def measure():
196264
def linear_measure(*args, **kw):
197265
stop_measure(started_at)
198266
events.hatch_complete += linear_measure
199-
else: # "random"
267+
elif LOAD_TYPE == "random":
200268
random_measure()
269+
elif LOAD_TYPE == "nasa" or LOAD_TYPE == "worldcup":
270+
df = read_log(LOAD_TYPE).head(10000)
271+
replay_log_measure(group_log_by_sessions(df))
272+
else:
273+
sys.stderr.write("unsupported load type: %s" % LOAD_TYPE)
274+
sys.exit(1)
201275

202276
Thread(target=measure).start()

logparser.py

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
import sys
2+
import re
3+
import os
4+
import gzip
5+
import warnings
6+
from datetime import datetime
7+
8+
import pytz
9+
import pandas as pd
10+
import numpy as np
11+
from collections import defaultdict
12+
13+
METHODS = [
14+
"GET",
15+
"HEAD",
16+
"POST",
17+
"PUT",
18+
"DELETE",
19+
"TRACE",
20+
"OPTIONS",
21+
"CONNECT",
22+
"OTHER_METHODS",
23+
]
24+
25+
TYPES = [
26+
"HTML",
27+
"IMAGE",
28+
"AUDIO",
29+
"VIDEO",
30+
"JAVA",
31+
"FORMATTED",
32+
"DYNAMIC",
33+
"TEXT",
34+
"COMPRESSED",
35+
"PROGRAMS",
36+
"DIRECTORY",
37+
"ICL",
38+
"OTHER_TYPES",
39+
"NUM_OF_FILETYPES",
40+
]
41+
42+
STATUS = [
43+
"100",
44+
"101",
45+
"200",
46+
"201",
47+
"202",
48+
"203",
49+
"204",
50+
"205",
51+
"206",
52+
"300",
53+
"301",
54+
"302",
55+
"303",
56+
"304",
57+
"305",
58+
"400",
59+
"401",
60+
"402",
61+
"403",
62+
"404",
63+
"405",
64+
"406",
65+
"407",
66+
"408",
67+
"409",
68+
"410",
69+
"411",
70+
"412",
71+
"413",
72+
"414",
73+
"415",
74+
"500",
75+
"501",
76+
"502",
77+
"503",
78+
"504",
79+
"505",
80+
"OTHER_CODES",
81+
]
82+
83+
REGIONS = ["SantaClara", "Plano", "Herndon", "Paris"]
84+
85+
def request_type():
    """
    Return the numpy dtype describing one binary request record.

    The layout mirrors this C struct (20 bytes per record):

    struct request {
      uint32_t timestamp;
      uint32_t clientID;
      uint32_t objectID;
      uint32_t size;
      uint8_t method;
      uint8_t status;
      uint8_t type;
      uint8_t server;
    };

    The 32-bit fields are read as big-endian unsigned integers. The 8-bit
    fields are unsigned as well, matching `uint8_t`; a signed byte would turn
    values >= 128 negative and break bit shifts on the packed fields.
    """
    def i(name): return (name, '>u4')
    def b(name): return (name, 'u1')
    return np.dtype([i('timestamp'),
                     i('client_id'),
                     i('object_id'),
                     i('size'),
                     b('method'),
                     b('status'),
                     b('type'),
                     b('server')])
108+
109+
def inverse_dict(map):
    """
    Return a new dict with the keys and values of `map` swapped.

    If several keys share a value, only one of them survives in the result.
    `items()` is used instead of `iteritems()` so this works on both
    Python 2 and Python 3.
    """
    return dict((v, k) for k, v in map.items())
111+
112+
def read_worldcup(path, limit=-1):
    """
    Read a gzip-compressed binary request log into a DataFrame.

    Each record has the layout returned by `request_type()`.

    :param path: path of the gzip-compressed log file.
    :param limit: maximum number of records to return; a negative value
        (the default) returns all of them. Mirrors `read_nasa`.
    :return: DataFrame with the columns timestamp, client_id, object_id, size,
        method, status, type, region and server.
    """
    with gzip.open(path, "rb") as f:
        buf = f.read()
    records = np.frombuffer(buf, dtype=request_type())
    if limit >= 0:
        records = records[:limit]

    # The packed one-byte fields are bit fields; treat them as unsigned so the
    # shift/mask below cannot produce negative category codes.
    status = records["status"].astype(np.uint8)
    server = records["server"].astype(np.uint8)
    timestamp = records["timestamp"].astype(np.int64)

    from_codes = pd.Categorical.from_codes
    # Columns are taken by key: attribute access (`df.size`) would return the
    # DataFrame's element count instead of the `size` field. The 32-bit fields
    # are converted from big-endian to native byte order.
    fields = dict(timestamp=timestamp.view("datetime64[s]"),
                  client_id=records["client_id"].astype(np.uint32),
                  object_id=records["object_id"].astype(np.uint32),
                  size=records["size"].astype(np.uint32),
                  method=from_codes(records["method"], categories=METHODS),
                  # lower 6 bits select the entry in STATUS
                  status=from_codes(status & 0x3f, categories=STATUS),
                  type=from_codes(records["type"], categories=TYPES),
                  # upper 3 bits select the region, lower 5 bits the server
                  region=from_codes(server >> 5, categories=REGIONS),
                  server=server & 0x1F)
    return pd.DataFrame(fields)
128+
129+
130+
def parse_datetime(x):
    """
    Parses datetime with timezone formatted as: `[day/month/year:hour:minute:second zone]`
    (the surrounding square brackets are part of the expected input).
    Example:
    `>>> parse_datetime('[13/Nov/2015:11:45:42 +0000]')`
    `datetime.datetime(2015, 11, 13, 11, 45, 42, tzinfo=<UTC>)`

    Due to problems parsing the timezone (`%z`) with `datetime.strptime`, the
    timezone will be obtained using the `pytz` library.

    Returns None if `x` is None, empty or "-".
    """
    if x is None or x == "" or x == "-":
        return None
    # x[1:-7] drops the leading "[" and the trailing " +HHMM]".
    dt = datetime.strptime(x[1:-7], '%d/%b/%Y:%H:%M:%S')
    # The sign applies to hours AND minutes: "-0430" is -270 minutes, not
    # -4 * 60 + 30.
    sign = -1 if x[-6] == "-" else 1
    offset_minutes = sign * (int(x[-5:-3]) * 60 + int(x[-3:-1]))
    return dt.replace(tzinfo=pytz.FixedOffset(offset_minutes))
144+
145+
def _classify_ressource(ressource):
    """Map a requested path to a coarse content-type label by its extension."""
    ressource = ressource.lower()
    ext = os.path.splitext(ressource)[1]
    if ext.endswith("."):
        ext = ext[:-1]

    if ressource.endswith("/") or ext == "" or ext in [".gov"]:
        return "DIRECTORY"
    # The misspelled extensions are deliberate: they are matched as HTML too.
    if ".htm" in ressource or ext in [".txt", ".txt~", ".hmtl", ".htlm", ".hrml", ".bak"]:
        return "HTML"
    if ext in [".gif", ".xbm", ".jpg", ".jpeg", ".bmp"]:
        return "IMAGE"
    if ext in [".wav"]:
        return "AUDIO"
    if ext in [".mpg"]:
        return "VIDEO"
    if "cgi" in ressource or "?" in ressource or ext in [".pl", ".perl"]:
        return "DYNAMIC"
    return "OTHER_TYPES"


def read_nasa(path, limit=-1):
    """
    Read a gzip-compressed access log in common log format into a DataFrame.

    Lines that do not match the expected format are skipped silently.

    :param path: path of the gzip-compressed log file.
    :param limit: stop after this many input lines (skipped lines count too);
        a negative value (the default) reads the whole file.
    :return: DataFrame with the columns client_id, timestamp, method,
        ressource, status, size and type. `status` and `method` are
        categoricals over STATUS / METHODS; `size` is None where the log
        has '-'.
    """
    # example line:
    # 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
    parts = [
        r'(?P<host>\S+)',                   # host %h
        r'\S+',                             # indent %l (unused)
        r'(?P<user>\S+)',                   # user %u
        r'(?P<time>.+)',                    # time %t
        r'"(?P<method>\S+)\s+(?P<ressource>[^" ]+)([^"]+)?"',  # request "%r"
        r'(?P<status>[0-9]+)',              # status %>s
        r'(?P<size>\S+)',                   # size %b (careful, can be '-')
    ]
    pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')

    # Create every column up front so an empty or fully unparsable log still
    # yields a frame with the expected columns.
    columns = ("client_id", "timestamp", "method", "ressource", "status",
               "size", "type")
    log = dict((name, []) for name in columns)
    with gzip.open(path, "r") as f:
        for linenumber, line in enumerate(f):
            if limit >= 0 and limit <= linenumber:
                break

            line = line.decode("latin_1")
            m = pattern.match(line)
            if m is None:
                continue
            res = m.groupdict()
            log["client_id"].append(res["host"])
            log["timestamp"].append(parse_datetime(res["time"]))
            log["method"].append(res["method"])
            log["ressource"].append(res["ressource"])
            log["status"].append(res["status"])
            log["size"].append(int(res["size"]) if res["size"] != "-" else None)
            log["type"].append(_classify_ressource(res["ressource"]))
    log["status"] = pd.Categorical(log["status"], categories=STATUS, ordered=False)
    log["method"] = pd.Categorical(log["method"], categories=METHODS, ordered=False)
    return pd.DataFrame(log)

requirements.txt

+3
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@ websocket-client
44
zmq
55
influxdb
66
statsd
7+
pytz
8+
numpy
9+
pandas

0 commit comments

Comments
 (0)