(in progress) Attempting to simplify and clean up code. #241

Draft: wants to merge 8 commits into main

@@ -101,7 +101,6 @@ def main():
get_version_string(project_name, project_version)
)
args = parser.parse_args()

if args.logname is not None:
print("Writting log to {}".format(args.logname))
logging.basicConfig(
@@ -119,6 +118,37 @@ def main():
datefmt=LOG_DATEFMT,
)
logging.info(get_version_string(project_name, project_version))

kwargs = process_args(args)

logging.info("Connecting to event stream server")
conn = connect_redis_server(
args.event_stream_host,
args.event_stream_port,
args.event_stream_user,
args.event_stream_pass,
)
kwargs["conn"] = conn

if kwargs["datasink_push_results_redistimeseries"] is True:
logging.info("Connecting to datasync server")
datasync_conn = connect_redis_server(
args.datasink_redistimeseries_host,
args.datasink_redistimeseries_port,
args.datasink_redistimeseries_user,
args.datasink_redistimeseries_pass,
)
kwargs["datasync_conn"] = datasync_conn

build_runners_consumer_group_create(conn, args.platform_name)

logging.info("Entering blocking read waiting for work.")
while True:
_, stream_id, _, _ = self_contained_coordinator_blocking_read(kwargs)
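
The main() refactor above collapses a long positional parameter list into a single kwargs dict built by process_args(). A minimal sketch of the pattern, with hypothetical names rather than the coordinator's real functions:

    from types import SimpleNamespace

    def process_args(args):
        # Build the dict once, next to the argparse definitions.
        kwargs = {}
        kwargs["arch"] = args.arch
        kwargs["stream_id"] = args.consumer_start_id
        return kwargs

    def blocking_read(kwargs):
        # Callees pull values by key instead of by position.
        return "reading from {} for arch {}".format(
            kwargs["stream_id"], kwargs["arch"]
        )

    args = SimpleNamespace(arch="amd64", consumer_start_id=">")
    print(blocking_read(process_args(args)))

One trade-off: a plain dict gives up signature checking, so a missing key (for example the datasync_conn entry the tests must remember to set) only fails at lookup time; a dataclass or TypedDict would surface that earlier.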


def process_args(args):
kwargs = {}
topologies_folder = os.path.abspath(args.setups_folder + "/topologies")
logging.info("Using topologies folder dir {}".format(topologies_folder))
topologies_files = pathlib.Path(topologies_folder).glob("*.yml")
@@ -128,240 +158,166 @@ def main():
" ".join([str(x) for x in topologies_files])
)
)
topologies_map = get_topologies(topologies_files[0])
testsuite_spec_files = extract_testsuites(args)
kwargs["topologies_map"] = get_topologies(topologies_files[0])
kwargs["testsuite_spec_files"] = extract_testsuites(args)

kwargs["home"] = str(Path.home())
kwargs["cpuset_start_pos"] = args.cpuset_start_pos
logging.info("Start CPU pinning at position {}".format(kwargs["cpuset_start_pos"]))
kwargs["redis_proc_start_port"] = args.redis_proc_start_port
logging.info(
"Reading event streams from: {}:{} with user {}".format(
args.event_stream_host, args.event_stream_port, args.event_stream_user
)
"Redis Processes start port: {}".format(kwargs["redis_proc_start_port"])
)
try:
conn = redis.StrictRedis(
host=args.event_stream_host,
port=args.event_stream_port,
decode_responses=False, # don't decode due to binary archives
password=args.event_stream_pass,
username=args.event_stream_user,
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
socket_connect_timeout=REDIS_SOCKET_TIMEOUT,
socket_keepalive=True,
)
conn.ping()
except redis.exceptions.ConnectionError as e:
logging.error(
"Unable to connect to redis available at: {}:{} to read the event streams".format(
args.event_stream_host, args.event_stream_port
)
)
logging.error("Error message {}".format(e.__str__()))
exit(1)
datasink_conn = None
if args.datasink_push_results_redistimeseries:
logging.info(
"Checking redistimeseries datasink connection is available at: {}:{} to push the timeseries data".format(
args.datasink_redistimeseries_host, args.datasink_redistimeseries_port
)
)
try:
datasink_conn = redis.StrictRedis(
host=args.datasink_redistimeseries_host,
port=args.datasink_redistimeseries_port,
decode_responses=True,
password=args.datasink_redistimeseries_pass,
username=args.datasink_redistimeseries_user,
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
socket_connect_timeout=REDIS_SOCKET_TIMEOUT,
socket_keepalive=True,
)
datasink_conn.ping()
except redis.exceptions.ConnectionError as e:
logging.error(
"Unable to connect to redis available at: {}:{}".format(
args.datasink_redistimeseries_host,
args.datasink_redistimeseries_port,
)
)
logging.error("Error message {}".format(e.__str__()))
exit(1)

logging.info("checking build spec requirements")
running_platform = args.platform_name
build_runners_consumer_group_create(conn, running_platform)
stream_id = None
docker_client = docker.from_env()
home = str(Path.home())
cpuset_start_pos = args.cpuset_start_pos
logging.info("Start CPU pinning at position {}".format(cpuset_start_pos))
redis_proc_start_port = args.redis_proc_start_port
logging.info("Redis Processes start port: {}".format(redis_proc_start_port))

# TODO: confirm we do have enough cores to run the spec
# available_cpus = args.cpu_count
datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries
grafana_profile_dashboard = args.grafana_profile_dashboard

defaults_filename = args.defaults_filename
kwargs["running_platform"] = args.platform_name

kwargs["stream_id"] = args.consumer_start_id
kwargs["docker_client"] = docker.from_env()

kwargs[
"datasink_push_results_redistimeseries"
] = args.datasink_push_results_redistimeseries
kwargs["grafana_profile_dashboard"] = args.grafana_profile_dashboard

kwargs["defaults_filename"] = args.defaults_filename
(
_,
default_metrics,
_,
_,
_,
) = get_defaults(defaults_filename)
) = get_defaults(kwargs["defaults_filename"])
kwargs["default_metrics"] = default_metrics

# Consumer id
consumer_pos = args.consumer_pos
logging.info("Consumer pos {}".format(consumer_pos))
kwargs["consumer_pos"] = args.consumer_pos
logging.info("Consumer pos {}".format(kwargs["consumer_pos"]))

# Arch
arch = args.arch
logging.info("Running for arch: {}".format(arch))
kwargs["arch"] = args.arch
logging.info("Running for arch: {}".format(kwargs["arch"]))

# Docker air gap usage
docker_air_gap = args.docker_air_gap
if docker_air_gap:
kwargs["docker_air_gap"] = args.docker_air_gap
if kwargs["docker_air_gap"]:
logging.info(
"Using docker in an air-gapped way. Restoring running images from redis keys."
)

profilers_list = []
profilers_enabled = args.enable_profilers
if profilers_enabled:
profilers_list = args.profilers.split(",")
kwargs["override_memtier_test_time"] = args.override_memtier_test_time
if kwargs["override_memtier_test_time"] > 0:
logging.info(
"Overriding memtier benchmark --test-time to {} seconds".format(
kwargs["override_memtier_test_time"]
)
)
kwargs["profilers_list"] = []
kwargs["profilers_enabled"] = args.enable_profilers
if kwargs["profilers_enabled"]:
kwargs["profilers_list"] = args.profilers.split(",")
res = check_compatible_system_and_kernel_and_prepare_profile(args)
if res is False:
logging.error(
"Requested for the following profilers to be enabled but something went wrong: {}.".format(
" ".join(profilers_list)
" ".join(kwargs["profilers_list"])
)
)
exit(1)
kwargs["consumer_name"] = "{}-self-contained-proc#{}".format(
get_runners_consumer_group_name(kwargs["running_platform"]),
kwargs["consumer_pos"],
)
logging.info(
"Consuming from group {}. Consumer id {}".format(
get_runners_consumer_group_name(kwargs["running_platform"]),
kwargs["consumer_name"],
)
)

override_memtier_test_time = args.override_memtier_test_time
if override_memtier_test_time > 0:
logging.info(
"Overriding memtier benchmark --test-time to {} seconds".format(
override_memtier_test_time
)
return kwargs


def connect_redis_server(
event_stream_host, event_stream_port, event_stream_user, event_stream_pass
):
logging.info(
"Connecting to Redis server: {}:{} with user {}".format(
event_stream_host, event_stream_port, event_stream_user
)
logging.info("Entering blocking read waiting for work.")
if stream_id is None:
stream_id = args.consumer_start_id
while True:
_, stream_id, _, _ = self_contained_coordinator_blocking_read(
conn,
datasink_push_results_redistimeseries,
docker_client,
home,
stream_id,
datasink_conn,
testsuite_spec_files,
topologies_map,
running_platform,
profilers_enabled,
profilers_list,
grafana_profile_dashboard,
cpuset_start_pos,
redis_proc_start_port,
consumer_pos,
docker_air_gap,
override_memtier_test_time,
default_metrics,
arch,
)
try:
conn = redis.StrictRedis(
host=event_stream_host,
port=event_stream_port,
decode_responses=False, # don't decode due to binary archives
password=event_stream_pass,
username=event_stream_user,
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
socket_connect_timeout=REDIS_SOCKET_TIMEOUT,
socket_keepalive=True,
)
conn.ping()
except redis.exceptions.ConnectionError as e:
logging.error(
"Unable to connect to Redis server available at: {}:{}".format(
event_stream_host, event_stream_port
)
)
logging.error("Error message {}".format(e.__str__()))
exit(1)
return conn
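
A usage sketch for the helper above (hypothetical host and credentials). redis-py connects lazily, which is why the helper issues conn.ping() before returning: it forces the handshake so a bad host or password fails here, with a clear log line and exit(1), rather than at the first xreadgroup call:

    kwargs = {}
    conn = connect_redis_server("localhost", 6379, "default", None)  # hypothetical values
    kwargs["conn"] = conn  # already validated by the ping() inside the helper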


def self_contained_coordinator_blocking_read(
conn,
datasink_push_results_redistimeseries,
docker_client,
home,
stream_id,
datasink_conn,
testsuite_spec_files,
topologies_map,
platform_name,
profilers_enabled,
profilers_list,
grafana_profile_dashboard="",
cpuset_start_pos=0,
redis_proc_start_port=6379,
consumer_pos=1,
docker_air_gap=False,
override_test_time=None,
default_metrics=None,
arch="amd64",
):
def self_contained_coordinator_blocking_read(kwargs):
num_process_streams = 0
num_process_test_suites = 0
overall_result = False
consumer_name = "{}-self-contained-proc#{}".format(
get_runners_consumer_group_name(platform_name), consumer_pos
)
logging.info(
"Consuming from group {}. Consumer id {}".format(
get_runners_consumer_group_name(platform_name), consumer_name
)
)
newTestInfo = conn.xreadgroup(
get_runners_consumer_group_name(platform_name),
consumer_name,
{STREAM_KEYNAME_NEW_BUILD_EVENTS: stream_id},

newTestInfo = kwargs["conn"].xreadgroup(
get_runners_consumer_group_name(kwargs["running_platform"]),
kwargs["consumer_name"],
{STREAM_KEYNAME_NEW_BUILD_EVENTS: kwargs["stream_id"]},
count=1,
block=0,
)
kwargs["newTestInfo"] = newTestInfo

if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1:
stream_id = ">"
kwargs["stream_id"] = ">"
else:
(
stream_id,
kwargs["stream_id"],
overall_result,
total_test_suite_runs,
) = process_self_contained_coordinator_stream(
conn,
datasink_push_results_redistimeseries,
docker_client,
home,
newTestInfo,
datasink_conn,
testsuite_spec_files,
topologies_map,
platform_name,
profilers_enabled,
profilers_list,
grafana_profile_dashboard,
cpuset_start_pos,
redis_proc_start_port,
docker_air_gap,
"defaults.yml",
None,
default_metrics,
arch,
)
) = process_self_contained_coordinator_stream(kwargs)

num_process_streams = num_process_streams + 1
num_process_test_suites = num_process_test_suites + total_test_suite_runs
if overall_result is True:
ack_reply = conn.xack(
ack_reply = kwargs["conn"].xack(
STREAM_KEYNAME_NEW_BUILD_EVENTS,
get_runners_consumer_group_name(platform_name),
stream_id,
get_runners_consumer_group_name(kwargs["running_platform"]),
kwargs["stream_id"],
)
if type(ack_reply) == bytes:
ack_reply = ack_reply.decode()
if ack_reply == "1" or ack_reply == 1:
logging.info(
"Sucessfully acknowledge build variation stream with id {}.".format(
stream_id
kwargs["stream_id"]
)
)
else:
logging.error(
"Unable to acknowledge build variation stream with id {}. XACK reply {}".format(
stream_id, ack_reply
kwargs["stream_id"], ack_reply
)
)
return overall_result, stream_id, num_process_streams, num_process_test_suites

return (
overall_result,
kwargs["stream_id"],
num_process_streams,
num_process_test_suites,
)
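
The read/ack cycle above is standard Redis Streams consumer-group plumbing. A self-contained sketch of the same mechanics in redis-py, with hypothetical stream and group names; ">" requests entries never delivered to this group, block=0 waits indefinitely, and XACK clears the entry from the group's pending list:

    import redis

    r = redis.StrictRedis()  # decode_responses=False, so ids arrive as bytes
    stream, group, consumer = "streams:new_builds", "runners", "runners-proc#1"

    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # BUSYGROUP: the group already exists

    for _stream_name, messages in r.xreadgroup(
        group, consumer, {stream: ">"}, count=1, block=0
    ):
        for msg_id, fields in messages:
            # ... run the work described by fields ...
            r.xack(stream, group, msg_id)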


def prepare_memtier_benchmark_parameters(
@@ -390,31 +346,12 @@ def prepare_memtier_benchmark_parameters(
return None, benchmark_command_str
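
prepare_memtier_benchmark_parameters ultimately yields a memtier_benchmark command string, and the new override_memtier_test_time kwarg rewrites its --test-time flag. A hedged sketch of how such an override can be applied to a command string (a simplified stand-in, not the function's actual body):

    def apply_test_time_override(benchmark_command_str, override_seconds):
        # Drop any existing "--test-time <n>" pair, then append the override.
        parts = benchmark_command_str.split()
        if "--test-time" in parts:
            i = parts.index("--test-time")
            del parts[i : i + 2]
        parts += ["--test-time", str(override_seconds)]
        return " ".join(parts)

    # prints "memtier_benchmark --port 6379 --test-time 180"
    print(apply_test_time_override("memtier_benchmark --port 6379 --test-time 60", 180))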


def process_self_contained_coordinator_stream(
conn,
datasink_push_results_redistimeseries,
docker_client,
home,
newTestInfo,
datasink_conn,
testsuite_spec_files,
topologies_map,
running_platform,
profilers_enabled=False,
profilers_list=[],
grafana_profile_dashboard="",
cpuset_start_pos=0,
redis_proc_start_port=6379,
docker_air_gap=False,
defaults_filename="defaults.yml",
override_test_time=None,
default_metrics=[],
arch="amd64",
):
def process_self_contained_coordinator_stream(kwargs):
stream_id = "n/a"
overall_result = False
total_test_suite_runs = 0
try:
newTestInfo = kwargs["newTestInfo"]
stream_id, testDetails = newTestInfo[0][1][0]
stream_id = stream_id.decode()
logging.info("Received work . Stream id {}.".format(stream_id))
@@ -433,22 +370,24 @@ def process_self_contained_coordinator_stream(
run_arch,
) = extract_build_info_from_streamdata(testDetails)

if run_arch == arch:
if run_arch == kwargs["arch"]:
overall_result = True
profiler_dashboard_links = []
if docker_air_gap:
if kwargs["docker_air_gap"]:
airgap_key = "docker:air-gap:{}".format(run_image)
logging.info(
"Restoring docker image: {} from {}".format(
run_image, airgap_key
)
)
airgap_docker_image_bin = conn.get(airgap_key)
images_loaded = docker_client.images.load(airgap_docker_image_bin)
airgap_docker_image_bin = kwargs["conn"].get(airgap_key)
images_loaded = kwargs["docker_client"].images.load(
airgap_docker_image_bin
)
logging.info("Successfully loaded images {}".format(images_loaded))

for test_file in testsuite_spec_files:
if defaults_filename in test_file:
for test_file in kwargs["testsuite_spec_files"]:
if kwargs["defaults_filename"] in test_file:
continue
redis_containers = []
client_containers = []
@@ -497,12 +436,14 @@ def process_self_contained_coordinator_stream(
test_result = False
redis_container = None
try:
current_cpu_pos = cpuset_start_pos
current_cpu_pos = kwargs["cpuset_start_pos"]
ceil_db_cpu_limit = extract_db_cpu_limit(
topologies_map, topology_spec_name
kwargs["topologies_map"], topology_spec_name
)
temporary_dir = tempfile.mkdtemp(dir=kwargs["home"])
temporary_dir_client = tempfile.mkdtemp(
dir=kwargs["home"]
)
temporary_dir = tempfile.mkdtemp(dir=home)
temporary_dir_client = tempfile.mkdtemp(dir=home)
logging.info(
"Using local temporary dir to persist redis build artifacts. Path: {}".format(
temporary_dir
@@ -519,13 +460,13 @@ def process_self_contained_coordinator_stream(
setup_type = "oss-standalone"
tf_triggering_env = "ci"
github_actor = "{}-{}".format(
tf_triggering_env, running_platform
tf_triggering_env, kwargs["running_platform"]
)
dso = "redis-server"
profilers_artifacts_matrix = []

collection_summary_str = ""
if profilers_enabled:
if kwargs["profilers_enabled"]:
collection_summary_str = (
local_profilers_platform_checks(
dso,
@@ -542,12 +483,15 @@ def process_self_contained_coordinator_stream(
)

restore_build_artifacts_from_test_details(
build_artifacts, conn, temporary_dir, testDetails
build_artifacts,
kwargs["conn"],
temporary_dir,
testDetails,
)
mnt_point = "/mnt/redis/"
command = generate_standalone_redis_server_args(
"{}redis-server".format(mnt_point),
redis_proc_start_port,
kwargs["redis_proc_start_port"],
mnt_point,
redis_configuration_parameters,
)
@@ -560,7 +504,10 @@ def process_self_contained_coordinator_stream(
run_image, db_cpuset_cpus, command_str
)
)
redis_container = docker_client.containers.run(

redis_container = kwargs[
"docker_client"
].containers.run(
image=run_image,
volumes={
temporary_dir: {
@@ -579,13 +526,16 @@ def process_self_contained_coordinator_stream(
)
redis_containers.append(redis_container)

r = redis.StrictRedis(port=redis_proc_start_port)
r = redis.StrictRedis(
port=kwargs["redis_proc_start_port"]
)
r.ping()
redis_conns = [r]
reset_commandstats(redis_conns)
redis_pids = []
first_redis_pid = r.info()["process_id"]
redis_pids.append(first_redis_pid)

ceil_client_cpu_limit = extract_client_cpu_limit(
benchmark_config
)
@@ -603,9 +553,9 @@ def process_self_contained_coordinator_stream(
benchmark_config,
benchmark_tool_workdir,
client_cpuset_cpus,
docker_client,
kwargs["docker_client"],
git_hash,
redis_proc_start_port,
kwargs["redis_proc_start_port"],
temporary_dir,
test_name,
)
@@ -636,6 +586,7 @@ def process_self_contained_coordinator_stream(
"oss-standalone",
)
)

logging.info(
"Will store benchmark json output to local file {}".format(
local_benchmark_output_filename
@@ -649,7 +600,7 @@ def process_self_contained_coordinator_stream(
) = prepare_benchmark_parameters(
benchmark_config,
full_benchmark_path,
redis_proc_start_port,
kwargs["redis_proc_start_port"],
"localhost",
local_benchmark_output_filename,
False,
@@ -663,7 +614,7 @@ def process_self_contained_coordinator_stream(
) = prepare_memtier_benchmark_parameters(
benchmark_config["clientconfig"],
full_benchmark_path,
redis_proc_start_port,
kwargs["redis_proc_start_port"],
"localhost",
local_benchmark_output_filename,
benchmark_tool_workdir,
@@ -672,15 +623,16 @@ def process_self_contained_coordinator_stream(
client_container_image = extract_client_container_image(
benchmark_config
)

profiler_call_graph_mode = "dwarf"
profiler_frequency = 99
# start the profile
(
profiler_name,
profilers_map,
) = profilers_start_if_required(
profilers_enabled,
profilers_list,
kwargs["profilers_enabled"],
kwargs["profilers_list"],
redis_pids,
setup_name,
start_time_str,
@@ -699,7 +651,9 @@ def process_self_contained_coordinator_stream(
# run the benchmark
benchmark_start_time = datetime.datetime.now()

client_container_stdout = docker_client.containers.run(
client_container_stdout = kwargs[
"docker_client"
].containers.run(
image=client_container_image,
volumes={
temporary_dir_client: {
@@ -730,31 +684,31 @@ def process_self_contained_coordinator_stream(
_,
overall_tabular_data_map,
) = profilers_stop_if_required(
datasink_push_results_redistimeseries,
kwargs["datasink_push_results_redistimeseries"],
benchmark_duration_seconds,
collection_summary_str,
dso,
tf_github_org,
tf_github_repo,
profiler_name,
profilers_artifacts_matrix,
profilers_enabled,
kwargs["profilers_enabled"],
profilers_map,
redis_pids,
S3_BUCKET_NAME,
test_name,
)
if (
profilers_enabled
and datasink_push_results_redistimeseries
kwargs["profilers_enabled"]
and kwargs["datasink_push_results_redistimeseries"]
):
datasink_profile_tabular_data(
git_branch,
tf_github_org,
tf_github_repo,
git_hash,
overall_tabular_data_map,
conn,
kwargs["conn"],
setup_name,
start_time_ms,
start_time_str,
@@ -776,21 +730,21 @@ def process_self_contained_coordinator_stream(
"s3_link": s3_link,
}
)
https_link = (
generate_artifacts_table_grafana_redis(
datasink_push_results_redistimeseries,
grafana_profile_dashboard,
profilers_artifacts,
datasink_conn,
setup_name,
start_time_ms,
start_time_str,
test_name,
tf_github_org,
tf_github_repo,
git_hash,
git_branch,
)
https_link = generate_artifacts_table_grafana_redis(
kwargs[
"datasink_push_results_redistimeseries"
],
kwargs["grafana_profile_dashboard"],
profilers_artifacts,
kwargs["datasync_conn"],
setup_name,
start_time_ms,
start_time_str,
test_name,
tf_github_org,
tf_github_repo,
git_hash,
git_branch,
)
profiler_dashboard_links.append(
[
@@ -863,38 +817,51 @@ def process_self_contained_coordinator_stream(
results_dict = json.load(json_file)
print_results_table_stdout(
benchmark_config,
default_metrics,
kwargs["default_metrics"],
results_dict,
setup_type,
test_name,
None,
)

logging.info(
"Done reading results json from {}".format(
full_result_path
)
)

dataset_load_duration_seconds = 0

exporter_datasink_common(
benchmark_config,
benchmark_duration_seconds,
build_variant_name,
datapoint_time_ms,
dataset_load_duration_seconds,
datasink_conn,
datasink_push_results_redistimeseries,
git_branch,
git_version,
metadata,
redis_conns,
results_dict,
running_platform,
setup_name,
setup_type,
test_name,
tf_github_org,
tf_github_repo,
tf_triggering_env,
topology_spec_name,
default_metrics,
)
if (
kwargs["datasink_push_results_redistimeseries"]
is True
):
exporter_datasink_common(
benchmark_config,
benchmark_duration_seconds,
build_variant_name,
datapoint_time_ms,
dataset_load_duration_seconds,
kwargs["datasync_conn"],
kwargs["datasink_push_results_redistimeseries"],
git_branch,
git_version,
metadata,
redis_conns,
results_dict,
kwargs["running_platform"],
setup_name,
setup_type,
test_name,
tf_github_org,
tf_github_repo,
tf_triggering_env,
topology_spec_name,
kwargs["default_metrics"],
)

logging.info("shutting down redis server")

r.shutdown(save=False)
test_result = True
total_test_suite_runs = total_test_suite_runs + 1
@@ -956,7 +923,7 @@ def process_self_contained_coordinator_stream(
else:
logging.info(
"skipping stream_id {} given arch {}!={}".format(
stream_id, run_arch, arch
stream_id, run_arch, kwargs["arch"]
)
)
else:
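
For context on the air-gap branch in process_self_contained_coordinator_stream: the image tarball is stored verbatim under a docker:air-gap:<image> key, which is why the event-stream connection keeps decode_responses=False. A minimal round-trip sketch with docker-py and redis-py (assumes a local daemon and Redis; the image name is hypothetical):

    import docker
    import redis

    client = docker.from_env()
    conn = redis.StrictRedis(decode_responses=False)  # keep the tarball binary-safe

    image_name = "redis:7.0"  # hypothetical
    airgap_key = "docker:air-gap:{}".format(image_name)

    # Publisher side: Image.save() yields the tarball in chunks.
    tarball = b"".join(client.images.get(image_name).save())
    conn.set(airgap_key, tarball)

    # Air-gapped side, mirroring the coordinator's restore path.
    images_loaded = client.images.load(conn.get(airgap_key))
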
34 changes: 15 additions & 19 deletions utils/tests/test_self_contained_coordinator.py
@@ -91,12 +91,18 @@ def test_generate_cpuset_cpus():

def test_self_contained_coordinator_blocking_read():
try:
kwargs = {}
# the old call passed True positionally; there is no `args` in this test scope
kwargs["datasink_push_results_redistimeseries"] = True

run_coordinator = True
TST_RUNNER_X = os.getenv("TST_RUNNER_X", "1")
if TST_RUNNER_X == "0":
run_coordinator = False
if run_coordinator:
conn = redis.StrictRedis(port=16379)
kwargs["conn"] = conn
conn.ping()
expected_datapoint_ts = None
conn.flushall()
@@ -110,40 +116,30 @@ def test_self_contained_coordinator_blocking_read():

assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) > 0
running_platform = "fco-ThinkPad-T490"

build_runners_consumer_group_create(conn, running_platform, "0")
datasink_conn = redis.StrictRedis(port=16379)
kwargs["datasync_conn"] = datasync_conn
rts = datasink_conn.ts()
docker_client = docker.from_env()
home = str(Path.home())
stream_id = ">"
topologies_map = get_topologies(
kwargs["docker_client"] = docker.from_env()
kwargs["home"] = str(Path.home())
kwargs["stream_id"] = ">"
kwargs["topologies_map"] = get_topologies(
"./redis_benchmarks_specification/setups/topologies/topologies.yml"
)
# we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores
# and we need 1 core for DB and another for CLIENT
testsuite_spec_files = [
kwargs["testsuite_spec_files"] = [
"./utils/tests/test_data/test-suites/redis-benchmark-full-suite-1Mkeys-100B.yml"
]
kwargs["running_platform"] = "fco-ThinkPad-T490"
kwargs["profilers_enabled"] = False
(
result,
stream_id,
number_processed_streams,
_,
) = self_contained_coordinator_blocking_read(
conn,
True,
docker_client,
home,
stream_id,
datasink_conn,
testsuite_spec_files,
topologies_map,
running_platform,
False,
[],
)
) = self_contained_coordinator_blocking_read(kwargs)
assert result == True
assert number_processed_streams == 1
# ensure we're able to acknowledge the consumed message
41 changes: 20 additions & 21 deletions utils/tests/test_self_contained_coordinator_memtier.py
@@ -23,20 +23,23 @@
)
from redis_benchmarks_specification.__self_contained_coordinator__.runners import (
build_runners_consumer_group_create,
get_runners_consumer_group_name,
)
from redis_benchmarks_specification.__setups__.topologies import get_topologies
from utils.tests.test_data.api_builder_common import flow_1_and_2_api_builder_checks


def test_self_contained_coordinator_blocking_read():
try:
kwargs = {}
run_coordinator = True
TST_RUNNER_X = os.getenv("TST_RUNNER_X", "1")
if TST_RUNNER_X == "0":
run_coordinator = False
if run_coordinator:
conn = redis.StrictRedis(port=16379)
conn.ping()
kwargs["conn"] = conn
expected_datapoint_ts = None
conn.flushall()
build_variant_name, reply_fields = flow_1_and_2_api_builder_checks(conn)
@@ -47,39 +50,35 @@ def test_self_contained_coordinator_blocking_read():

assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) > 0
running_platform = "fco-ThinkPad-T490"
kwargs["running_platform"] = "fco-ThinkPad-T490"

build_runners_consumer_group_create(conn, running_platform, "0")
datasink_conn = redis.StrictRedis(port=16379)
docker_client = docker.from_env()
home = str(Path.home())
stream_id = ">"
topologies_map = get_topologies(
build_runners_consumer_group_create(conn, kwargs["running_platform"], "0")
kwargs["datasink_conn"] = redis.StrictRedis(port=16379)
kwargs["docker_client"] = docker.from_env()
kwargs["home"] = str(Path.home())
kwargs["stream_id"] = ">"
kwargs["topologies_map"] = get_topologies(
"./redis_benchmarks_specification/setups/topologies/topologies.yml"
)
# we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores
# and we need 1 core for DB and another for CLIENT
testsuite_spec_files = [
kwargs["testsuite_spec_files"] = [
"./utils/tests/test_data/test-suites/memtier_benchmark-1Mkeys-100B-expire-use-case.yml"
]
kwargs["datasink_push_results_redistimeseries"] = True
kwargs["profilers_enabled"] = False
kwargs["consumer_pos"] = 1
kwargs["consumer_name"] = "{}-self-contained-proc#{}".format(
get_runners_consumer_group_name(kwargs["running_platform"]),
kwargs["consumer_pos"],
)

(
result,
stream_id,
number_processed_streams,
_,
) = self_contained_coordinator_blocking_read(
conn,
True,
docker_client,
home,
stream_id,
datasink_conn,
testsuite_spec_files,
topologies_map,
running_platform,
False,
[],
)
) = self_contained_coordinator_blocking_read(kwargs)
assert result == True
assert number_processed_streams == 1
tf_github_org = "redis"