From b9d87fcf2d9a22c9f5aa7c84491d3267550520db Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Mon, 9 Feb 2026 14:37:32 -0500 Subject: [PATCH 01/33] Replace cpu-benchmark with similar stress-ng monte-carlo test --- bin/wfbench | 65 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 4bc03cba..e30273d3 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2021-2025 The WfCommons Team. @@ -20,6 +20,7 @@ import re import json import logging import pandas as pd +import psutil from io import StringIO from filelock import FileLock @@ -27,6 +28,9 @@ from pathos.helpers import mp as multiprocessing from typing import List, Optional +int32_max = 2147483647 + + # Configure logging logging.basicConfig( level=logging.INFO, # Change this to control the verbosity @@ -39,6 +43,16 @@ logging.basicConfig( this_dir = pathlib.Path(__file__).resolve().parent +def kill_process_and_children(proc): + try: + parent = psutil.Process(proc.pid) + children = parent.children(recursive=True) + for child in children: + child.kill() + parent.kill() + except psutil.NoSuchProcess: + pass # Process is already dead + def log_info(msg: str): """ Log an info message to stderr @@ -165,34 +179,38 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, :rtype: List """ total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] + cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum number of cpu work.") + cpu_ops = int32_max + + cpu_proc = None + 
mem_proc = None + + cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", "--vm-bytes", f"{total_mem}", "--vm-keep"] - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if cpu_threads > 0: + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) if core: os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() if mem_threads > 0: # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) if core: os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - return cpu_procs, mem_procs + return cpu_proc, mem_proc def io_read_benchmark_user_input_data_size(inputs, @@ -446,14 +464,14 @@ def main(): log_debug(f"{args.name} acquired core {core}") mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, + cpu_proc, mem_proc = cpu_mem_benchmark(cpu_queue=cpu_queue, cpu_threads=int(10 * args.percent_cpu), mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), + cpu_work=int32_max**2 if args.time else int(args.cpu_work), core=core, total_mem=mem_bytes) - procs.extend(cpu_procs) + procs.append(cpu_proc) if args.time: time.sleep(int(args.time)) for proc in procs: @@ -461,7 +479,7 @@ def main(): if proc.is_alive(): proc.terminate() elif isinstance(proc, subprocess.Popen): - proc.terminate() + 
kill_process_and_children(proc) else: for proc in procs: if isinstance(proc, subprocess.Popen): @@ -470,11 +488,10 @@ def main(): # io_proc.terminate() io_proc.join() - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") + try: + os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails + except subprocess.TimeoutExpired: + log_debug("Memory process did not terminate; force-killing.") # As a fallback, use pkill if any remaining instances are stuck subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() From d3778c2fb3170e96d8d1da9b17960a8c5b81afe4 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:33:17 -0500 Subject: [PATCH 02/33] Add dependency psutil --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b3ceed01..caa985f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,11 +29,11 @@ dependencies = [ "networkx", "numpy", "pandas", + "psutil", "python-dateutil", "requests", "scipy>=1.16.1", "pyyaml", - "pandas", "shortuuid", "stringcase", "filelock", From 827c1893f05cffda3d062442ad2e3af9b8266d15 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:38:32 -0500 Subject: [PATCH 03/33] terminate io process --- bin/wfbench | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index e30273d3..34aa5425 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -44,6 +44,8 @@ this_dir = pathlib.Path(__file__).resolve().parent def kill_process_and_children(proc): + if proc is None: + return try: parent = psutil.Process(proc.pid) children = parent.children(recursive=True) @@ -470,7 +472,6 @@ def main(): cpu_work=int32_max**2 if args.time else int(args.cpu_work), core=core, total_mem=mem_bytes) - procs.append(cpu_proc) if args.time: time.sleep(int(args.time)) @@ -485,11 +486,11 
@@ def main(): if isinstance(proc, subprocess.Popen): proc.wait() if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() + io_proc.terminate() io_proc.join() try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails + kill_process_and_children(mem_proc) except subprocess.TimeoutExpired: log_debug("Memory process did not terminate; force-killing.") # As a fallback, use pkill if any remaining instances are stuck From e41861c335b0a5c309cf587e34f0613f709b5c42 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:54:56 -0500 Subject: [PATCH 04/33] check for division by zero --- bin/wfbench | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 34aa5425..fdc7177e 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -181,11 +181,11 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, :rtype: List """ total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) + cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 cpu_samples = min(cpu_work_per_thread, int32_max) cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max if cpu_ops > int32_max: - log_info("Exceeded maximum number of cpu work.") + log_info("Exceeded maximum allowed value of cpu work.") cpu_ops = int32_max cpu_proc = None From 901e87bcebe598eae9e3c60eedc4d5a702812dcb Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 15:35:42 -0500 Subject: [PATCH 05/33] mute stress-ng --- bin/wfbench | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index fdc7177e..74194106 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -195,7 +195,8 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, "--monte-carlo-method", "pi", "--monte-carlo-rand", "lcg", "--monte-carlo-samples", f"{cpu_samples}", - "--monte-carlo-ops", f"{cpu_ops}"] + 
"--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", "--vm-bytes", f"{total_mem}", "--vm-keep"] From 8c560de6eec7fa5cf61f496819adca54e3fac0fe Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Tue, 10 Feb 2026 16:05:36 -1000 Subject: [PATCH 06/33] Added a --quiet flag to another stress-ng invocation --- bin/wfbench | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index 74194106..7b3853d6 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -198,7 +198,7 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, "--monte-carlo-ops", f"{cpu_ops}", "--quiet"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] if cpu_threads > 0: cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) From ffac645e0466e61e341fa4bf1f1dd4cb2a14be08 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 11 Feb 2026 11:50:10 -1000 Subject: [PATCH 07/33] Fixed the zombie problem --- bin/wfbench | 6 ++++-- tests/test_helpers.py | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 7b3853d6..c253d543 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -52,9 +52,11 @@ def kill_process_and_children(proc): for child in children: child.kill() parent.kill() + except psutil.NoSuchProcess: pass # Process is already dead + def log_info(msg: str): """ Log an info message to stderr @@ -213,7 +215,7 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, if core: os.sched_setaffinity(mem_proc.pid, {core}) - return cpu_proc, mem_proc + return [cpu_proc, mem_proc] def io_read_benchmark_user_input_data_size(inputs, @@ -467,7 +469,7 @@ def main(): log_debug(f"{args.name} acquired core {core}") mem_threads=int(10 - 10 * args.percent_cpu) - cpu_proc, mem_proc = cpu_mem_benchmark(cpu_queue=cpu_queue, + [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, 
cpu_threads=int(10 * args.percent_cpu), mem_threads=mem_threads, cpu_work=int32_max**2 if args.time else int(args.cpu_work), diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 3f45290c..966b7379 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= working_dir=working_dir, user="wfcommons", tty=True, - detach=True + detach=True, + init=True # For zombies ) # Installing WfCommons on container @@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow): # sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n") workflow2_output_bytes += output_file.size assert (workflow1_input_bytes == workflow2_input_bytes) - assert (workflow1_output_bytes == workflow2_output_bytes) \ No newline at end of file + assert (workflow1_output_bytes == workflow2_output_bytes) From e802679fd257074883a78b7727939405022cf9c3 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 12 Feb 2026 08:33:23 -1000 Subject: [PATCH 08/33] Since memsize argument document says MB (and not MiB), I changed the "* 1024 * 1024" to "* 1000 * 1000". The alternative was to document "MiB", but as far as I can tell none of the above layers use 1024. This said, _every time_ we've used any unit other than just byte, we've had errors. Perhaps we should change it all to "memory size in bytes" and that's it. 
--- bin/wfbench | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index c253d543..77d86d93 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -252,7 +252,7 @@ def io_write_benchmark_user_input_data_size(outputs, def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): """Alternate between reading and writing to a file, ensuring read only happens after write.""" - if memory_limit is None: + if memory_limit is None:f memory_limit = 10 * 1024 * 1024 # sys.maxsize memory_limit = int(memory_limit) @@ -397,7 +397,7 @@ def main(): log_info(f"Starting {args.name} Benchmark") - mem_bytes = args.mem * 1024 * 1024 if args.mem else None + mem_bytes = args.mem * 1000 * 1000 if args.mem else None procs = [] io_proc = None From 45124d91b888dc8492069df6b6d51e13663d4c73 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 12 Feb 2026 08:43:10 -1000 Subject: [PATCH 09/33] typo-- !! --- bin/wfbench | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index 77d86d93..935bd22c 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -252,7 +252,7 @@ def io_write_benchmark_user_input_data_size(outputs, def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): """Alternate between reading and writing to a file, ensuring read only happens after write.""" - if memory_limit is None:f + if memory_limit is None: memory_limit = 10 * 1024 * 1024 # sys.maxsize memory_limit = int(memory_limit) From c92654ef1e744843966ba91d9b6537386edf2f2d Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Thu, 12 Feb 2026 17:12:21 -0500 Subject: [PATCH 10/33] try to workaround missing cpu_queue Signed-off-by: Steven Hahn --- bin/wfbench | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 935bd22c..101f3c01 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -263,7 +263,7 
@@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit for name in outputs: open(rundir.joinpath(name), "wb").close() - io_completed = 0 + io_completed = 1 bytes_read = { name: 0 for name in inputs @@ -280,15 +280,15 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit } while io_completed < 100: - cpu_percent = max(io_completed, cpu_queue.get()) - while True: # Get the last message - try: - cpu_percent = max(io_completed, cpu_queue.get_nowait()) - except queue.Empty: - break + #cpu_percent = max(io_completed, cpu_queue.get()) + #while True: # Get the last message + # try: + # cpu_percent = max(io_completed, cpu_queue.get_nowait()) + # except queue.Empty: + # break log_debug(f"CPU Percent: {cpu_percent}") - if cpu_percent: + if true: #cpu_percent: bytes_to_read = { name: int(size * (cpu_percent / 100) - bytes_read[name]) for name, size in inputs.items() @@ -312,7 +312,7 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit log_debug(f"Bytes Read: {bytes_read}") log_debug(f"Bytes Written: {bytes_written}") - io_completed = cpu_percent + io_completed = io_completed + 1 # cpu_percent if io_completed >= 100: break @@ -489,7 +489,7 @@ def main(): if isinstance(proc, subprocess.Popen): proc.wait() if io_proc is not None and io_proc.is_alive(): - io_proc.terminate() + #io_proc.terminate() io_proc.join() try: From 510ca588f9eee18c1046d9299829ff5ee8e68129 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Thu, 12 Feb 2026 21:06:53 -0500 Subject: [PATCH 11/33] typos Signed-off-by: Steven Hahn --- bin/wfbench | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 101f3c01..7cbc7f1c 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -287,14 +287,14 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit # except queue.Empty: # break - log_debug(f"CPU Percent: {cpu_percent}") - if true: #cpu_percent: + 
log_debug(f"IO Percent: {io_completed}") + if True: #cpu_percent: bytes_to_read = { - name: int(size * (cpu_percent / 100) - bytes_read[name]) + name: int(size * (io_completed / 100) - bytes_read[name]) for name, size in inputs.items() } bytes_to_write = { - name: int(size * (cpu_percent / 100) - bytes_written[name]) + name: int(size * (io_completed / 100) - bytes_written[name]) for name, size in outputs.items() } io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) @@ -312,7 +312,7 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit log_debug(f"Bytes Read: {bytes_read}") log_debug(f"Bytes Written: {bytes_written}") - io_completed = io_completed + 1 # cpu_percent + io_completed = io_completed + 1 if io_completed >= 100: break From 8bb70bc56bd4bddb0b43a74a77c94646a6084960 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 19 Feb 2026 14:10:00 -1000 Subject: [PATCH 12/33] Rewrite/Re-engineering of wfbench so that the execution proceeds in pipelined chunks. 
--- bin/wfbench | 999 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 676 insertions(+), 323 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 7cbc7f1c..0f53cb9d 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -13,22 +13,21 @@ import pathlib import subprocess import time import sys -import signal -import queue import argparse import re import json import logging -import pandas as pd import psutil from io import StringIO from filelock import FileLock from pathos.helpers import mp as multiprocessing -from typing import List, Optional +from abc import ABC, abstractmethod +from typing import List, Optional, IO int32_max = 2147483647 +this_dir = pathlib.Path(__file__).resolve().parent # Configure logging @@ -39,24 +38,6 @@ logging.basicConfig( handlers=[logging.StreamHandler()] ) - -this_dir = pathlib.Path(__file__).resolve().parent - - -def kill_process_and_children(proc): - if proc is None: - return - try: - parent = psutil.Process(proc.pid) - children = parent.children(recursive=True) - for child in children: - child.kill() - parent.kill() - - except psutil.NoSuchProcess: - pass # Process is already dead - - def log_info(msg: str): """ Log an info message to stderr @@ -84,6 +65,226 @@ def log_error(msg: str): """ logging.error(msg) +# Utility process class +####################### + +class ProcessHandle: + def __init__(self, proc: multiprocessing.Process | subprocess.Popen): + self._proc = proc + + @property + def pid(self): + return self._proc.pid + + def terminate(self): + self._proc.terminate() + + def terminate_along_with_children(self): + if isinstance(self._proc, multiprocessing.Process): + log_error("Calling terminate_along_with_children() on a multiprocessing.Process is a likely bug") + return + + try: + parent = psutil.Process(self._proc.pid) + children = parent.children(recursive=True) + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + pass # Process is already dead' + try: + parent.kill() + except 
psutil.NoSuchProcess: + pass # Nevermind + except subprocess.TimeoutExpired: + log_debug("Process did not terminate; force-killing.") + subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() + + def wait(self): + if isinstance(self._proc, multiprocessing.Process): + self._proc.join() + else: + self._proc.wait() + + def is_alive(self): + if isinstance(self._proc, multiprocessing.Process): + return self._proc.is_alive() + else: + return self._proc.poll() is None + +# Benchmark classes +################### + +class Benchmark(ABC): + @abstractmethod + def run(self) -> multiprocessing.Process: + pass + +class IOReadBenchmark: + def __init__(self): + self.to_read : dict[str, (IO, int)] = {} + + def add_read_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_read[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_read) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_read) in self.to_read.items(): + log_debug(f"Reading {bytes_to_read} bytes from {filepath}...") + opened_file.read(bytes_to_read) + + +class IOWriteBenchmark: + def __init__(self): + self.to_write : dict[str, (IO, int)] = {} + + def add_write_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_write[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_write) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_write) in self.to_write.items(): + log_debug(f"Writing {bytes_to_write} bytes to {filepath}...") + opened_file.write(os.urandom(int(bytes_to_write))) + opened_file.flush() + + +class CPUBenchmark: + def __init__(self, cpu_threads: Optional[int] = 5, + mem_threads: 
Optional[int] = 5, + core: Optional[int] = None, + total_mem: Optional[int] = None): + self.cpu_threads = cpu_threads + self.mem_threads = mem_threads + self.core = core + self.total_mem = total_mem + self.work = None + + def set_work(self, work: int): + self.work = work + + def set_infinite_work(self): + self.work = int32_max # "infinite" + + def run(self) -> list[ProcessHandle | None]: + if self.work is None or self.work <= 0: + return [None, None] + + total_mem = f"{self.total_mem}B" if self.total_mem else f"{100.0 / os.cpu_count()}%" + cpu_work_per_thread = int(1000000 * self.work / (16384 * self.cpu_threads)) if self.cpu_threads != 0 else int32_max ** 2 + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum allowed value of cpu work.") + cpu_ops = int32_max + + + # Start CPU benchmark, if need be + cpu_proc_handle = None + if self.cpu_threads > 0: + log_debug(f"Running CPU benchmark with {self.cpu_threads} threads for {self.work if self.work < int32_max else 'infinite'} units of work...") + cpu_prog = ["stress-ng", "--monte-carlo", f"{self.cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) + cpu_proc_handle = ProcessHandle(cpu_proc) + + # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) + if self.core: + os.sched_setaffinity(cpu_proc.pid, {self.core}) + + # Start Memory benchmark, if need be + mem_proc_handle = None + if self.mem_threads > 0: + # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows + log_debug(f"Running memory benchmark with {self.mem_threads} threads...") + mem_prog = ["stress-ng", "--vm", f"{self.mem_threads}", + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] + mem_proc = 
subprocess.Popen(mem_prog, preexec_fn=os.setsid) + mem_proc_handle = ProcessHandle(mem_proc) + if self.core: + os.sched_setaffinity(mem_proc.pid, {self.core}) + + return [cpu_proc_handle, mem_proc_handle] + + +class GPUBenchmark: + + @staticmethod + def get_available_gpus(): + proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, _ = proc.communicate() + df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") + return df[df["utilization.gpu"] <= 5].index.to_list() + + def __init__(self): + self.work = None + self.duration = None + self.device = None + + def set_device(self): + available_gpus = self.get_available_gpus() # checking for available GPUs + if not available_gpus: + log_error("No GPU available") + sys.exit(1) + self.device = available_gpus[0] + log_debug(f"GPU benchmark instantiated for device {self.device}") + + def set_work(self, work: int): + self.work = work + + def set_time(self, duration: float): + self.duration = duration + + def run(self) -> ProcessHandle | None: + if self.work is None and self.duration is None: + return None + + if self.duration is not None: + log_debug(f"Running GPU benchmark for {self.duration} seconds") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work} {self.duration}"] + else: + log_debug(f"Running GPU benchmark for {self.work} units of work") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work}"] + + p = subprocess.Popen(gpu_prog, shell=True) + return ProcessHandle(p) + +# def kill_process_and_children(proc): +# if proc is None: +# return +# try: +# parent = psutil.Process(proc.pid) +# children = parent.children(recursive=True) +# for child in children: +# child.kill() +# parent.kill() +# +# except psutil.NoSuchProcess: +# pass # Process is already dead + + + +# 
Utility code functions +######################## def lock_core(path_locked: pathlib.Path, path_cores: pathlib.Path) -> int: @@ -146,191 +347,185 @@ def unlock_core(path_locked: pathlib.Path, finally: lock.release() -def monitor_progress(proc, cpu_queue): - """Monitor progress from the CPU benchmark process.""" - for line in iter(proc.stdout.readline, ""): # No decode needed - line = line.strip() - if line.startswith("Progress:"): - try: - progress = float(line.split()[1].strip('%')) - cpu_queue.put(progress) - except (ValueError, IndexError): - pass - -def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, - cpu_threads: Optional[int] = 5, - mem_threads: Optional[int] = 5, - cpu_work: Optional[int] = 100, - core: Optional[int] = None, - total_mem: Optional[int] = None) -> List: - """ - Run CPU and memory benchmark. - - :param cpu_queue: Queue to push CPU benchmark progress as a float. - :type cpu_queue: multiprocessing.Queue - :param cpu_threads: Number of threads for CPU benchmark. - :type cpu_threads: Optional[int] - :param mem_threads: Number of threads for memory benchmark. - :type mem_threads: Optional[int] - :param cpu_work: Total work units for CPU benchmark. - :type cpu_work: Optional[int] - :param core: Core to pin the benchmark processes to. - :type core: Optional[int] - :param total_mem: Total memory to use for memory benchmark. - :type total_mem: Optional[float] - - :return: Lists of CPU and memory subprocesses. 
- :rtype: List - """ - total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 - cpu_samples = min(cpu_work_per_thread, int32_max) - cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max - if cpu_ops > int32_max: - log_info("Exceeded maximum allowed value of cpu work.") - cpu_ops = int32_max - - cpu_proc = None - mem_proc = None - - cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", - "--monte-carlo-method", "pi", - "--monte-carlo-rand", "lcg", - "--monte-carlo-samples", f"{cpu_samples}", - "--monte-carlo-ops", f"{cpu_ops}", - "--quiet"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] - - if cpu_threads > 0: - cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) - - # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) - if core: - os.sched_setaffinity(cpu_proc.pid, {core}) - - if mem_threads > 0: - # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - if core: - os.sched_setaffinity(mem_proc.pid, {core}) - - return [cpu_proc, mem_proc] - - -def io_read_benchmark_user_input_data_size(inputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = -1 - memory_limit = int(memory_limit) - log_debug("Starting IO Read Benchmark...") - for file, size in inputs.items(): - with open(rundir.joinpath(file), "rb") as fp: - log_debug(f"Reading '{file}'") - chunk_size = min(size, memory_limit) - while fp.read(chunk_size): - pass - log_debug("Completed IO Read Benchmark!") - - -def io_write_benchmark_user_input_data_size(outputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = sys.maxsize - memory_limit = int(memory_limit) - for file_name, file_size in outputs.items(): - 
log_debug(f"Writing output file '{file_name}'") - file_size_todo = file_size - while file_size_todo > 0: - with open(rundir.joinpath(file_name), "ab") as fp: - chunk_size = min(file_size_todo, memory_limit) - file_size_todo -= fp.write(os.urandom(int(chunk_size))) - - -def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): - """Alternate between reading and writing to a file, ensuring read only happens after write.""" - - if memory_limit is None: - memory_limit = 10 * 1024 * 1024 # sys.maxsize - memory_limit = int(memory_limit) - - # queue will have messages in the form (cpu_percent_completed) - # Get the last message and trash the rest - - # Create empty files - for name in outputs: - open(rundir.joinpath(name), "wb").close() - - io_completed = 1 - bytes_read = { - name: 0 - for name in inputs - } - bytes_written = { - name: 0 - for name in outputs - } - - # get size of inputs - inputs = { - name: os.path.getsize(rundir.joinpath(name)) - for name in inputs - } - - while io_completed < 100: - #cpu_percent = max(io_completed, cpu_queue.get()) - #while True: # Get the last message - # try: - # cpu_percent = max(io_completed, cpu_queue.get_nowait()) - # except queue.Empty: - # break - - log_debug(f"IO Percent: {io_completed}") - if True: #cpu_percent: - bytes_to_read = { - name: int(size * (io_completed / 100) - bytes_read[name]) - for name, size in inputs.items() - } - bytes_to_write = { - name: int(size * (io_completed / 100) - bytes_written[name]) - for name, size in outputs.items() - } - io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) - io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) - - bytes_read = { - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - } - bytes_written = { - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - } - - log_debug(f"Bytes Read: {bytes_read}") 
- log_debug(f"Bytes Written: {bytes_written}") - - io_completed = io_completed + 1 - - if io_completed >= 100: - break - -def get_available_gpus(): - proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, _ = proc.communicate() - df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") - return df[df["utilization.gpu"] <= 5].index.to_list() +# def monitor_progress(proc, cpu_queue): +# """Monitor progress from the CPU benchmark process.""" +# for line in iter(proc.stdout.readline, ""): # No decode needed +# line = line.strip() +# if line.startswith("Progress:"): +# try: +# progress = float(line.split()[1].strip('%')) +# cpu_queue.put(progress) +# except (ValueError, IndexError): +# pass +# +# def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, +# cpu_threads: Optional[int] = 5, +# mem_threads: Optional[int] = 5, +# cpu_work: Optional[int] = 100, +# core: Optional[int] = None, +# total_mem: Optional[int] = None) -> List: +# """ +# Run CPU and memory benchmark. +# +# :param cpu_queue: Queue to push CPU benchmark progress as a float. +# :type cpu_queue: multiprocessing.Queue +# :param cpu_threads: Number of threads for CPU benchmark. +# :type cpu_threads: Optional[int] +# :param mem_threads: Number of threads for memory benchmark. +# :type mem_threads: Optional[int] +# :param cpu_work: Total work units for CPU benchmark. +# :type cpu_work: Optional[int] +# :param core: Core to pin the benchmark processes to. +# :type core: Optional[int] +# :param total_mem: Total memory to use for memory benchmark. +# :type total_mem: Optional[float] +# +# :return: Lists of CPU and memory subprocesses. 
+# :rtype: List +# """ +# total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" +# cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 +# cpu_samples = min(cpu_work_per_thread, int32_max) +# cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max +# if cpu_ops > int32_max: +# log_info("Exceeded maximum allowed value of cpu work.") +# cpu_ops = int32_max +# +# cpu_proc = None +# mem_proc = None +# +# cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", +# "--monte-carlo-method", "pi", +# "--monte-carlo-rand", "lcg", +# "--monte-carlo-samples", f"{cpu_samples}", +# "--monte-carlo-ops", f"{cpu_ops}", +# "--quiet"] +# mem_prog = ["stress-ng", "--vm", f"{mem_threads}", +# "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] +# +# if cpu_threads > 0: +# cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) +# +# # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) +# if core: +# os.sched_setaffinity(cpu_proc.pid, {core}) +# +# if mem_threads > 0: +# # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows +# mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) +# if core: +# os.sched_setaffinity(mem_proc.pid, {core}) +# +# return [cpu_proc, mem_proc] +# +# +# def io_read_benchmark_user_input_data_size(inputs, +# rundir=None, +# memory_limit=None): +# if memory_limit is None: +# memory_limit = -1 +# memory_limit = int(memory_limit) +# log_debug("Starting IO Read Benchmark...") +# for file, size in inputs.items(): +# with open(rundir.joinpath(file), "rb") as fp: +# log_debug(f"Reading '{file}'") +# chunk_size = min(size, memory_limit) +# while fp.read(chunk_size): +# pass +# log_debug("Completed IO Read Benchmark!") +# +# +# def io_write_benchmark_user_input_data_size(outputs, +# rundir=None, +# memory_limit=None): +# if memory_limit is None: +# memory_limit = sys.maxsize +# memory_limit = 
int(memory_limit) +# for file_name, file_size in outputs.items(): +# log_debug(f"Writing output file '{file_name}'") +# file_size_todo = file_size +# while file_size_todo > 0: +# with open(rundir.joinpath(file_name), "ab") as fp: +# chunk_size = min(file_size_todo, memory_limit) +# file_size_todo -= fp.write(os.urandom(int(chunk_size))) +# +# +# def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): +# """Alternate between reading and writing to a file, ensuring read only happens after write.""" +# +# if memory_limit is None: +# memory_limit = 10 * 1024 * 1024 # sys.maxsize +# memory_limit = int(memory_limit) +# +# # queue will have messages in the form (cpu_percent_completed) +# # Get the last message and trash the rest +# +# # Create empty files +# for name in outputs: +# open(rundir.joinpath(name), "wb").close() +# +# io_completed = 1 +# bytes_read = { +# name: 0 +# for name in inputs +# } +# bytes_written = { +# name: 0 +# for name in outputs +# } +# +# # get size of inputs +# inputs = { +# name: os.path.getsize(rundir.joinpath(name)) +# for name in inputs +# } +# +# while io_completed < 100: +# #cpu_percent = max(io_completed, cpu_queue.get()) +# #while True: # Get the last message +# # try: +# # cpu_percent = max(io_completed, cpu_queue.get_nowait()) +# # except queue.Empty: +# # break +# +# log_debug(f"IO Percent: {io_completed}") +# if True: #cpu_percent: +# bytes_to_read = { +# name: int(size * (io_completed / 100) - bytes_read[name]) +# for name, size in inputs.items() +# } +# bytes_to_write = { +# name: int(size * (io_completed / 100) - bytes_written[name]) +# for name, size in outputs.items() +# } +# io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) +# io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) +# +# bytes_read = { +# name: bytes_read[name] + bytes_to_read[name] +# for name in bytes_to_read +# } +# 
bytes_written = { +# name: bytes_written[name] + bytes_to_write[name] +# for name in bytes_to_write +# } +# +# log_debug(f"Bytes Read: {bytes_read}") +# log_debug(f"Bytes Written: {bytes_written}") +# +# io_completed = io_completed + 1 +# +# if io_completed >= 100: +# break -def gpu_benchmark(time: int = 100, - work: int = 100, - device: int = 0): #work, device - - gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] - log_debug(f"Running GPU Benchmark: {gpu_prog}") - subprocess.Popen(gpu_prog, shell=True) +# def gpu_benchmark(time: int = 100, +# work: int = 100, +# device: int = 0): #work, device +# +# gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] +# log_debug(f"Running GPU Benchmark: {gpu_prog}") +# subprocess.Popen(gpu_prog, shell=True) def get_parser() -> argparse.ArgumentParser: @@ -342,22 +537,26 @@ def get_parser() -> argparse.ArgumentParser: parser.add_argument("--path-lock", default=None, help="Path to lock file.") parser.add_argument("--path-cores", default=None, help="Path to cores file.") parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.") + parser.add_argument("--num-chunks", default=10, help="Number of chunks used for pipelining I/O and " + "computation throughout the execution (fewer chunks may be used " + "if amounts of work and or input/output file sizes are too small).") parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") - parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works).") - parser.add_argument("--mem", type=float, default=None, help="Max amount (in MB) of memory consumption.") - parser.add_argument("--output-files", help="output file names with sizes in bytes as a JSON dictionary " + parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete 
" + "the computational portion of the benchmark (overrides CPU and GPU works).") + parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") + parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary " "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") - parser.add_argument("--input-files", help="input files names as a JSON array " + parser.add_argument("--input-files", help="Input files names as a JSON array " "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") parser.add_argument("--debug", action="store_true", help="Enable debug messages.") parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.") parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.") return parser - + def begin_flowcept(args): - log_info("Running with Flowcept.") + log_debug("Running with Flowcept.") from flowcept import Flowcept, FlowceptTask # TODO: parametrize to allow storing individual tasks f = Flowcept(workflow_id=args.workflow_id, @@ -373,6 +572,29 @@ def end_flowcept(flowcept, flowcept_task): flowcept.stop() +def compute_num_chunks(args): + # Compute the (feasible number of chunks) + min_chunk_size_time = 1.0 # At least 1 second per chunk, if we're doing time-based + # TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine + min_chunk_size_cpu_work = 3000000 * min_chunk_size_time # 1s on my MacBook Pro + min_chunk_size_gpu_work = 30000000 * min_chunk_size_time # unknown..... 
+ + if args.time: + num_chunks = min(int(args.num_chunks), int(float(args.time) / min_chunk_size_time)) + else: + if args.cpu_work: + num_chunks_cpu = min(int(args.num_chunks), int(float(args.cpu_work) / min_chunk_size_cpu_work)) + else: + num_chunks_cpu = 1 + if args.gpu_work: + num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work)) + else: + num_chunks_gpu = 1 + num_chunks = min(num_chunks_cpu, num_chunks_cpu) + + num_chunks = max(num_chunks, 1) # The above computations may say "zero" + return num_chunks + def main(): """Main program.""" parser = get_parser() @@ -395,130 +617,261 @@ def main(): path_cores = pathlib.Path(args.path_cores) core = lock_core(path_locked, path_cores) - log_info(f"Starting {args.name} Benchmark") - - mem_bytes = args.mem * 1000 * 1000 if args.mem else None - - procs = [] - io_proc = None - outputs_dict = {} - - cpu_queue = multiprocessing.Queue() - - log_debug(f"Working directory: {os.getcwd()}") - - # Deal with input/output files if any + if args.time and (not args.cpu_work and not args.gpu_work): + log_error("If --time is provided, then --cpu-work and/or --gpu-work must also be provided.") + sys.exit(1) + + # Compute the (feasible) number of chunks + num_chunks = compute_num_chunks(args) + log_debug(f"Executing benchmark with {num_chunks} chunks.") + + # At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2) + # step 0 sep 1 step 2 step N-3 step N-2 step N-1 + # READ READ READ ... READ - - + # - COMPUTE_CPU COMPUTE_CPU ... COMPUTE_CPU COMPUTE_CPU - + # - COMPUTE_GPU COMPUTE_GPU ... COMPUTE_GPU COMPUTE_GPU - + # - - WRITE ... WRITE WRITE WRITE + # (Intermediate READ and WRITE steps may do nothing for some files if there is too little data) + + # Construct a list of benchmark steps, where each step is a list of IO benchmarks (Read or Write) + # and a list of non-IO benchmarks (CPU, GPU). 
Initially these are all "do nothing" benchmarks + N = num_chunks + 2 + steps = [{"io_read_benchmark": IOReadBenchmark(), + "io_write_benchmark": IOWriteBenchmark(), + "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * args.percent_cpu), + mem_threads=int(10 - 10 * args.percent_cpu), + core=core, + total_mem=args.mem * 1000 * 1000 if args.mem else None), + "gpu_benchmark": GPUBenchmark()} for i in range(N)] + + min_chunk_size_data = 1000 # 1KB per chunk at a minimum for each input / output file, otherwise the file + # is read/written all at once at the beginning/end + + # Augment I/O read benchmarks for each input file cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files) + try: + input_files = json.loads(cleaned_input) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --input-files JSON string argument: {e}") + sys.exit(1) + + for file_path in input_files: + file_size = os.path.getsize(file_path) + # If file is zero-size, do nothing + if file_size == 0: + continue + opened_file = open(rundir / file_path, "rb") + # If file is "small" only read it at the beginning + if file_size < num_chunks * min_chunk_size_data: + steps[0]["io_read_benchmark"].add_read_operation(file_path, opened_file, file_size) + continue + # Otherwise, read it in chunks + for step in range(0, N-2): + num_bytes = file_size // num_chunks + (file_size % num_chunks > step) + steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes) + + # Augment I/O write benchmarks for each output file cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files) - # print("CLEANED INPUT", cleaned_input) - # print("CLEANED OUTPUT", cleaned_output) - - if cleaned_input or cleaned_output: - log_debug("Starting IO benchmark...") - - # Attempt to parse the cleaned string - try: - outputs_dict = json.loads(cleaned_output) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --output-files JSON 
string argument: {e}") - sys.exit(1) - - try: - inputs_array = json.loads(cleaned_input) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --input-files JSON string argument: {e}") - sys.exit(1) - - # print("OUTPUT", outputs_dict) - # print("INPUTS", inputs_array) - - # Create a multiprocessing event that in the first run is set to True - write_done_event = multiprocessing.Event() - # Set this to True to allow the first read to happen - write_done_event.set() - # Print the value of the event - # print("Event Value:", write_done_event.is_set()) - - io_proc = multiprocessing.Process( - target=io_alternate, - args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) - ) - io_proc.start() - procs.append(io_proc) - - if args.gpu_work: - log_info(f"Starting GPU Benchmark for {args.name}...") - available_gpus = get_available_gpus() #checking for available GPUs - - if not available_gpus: - log_error("No GPU available") - sys.exit(1) - else: - device = available_gpus[0] - log_debug(f"Running on GPU {device}") - - if args.time: - log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}") - gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) - else: - gpu_benchmark(work=int(args.gpu_work), device=device) - + try: + output_files = json.loads(cleaned_output) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --output-files JSON string argument: {e}") + sys.exit(1) + + for file_path, file_size in output_files.items(): + # Open the file for writing no matter what (it should be created) + opened_file = open(rundir / file_path, "wb") + # If file is zero-size, do nothing + if file_size == 0: + continue + # If file is "small" only write it at the end + if file_size < num_chunks * min_chunk_size_data: + steps[N-1]["io_write_benchmark"].add_write_operation(file_path, opened_file, file_size) + continue + # Otherwise, write it in chunks + for step in range(2, N): + num_bytes = file_size // 
num_chunks + (file_size % num_chunks > (step - 2)) + steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes) + + # Augment CPU benchmark with computation (if need be) if args.cpu_work: - log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") - if core: - log_debug(f"{args.name} acquired core {core}") - - mem_threads=int(10 - 10 * args.percent_cpu) - [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, - cpu_threads=int(10 * args.percent_cpu), - mem_threads=mem_threads, - cpu_work=int32_max**2 if args.time else int(args.cpu_work), - core=core, - total_mem=mem_bytes) - procs.append(cpu_proc) if args.time: - time.sleep(int(args.time)) - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - elif isinstance(proc, subprocess.Popen): - kill_process_and_children(proc) + for step in range(1, N-1): + steps[step]["cpu_benchmark"].set_infinite_work() else: - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - #io_proc.terminate() - io_proc.join() - - try: - kill_process_and_children(mem_proc) - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") - # As a fallback, use pkill if any remaining instances are stuck - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - log_debug("Completed CPU and Memory Benchmarks!") - - # NOTE: If you would like to run only IO add time.sleep(2) - # Check if all procs are done, if not, kill them - log_debug("Checking if all processes are done...") - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - proc.join() - if isinstance(proc, subprocess.Popen): - proc.wait() + for step in range(1, N-1): + chunk_work = int(args.cpu_work) // num_chunks + (int(args.cpu_work) % num_chunks > step - 1) + steps[step]["cpu_benchmark"].set_work(chunk_work) + # Augment GPU 
benchmark with computation (if need be) + if args.gpu_work: + if args.time: + for step in range(1, N - 1): + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(int(args.gpu_work)) + steps[step]["gpu_benchmark"].set_time(float(args.time)) + else: + for step in range(1, N - 1): + chunk_work = int(args.gpu_work) // num_chunks + (int(args.gpu_work) % num_chunks > step - 1) + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(chunk_work) + + # All benchmarks have been specified, we can just go through the steps blindly + # log_info(f"Starting {args.name} Benchmark") + for step_index, step in enumerate(steps): + log_debug(f"**** STEP {step_index} ***") + io_read_process = step["io_read_benchmark"].run() + io_write_process = step["io_write_benchmark"].run() + [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run() + gpu_benchmark_process = step["gpu_benchmark"].run() + + # If time based, sleep the required amount of time and kill the process + if args.time: + time.sleep(float(args.time) / num_chunks) + if cpu_benchmark_process is not None: + cpu_benchmark_process.terminate_along_with_children() + if gpu_benchmark_process is not None: + gpu_benchmark_process.terminate() + + # Wait for the I/O processes to be done + if io_read_process is not None: + io_read_process.wait() + if io_write_process is not None: + io_write_process.wait() + + # Wait for the CPU process to be done + if cpu_benchmark_process is not None: + cpu_benchmark_process.wait() + + # Kill the Memory process + if memory_benchmark_process is not None: + memory_benchmark_process.terminate_along_with_children() + memory_benchmark_process.wait() + + # Wait for the GPU Process to be done + if gpu_benchmark_process is not None: + gpu_benchmark_process.wait() + + # Cleanups if core: unlock_core(path_locked, path_cores, core) if args.with_flowcept: end_flowcept(flowcept, flowcept_task) - log_info(f"Benchmark {args.name} completed!") 
+ log_debug(f"{args.name} Benchmark Completed") + + # OLD CODE BELOW: + # + # procs = [] + # io_proc = None + # outputs_dict = {} + # + # cpu_queue = multiprocessing.Queue() + # + # log_debug(f"Working directory: {os.getcwd()}") + # + # if cleaned_input or cleaned_output: + # log_debug("Starting IO benchmark...") + # + # # Attempt to parse the cleaned string + # try: + # outputs_dict = json.loads(cleaned_output) + # except json.JSONDecodeError as e: + # log_error(f"Failed to decode --output-files JSON string argument: {e}") + # sys.exit(1) + # + # try: + # inputs_array = json.loads(cleaned_input) + # except json.JSONDecodeError as e: + # log_error(f"Failed to decode --input-files JSON string argument: {e}") + # sys.exit(1) + # + # # print("OUTPUT", outputs_dict) + # # print("INPUTS", inputs_array) + # + # # Create a multiprocessing event that in the first run is set to True + # write_done_event = multiprocessing.Event() + # # Set this to True to allow the first read to happen + # write_done_event.set() + # # Print the value of the event + # # print("Event Value:", write_done_event.is_set()) + # + # io_proc = multiprocessing.Process( + # target=io_alternate, + # args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) + # ) + # io_proc.start() + # procs.append(io_proc) + # + # if args.gpu_work: + # log_info(f"Starting GPU Benchmark for {args.name}...") + # available_gpus = get_available_gpus() #checking for available GPUs + # + # if not available_gpus: + # log_error("No GPU available") + # sys.exit(1) + # else: + # device = available_gpus[0] + # log_debug(f"Running on GPU {device}") + # + # if args.time: + # log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}") + # gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) + # else: + # gpu_benchmark(work=int(args.gpu_work), device=device) + # + # if args.cpu_work: + # log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") + # if core: + # 
log_debug(f"{args.name} acquired core {core}") + # + # mem_threads=int(10 - 10 * args.percent_cpu) + # [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, + # cpu_threads=int(10 * args.percent_cpu), + # mem_threads=mem_threads, + # cpu_work=int32_max**2 if args.time else int(args.cpu_work), + # core=core, + # total_mem=mem_bytes) + # procs.append(cpu_proc) + # if args.time: + # time.sleep(int(args.time)) + # for proc in procs: + # if isinstance(proc, multiprocessing.Process): + # if proc.is_alive(): + # proc.terminate() + # elif isinstance(proc, subprocess.Popen): + # kill_process_and_children(proc) + # else: + # for proc in procs: + # if isinstance(proc, subprocess.Popen): + # proc.wait() + # if io_proc is not None and io_proc.is_alive(): + # #io_proc.terminate() + # io_proc.join() + # + # try: + # kill_process_and_children(mem_proc) + # except subprocess.TimeoutExpired: + # log_debug("Memory process did not terminate; force-killing.") + # # As a fallback, use pkill if any remaining instances are stuck + # subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() + # + # log_debug("Completed CPU and Memory Benchmarks!") + # + # NOTE: If you would like to run only IO add time.sleep(2) + # Check if all procs are done, if not, kill them + # log_debug("Checking if all processes are done...") + # for proc in procs: + # if isinstance(proc, multiprocessing.Process): + # if proc.is_alive(): + # proc.terminate() + # proc.join() + # if isinstance(proc, subprocess.Popen): + # proc.wait() + # + # + # log_info(f"Benchmark {args.name} completed!") if __name__ == "__main__": main() From ebe24ab492d98d986dc88b688b94f94e15068ac6 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 19 Feb 2026 15:09:48 -1000 Subject: [PATCH 13/33] Made it to that even if wfbench is ^C-ed, it doesn't leave runaway processes around --- bin/wfbench | 90 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 55 insertions(+), 35 deletions(-) diff --git a/bin/wfbench 
b/bin/wfbench index 0f53cb9d..1227e194 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -80,10 +80,11 @@ class ProcessHandle: self._proc.terminate() def terminate_along_with_children(self): + # If it's a multiprocessing, just kill the parent and return if isinstance(self._proc, multiprocessing.Process): - log_error("Calling terminate_along_with_children() on a multiprocessing.Process is a likely bug") + self._proc.terminate() return - + # If it's a Popen, then do the brute-force thing try: parent = psutil.Process(self._proc.pid) children = parent.children(recursive=True) @@ -595,6 +596,12 @@ def compute_num_chunks(args): num_chunks = max(num_chunks, 1) # The above computations may say "zero" return num_chunks +def kill_current_handles(handles: list[ProcessHandle]): + for handle in handles: + if handle is not None and handle.is_alive(): + handle.terminate_along_with_children() + + def main(): """Main program.""" parser = get_parser() @@ -618,10 +625,10 @@ def main(): core = lock_core(path_locked, path_cores) if args.time and (not args.cpu_work and not args.gpu_work): - log_error("If --time is provided, then --cpu-work and/or --gpu-work must also be provided.") + log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.") sys.exit(1) - # Compute the (feasible) number of chunks + # Compute the (feasible) number of chunks based on the arguments num_chunks = compute_num_chunks(args) log_debug(f"Executing benchmark with {num_chunks} chunks.") @@ -718,39 +725,52 @@ def main(): # All benchmarks have been specified, we can just go through the steps blindly # log_info(f"Starting {args.name} Benchmark") - for step_index, step in enumerate(steps): - log_debug(f"**** STEP {step_index} ***") - io_read_process = step["io_read_benchmark"].run() - io_write_process = step["io_write_benchmark"].run() - [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run() - gpu_benchmark_process = step["gpu_benchmark"].run() - - # If 
time based, sleep the required amount of time and kill the process - if args.time: - time.sleep(float(args.time) / num_chunks) + + current_proc_handles = [] + try: + for step_index, step in enumerate(steps): + log_debug(f"**** STEP {step_index} ***") + io_read_process = step["io_read_benchmark"].run() + current_proc_handles += [io_read_process] + io_write_process = step["io_write_benchmark"].run() + [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run() + current_proc_handles += [cpu_benchmark_process, memory_benchmark_process] + gpu_benchmark_process = step["gpu_benchmark"].run() + current_proc_handles += [gpu_benchmark_process] + current_proc_handles[:] = [io_read_process, cpu_benchmark_process, memory_benchmark_process, gpu_benchmark_process] + + # If time based, sleep the required amount of time and kill the process + if args.time: + time.sleep(float(args.time) / num_chunks) + if cpu_benchmark_process is not None: + cpu_benchmark_process.terminate_along_with_children() + if gpu_benchmark_process is not None: + gpu_benchmark_process.terminate() + + # Wait for the I/O processes to be done + if io_read_process is not None: + io_read_process.wait() + if io_write_process is not None: + io_write_process.wait() + + # Wait for the CPU process to be done if cpu_benchmark_process is not None: - cpu_benchmark_process.terminate_along_with_children() + cpu_benchmark_process.wait() + + # Kill the Memory process + if memory_benchmark_process is not None: + memory_benchmark_process.terminate_along_with_children() + memory_benchmark_process.wait() + + # Wait for the GPU Process to be done if gpu_benchmark_process is not None: - gpu_benchmark_process.terminate() - - # Wait for the I/O processes to be done - if io_read_process is not None: - io_read_process.wait() - if io_write_process is not None: - io_write_process.wait() - - # Wait for the CPU process to be done - if cpu_benchmark_process is not None: - cpu_benchmark_process.wait() - - # Kill the 
Memory process - if memory_benchmark_process is not None: - memory_benchmark_process.terminate_along_with_children() - memory_benchmark_process.wait() - - # Wait for the GPU Process to be done - if gpu_benchmark_process is not None: - gpu_benchmark_process.wait() + gpu_benchmark_process.wait() + except KeyboardInterrupt: + log_debug("Detected Keyboard interrupt: cleaning up processes...") + kill_current_handles(current_proc_handles) + sys.exit(1) + finally: + kill_current_handles(current_proc_handles) # Cleanups if core: From 85079cb9b7ce18263e3c3f954abf1c765df13597 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 19 Feb 2026 15:15:22 -1000 Subject: [PATCH 14/33] Minor fix --- bin/wfbench | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index 1227e194..a196175f 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -768,10 +768,11 @@ def main(): except KeyboardInterrupt: log_debug("Detected Keyboard interrupt: cleaning up processes...") kill_current_handles(current_proc_handles) - sys.exit(1) finally: + log_debug("Aborting: cleaning up processes...") kill_current_handles(current_proc_handles) + # Cleanups if core: unlock_core(path_locked, path_cores, core) From 8f46f0ec7b0a2d552f3b0bf4a7ca3c477403787b Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Mon, 2 Mar 2026 14:54:19 -0500 Subject: [PATCH 15/33] check container output Signed-off-by: Steven Hahn --- tests/wfbench/test_wfbench.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index 07593690..7fce75a2 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -165,7 +165,7 @@ def test_create_from_recipe(self) -> None: # Run the workflow sys.stderr.write("Running workflow...\n") exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True) - + print(output.decode()) # Kill the container 
_shutdown_docker_container_and_remove_image(container) From 61fe80c304139291f2ace5fe2a4396aa93e10912 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 18 Mar 2026 13:41:10 -1000 Subject: [PATCH 16/33] bug-- in bin/wfbench --- bin/wfbench | 22 ++++++++++--------- .../test_translators_loggers.py | 21 +++++++++--------- tests/wfbench/test_wfbench.py | 1 - 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index a196175f..f9d0af77 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -543,7 +543,8 @@ def get_parser() -> argparse.ArgumentParser: "if amounts of work and or input/output file sizes are too small).") parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete " - "the computational portion of the benchmark (overrides CPU and GPU works).") + "the computational portion of the benchmark (overrides CPU and GPU works). " + "Is only approximate since I/O time may make the overall time longer.") parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary " "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") @@ -591,7 +592,7 @@ def compute_num_chunks(args): num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work)) else: num_chunks_gpu = 1 - num_chunks = min(num_chunks_cpu, num_chunks_cpu) + num_chunks = min(num_chunks_cpu, num_chunks_gpu) num_chunks = max(num_chunks, 1) # The above computations may say "zero" return num_chunks @@ -624,9 +625,9 @@ def main(): path_cores = pathlib.Path(args.path_cores) core = lock_core(path_locked, path_cores) - if args.time and (not args.cpu_work and not args.gpu_work): - log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.") - sys.exit(1) + # if args.time 
and (not args.cpu_work and not args.gpu_work): + # log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.") + # sys.exit(1) # Compute the (feasible) number of chunks based on the arguments num_chunks = compute_num_chunks(args) @@ -741,11 +742,12 @@ def main(): # If time based, sleep the required amount of time and kill the process if args.time: - time.sleep(float(args.time) / num_chunks) - if cpu_benchmark_process is not None: - cpu_benchmark_process.terminate_along_with_children() - if gpu_benchmark_process is not None: - gpu_benchmark_process.terminate() + if cpu_benchmark_process is not None or gpu_benchmark_process is not None: + time.sleep(float(args.time) / num_chunks) + if cpu_benchmark_process is not None: + cpu_benchmark_process.terminate_along_with_children() + if gpu_benchmark_process is not None: + gpu_benchmark_process.terminate() # Wait for the I/O processes to be done if io_read_process is not None: diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 135a6a4b..8e5cab60 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -241,17 +241,17 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "swiftt", - "dask", - "parsl", - "nextflow", - "nextflow_subworkflow", - "airflow", + # "swiftt", + # "dask", + # "parsl", + # "nextflow", + # "nextflow_subworkflow", + # "airflow", "bash", - "taskvine", - "makeflow", - "cwl", - "pegasus", + # "taskvine", + # "makeflow", + # "cwl", + # "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") @@ -318,5 +318,6 @@ def test_translator(self, backend) -> None: _compare_workflows(original_workflow, reconstructed_workflow) # Shutdown the container (weirdly, container is already shutdown by now... 
not sure how) + time.sleep(100000) _shutdown_docker_container_and_remove_image(container) diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index 07593690..567c59df 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -53,7 +53,6 @@ def _workflow_as_expected(dirpath: pathlib.Path, with json_path.open("r") as f: generated_json = json.load(f) - # Check the number of tasks assert(len(workflow.tasks) == len(generated_json['workflow']['specification']['tasks'])) From 4dee4fdef29f4e5d96ef273025af2a419430f9b5 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 18 Mar 2026 13:42:04 -1000 Subject: [PATCH 17/33] bug-- in wfbench --- .../test_translators_loggers.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 8e5cab60..135a6a4b 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -241,17 +241,17 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - # "swiftt", - # "dask", - # "parsl", - # "nextflow", - # "nextflow_subworkflow", - # "airflow", + "swiftt", + "dask", + "parsl", + "nextflow", + "nextflow_subworkflow", + "airflow", "bash", - # "taskvine", - # "makeflow", - # "cwl", - # "pegasus", + "taskvine", + "makeflow", + "cwl", + "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") @@ -318,6 +318,5 @@ def test_translator(self, backend) -> None: _compare_workflows(original_workflow, reconstructed_workflow) # Shutdown the container (weirdly, container is already shutdown by now... 
not sure how) - time.sleep(100000) _shutdown_docker_container_and_remove_image(container) From 50e42f96f954abf4282c54e614da59f7c68f062d Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 18 Mar 2026 13:51:18 -1000 Subject: [PATCH 18/33] Updated the create_benchmark() method to allow specifying the number of chunks for each task execution --- wfcommons/wfbench/bench.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index 2f013546..7e7076a0 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -252,6 +252,7 @@ def create_benchmark(self, percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, data: Optional[int] = 0, mem: Optional[float] = None, @@ -269,6 +270,8 @@ def create_benchmark(self, :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param data: Total workflow data footprint (in MB). 
@@ -308,6 +311,7 @@ def create_benchmark(self, cpu_work, gpu_work, time, + num_chunks, mem, lock_files_folder, cores, @@ -367,6 +371,7 @@ def _set_argument_parameters(self, cpu_work: Union[int, Dict[str, int]], gpu_work: Union[int, Dict[str, int]], time: Optional[int], + num_chunks: Optional[int], mem: Optional[float], lock_files_folder: Optional[pathlib.Path], cores: Optional[pathlib.Path], @@ -381,6 +386,7 @@ def _set_argument_parameters(self, params.extend(cpu_params) gpu_params = self._generate_task_gpu_params(task, gpu_work) params.extend(gpu_params) + params.extend([f"--num-chunks {num_chunks}"]) if mem: params.extend([f"--mem {mem}"]) From 54c212e6ec34e02ad9f9e355236a3a1c6cae5e76 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 18 Mar 2026 18:59:29 -1000 Subject: [PATCH 19/33] Insane race-condition bug fix in wfbench.py (having to deal with killing children processes that are becoming orphans) Some API updates to specify num_chunks at the user level --- bin/wfbench | 334 ++---------------- .../test_translators_loggers.py | 6 +- wfcommons/wfbench/bench.py | 4 + 3 files changed, 29 insertions(+), 315 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index f9d0af77..68c73a02 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -12,6 +12,7 @@ import os import pathlib import subprocess import time +import signal import sys import argparse import re @@ -80,26 +81,26 @@ class ProcessHandle: self._proc.terminate() def terminate_along_with_children(self): - # If it's a multiprocessing, just kill the parent and return if isinstance(self._proc, multiprocessing.Process): self._proc.terminate() return - # If it's a Popen, then do the brute-force thing try: - parent = psutil.Process(self._proc.pid) - children = parent.children(recursive=True) - for child in children: - try: - child.kill() - except psutil.NoSuchProcess: - pass # Process is already dead' + pgid = os.getpgid(self._proc.pid) + os.killpg(pgid, signal.SIGKILL) + except ProcessLookupError: + pass # group
leader already gone, try children directly + except PermissionError: + pass + finally: + # Catch any re-parented children (ppid=1) that psutil can still see try: - parent.kill() + for child in psutil.Process(self._proc.pid).children(recursive=True): + try: + child.kill() + except psutil.NoSuchProcess: + pass except psutil.NoSuchProcess: - pass # Nevermind - except subprocess.TimeoutExpired: - log_debug("Process did not terminate; force-killing.") - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() + pass def wait(self): if isinstance(self._proc, multiprocessing.Process): @@ -348,186 +349,6 @@ def unlock_core(path_locked: pathlib.Path, finally: lock.release() -# def monitor_progress(proc, cpu_queue): -# """Monitor progress from the CPU benchmark process.""" -# for line in iter(proc.stdout.readline, ""): # No decode needed -# line = line.strip() -# if line.startswith("Progress:"): -# try: -# progress = float(line.split()[1].strip('%')) -# cpu_queue.put(progress) -# except (ValueError, IndexError): -# pass -# -# def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, -# cpu_threads: Optional[int] = 5, -# mem_threads: Optional[int] = 5, -# cpu_work: Optional[int] = 100, -# core: Optional[int] = None, -# total_mem: Optional[int] = None) -> List: -# """ -# Run CPU and memory benchmark. -# -# :param cpu_queue: Queue to push CPU benchmark progress as a float. -# :type cpu_queue: multiprocessing.Queue -# :param cpu_threads: Number of threads for CPU benchmark. -# :type cpu_threads: Optional[int] -# :param mem_threads: Number of threads for memory benchmark. -# :type mem_threads: Optional[int] -# :param cpu_work: Total work units for CPU benchmark. -# :type cpu_work: Optional[int] -# :param core: Core to pin the benchmark processes to. -# :type core: Optional[int] -# :param total_mem: Total memory to use for memory benchmark. -# :type total_mem: Optional[float] -# -# :return: Lists of CPU and memory subprocesses. 
-# :rtype: List -# """ -# total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" -# cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 -# cpu_samples = min(cpu_work_per_thread, int32_max) -# cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max -# if cpu_ops > int32_max: -# log_info("Exceeded maximum allowed value of cpu work.") -# cpu_ops = int32_max -# -# cpu_proc = None -# mem_proc = None -# -# cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", -# "--monte-carlo-method", "pi", -# "--monte-carlo-rand", "lcg", -# "--monte-carlo-samples", f"{cpu_samples}", -# "--monte-carlo-ops", f"{cpu_ops}", -# "--quiet"] -# mem_prog = ["stress-ng", "--vm", f"{mem_threads}", -# "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] -# -# if cpu_threads > 0: -# cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) -# -# # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) -# if core: -# os.sched_setaffinity(cpu_proc.pid, {core}) -# -# if mem_threads > 0: -# # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows -# mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) -# if core: -# os.sched_setaffinity(mem_proc.pid, {core}) -# -# return [cpu_proc, mem_proc] -# -# -# def io_read_benchmark_user_input_data_size(inputs, -# rundir=None, -# memory_limit=None): -# if memory_limit is None: -# memory_limit = -1 -# memory_limit = int(memory_limit) -# log_debug("Starting IO Read Benchmark...") -# for file, size in inputs.items(): -# with open(rundir.joinpath(file), "rb") as fp: -# log_debug(f"Reading '{file}'") -# chunk_size = min(size, memory_limit) -# while fp.read(chunk_size): -# pass -# log_debug("Completed IO Read Benchmark!") -# -# -# def io_write_benchmark_user_input_data_size(outputs, -# rundir=None, -# memory_limit=None): -# if memory_limit is None: -# memory_limit = sys.maxsize -# memory_limit = 
int(memory_limit) -# for file_name, file_size in outputs.items(): -# log_debug(f"Writing output file '{file_name}'") -# file_size_todo = file_size -# while file_size_todo > 0: -# with open(rundir.joinpath(file_name), "ab") as fp: -# chunk_size = min(file_size_todo, memory_limit) -# file_size_todo -= fp.write(os.urandom(int(chunk_size))) -# -# -# def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): -# """Alternate between reading and writing to a file, ensuring read only happens after write.""" -# -# if memory_limit is None: -# memory_limit = 10 * 1024 * 1024 # sys.maxsize -# memory_limit = int(memory_limit) -# -# # queue will have messages in the form (cpu_percent_completed) -# # Get the last message and trash the rest -# -# # Create empty files -# for name in outputs: -# open(rundir.joinpath(name), "wb").close() -# -# io_completed = 1 -# bytes_read = { -# name: 0 -# for name in inputs -# } -# bytes_written = { -# name: 0 -# for name in outputs -# } -# -# # get size of inputs -# inputs = { -# name: os.path.getsize(rundir.joinpath(name)) -# for name in inputs -# } -# -# while io_completed < 100: -# #cpu_percent = max(io_completed, cpu_queue.get()) -# #while True: # Get the last message -# # try: -# # cpu_percent = max(io_completed, cpu_queue.get_nowait()) -# # except queue.Empty: -# # break -# -# log_debug(f"IO Percent: {io_completed}") -# if True: #cpu_percent: -# bytes_to_read = { -# name: int(size * (io_completed / 100) - bytes_read[name]) -# for name, size in inputs.items() -# } -# bytes_to_write = { -# name: int(size * (io_completed / 100) - bytes_written[name]) -# for name, size in outputs.items() -# } -# io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) -# io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) -# -# bytes_read = { -# name: bytes_read[name] + bytes_to_read[name] -# for name in bytes_to_read -# } -# 
bytes_written = { -# name: bytes_written[name] + bytes_to_write[name] -# for name in bytes_to_write -# } -# -# log_debug(f"Bytes Read: {bytes_read}") -# log_debug(f"Bytes Written: {bytes_written}") -# -# io_completed = io_completed + 1 -# -# if io_completed >= 100: -# break - - -# def gpu_benchmark(time: int = 100, -# work: int = 100, -# device: int = 0): #work, device -# -# gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] -# log_debug(f"Running GPU Benchmark: {gpu_prog}") -# subprocess.Popen(gpu_prog, shell=True) - def get_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() @@ -550,7 +371,8 @@ def get_parser() -> argparse.ArgumentParser: "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") parser.add_argument("--input-files", help="Input files names as a JSON array " "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") - parser.add_argument("--debug", action="store_true", help="Enable debug messages.") + parser.add_argument("--silent", action="store_true", help="Disable all log messages.") + parser.add_argument("--debug", action="store_true", help="Enable debug log messages.") parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.") parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.") @@ -612,6 +434,8 @@ def main(): if args.with_flowcept: flowcept, flowcept_task = begin_flowcept(args) + if args.silent: + logging.getLogger().setLevel(logging.NOTSET) if args.debug: logging.getLogger().setLevel(logging.DEBUG) @@ -625,9 +449,9 @@ def main(): path_cores = pathlib.Path(args.path_cores) core = lock_core(path_locked, path_cores) - # if args.time and (not args.cpu_work and not args.gpu_work): - # log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.") - # sys.exit(1) + if not args.time and (not args.cpu_work and not 
args.gpu_work): + log_error("At least one of --time, --cpu-work, or --gpu-work should be provided.") + sys.exit(1) # Compute the (feasible) number of chunks based on the arguments num_chunks = compute_num_chunks(args) @@ -782,119 +606,7 @@ def main(): if args.with_flowcept: end_flowcept(flowcept, flowcept_task) - log_debug(f"{args.name} Benchmark Completed") - - # OLD CODE BELOW: - # - # procs = [] - # io_proc = None - # outputs_dict = {} - # - # cpu_queue = multiprocessing.Queue() - # - # log_debug(f"Working directory: {os.getcwd()}") - # - # if cleaned_input or cleaned_output: - # log_debug("Starting IO benchmark...") - # - # # Attempt to parse the cleaned string - # try: - # outputs_dict = json.loads(cleaned_output) - # except json.JSONDecodeError as e: - # log_error(f"Failed to decode --output-files JSON string argument: {e}") - # sys.exit(1) - # - # try: - # inputs_array = json.loads(cleaned_input) - # except json.JSONDecodeError as e: - # log_error(f"Failed to decode --input-files JSON string argument: {e}") - # sys.exit(1) - # - # # print("OUTPUT", outputs_dict) - # # print("INPUTS", inputs_array) - # - # # Create a multiprocessing event that in the first run is set to True - # write_done_event = multiprocessing.Event() - # # Set this to True to allow the first read to happen - # write_done_event.set() - # # Print the value of the event - # # print("Event Value:", write_done_event.is_set()) - # - # io_proc = multiprocessing.Process( - # target=io_alternate, - # args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) - # ) - # io_proc.start() - # procs.append(io_proc) - # - # if args.gpu_work: - # log_info(f"Starting GPU Benchmark for {args.name}...") - # available_gpus = get_available_gpus() #checking for available GPUs - # - # if not available_gpus: - # log_error("No GPU available") - # sys.exit(1) - # else: - # device = available_gpus[0] - # log_debug(f"Running on GPU {device}") - # - # if args.time: - # log_debug(f" 
Time:{args.time}, Work:{args.gpu_work}, Device:{device}") - # gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) - # else: - # gpu_benchmark(work=int(args.gpu_work), device=device) - # - # if args.cpu_work: - # log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") - # if core: - # log_debug(f"{args.name} acquired core {core}") - # - # mem_threads=int(10 - 10 * args.percent_cpu) - # [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, - # cpu_threads=int(10 * args.percent_cpu), - # mem_threads=mem_threads, - # cpu_work=int32_max**2 if args.time else int(args.cpu_work), - # core=core, - # total_mem=mem_bytes) - # procs.append(cpu_proc) - # if args.time: - # time.sleep(int(args.time)) - # for proc in procs: - # if isinstance(proc, multiprocessing.Process): - # if proc.is_alive(): - # proc.terminate() - # elif isinstance(proc, subprocess.Popen): - # kill_process_and_children(proc) - # else: - # for proc in procs: - # if isinstance(proc, subprocess.Popen): - # proc.wait() - # if io_proc is not None and io_proc.is_alive(): - # #io_proc.terminate() - # io_proc.join() - # - # try: - # kill_process_and_children(mem_proc) - # except subprocess.TimeoutExpired: - # log_debug("Memory process did not terminate; force-killing.") - # # As a fallback, use pkill if any remaining instances are stuck - # subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - # - # log_debug("Completed CPU and Memory Benchmarks!") - # - # NOTE: If you would like to run only IO add time.sleep(2) - # Check if all procs are done, if not, kill them - # log_debug("Checking if all processes are done...") - # for proc in procs: - # if isinstance(proc, multiprocessing.Process): - # if proc.is_alive(): - # proc.terminate() - # proc.join() - # if isinstance(proc, subprocess.Popen): - # proc.wait() - # - # - # log_info(f"Benchmark {args.name} completed!") + log_info(f"{args.name} benchmark completed") if __name__ == "__main__": main() diff --git 
a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 2eccca46..0ccc010c 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -130,7 +130,7 @@ def run_workflow_dask(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./dask_workflow.py", user="wfcommons", stdout=True, stderr=True) # Check sanity assert (exit_code == 0) - assert (output.decode().count("completed!") == num_tasks) + assert (output.decode().count("benchmark completed") == num_tasks) # TODO: Look at the (I think) generated run.json file on the container? def run_workflow_parsl(container, num_tasks, str_dirpath): @@ -163,7 +163,7 @@ def run_workflow_bash(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity assert (exit_code == 0) - assert (output.decode().count("completed") == num_tasks) + assert (output.decode().count("benchmark completed") == num_tasks) def run_workflow_taskvine(container, num_tasks, str_dirpath): # Run the workflow! 
@@ -187,7 +187,6 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity assert (exit_code == 0) # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", @@ -199,7 +198,6 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity assert (exit_code == 0) # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index 7e7076a0..7bee4eb0 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -86,6 +86,7 @@ def create_benchmark_from_synthetic_workflow( percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, mem: Optional[float] = None, lock_files_folder: Optional[pathlib.Path] = None, @@ -102,6 +103,8 @@ def create_benchmark_from_synthetic_workflow( :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: Maximum GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param mem: Maximum amount of memory consumption per task (in MB). 
@@ -164,6 +167,7 @@ def create_benchmark_from_synthetic_workflow( task_percent_cpu, task_cpu_work, task_gpu_work, + num_chunks, time, task_memory, lock_files_folder, From 68fb5db07a9bc9c84be4168503b713acfaa2e0d0 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Fri, 20 Mar 2026 15:13:02 -0400 Subject: [PATCH 20/33] cleanup Signed-off-by: Steven Hahn --- tests/wfbench/test_wfbench.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index 03dd13d3..ea86809f 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -54,6 +54,7 @@ def _workflow_as_expected(dirpath: pathlib.Path, with json_path.open("r") as f: generated_json = json.load(f) + # Check the number of tasks assert(len(workflow.tasks) == len(generated_json['workflow']['specification']['tasks'])) @@ -169,7 +170,7 @@ def test_create_from_recipe(self) -> None: # Run the workflow sys.stderr.write("Running workflow...\n") exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True) - print(output.decode()) + # Kill the container _shutdown_docker_container_and_remove_image(container) From 8bd49b5159fb07d97068d7a9b67b3ff8a3c72e62 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Fri, 20 Mar 2026 15:37:45 -0400 Subject: [PATCH 21/33] cleanup Signed-off-by: Steven Hahn --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 4e3179b1..b0ef8dd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "requests", "scipy>=1.16.1", "pyyaml", + "pandas", "shortuuid", "stringcase", "filelock", From 8639ee7836b420fc5459c29e361f32e8fe33e445 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Fri, 20 Mar 2026 16:09:10 -0400 Subject: [PATCH 22/33] commented out code Signed-off-by: Steven Hahn --- bin/wfbench | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 
68c73a02..c1802446 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -270,20 +270,6 @@ class GPUBenchmark: p = subprocess.Popen(gpu_prog, shell=True) return ProcessHandle(p) -# def kill_process_and_children(proc): -# if proc is None: -# return -# try: -# parent = psutil.Process(proc.pid) -# children = parent.children(recursive=True) -# for child in children: -# child.kill() -# parent.kill() -# -# except psutil.NoSuchProcess: -# pass # Process is already dead - - # Utility code functions ######################## From 90a6a490aa276ff863a141c0dd2f4c7a472565bd Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 20 Mar 2026 14:21:26 -1000 Subject: [PATCH 23/33] Updated wfbench to make it callable as a module --- bin/wfbench | 110 +++++++++++++++++++++++++++++----------------------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 68c73a02..0532540c 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -363,7 +363,7 @@ def get_parser() -> argparse.ArgumentParser: "computation throughout the execution (fewer chunks may be used " "if amounts of work and or input/output file sizes are too small).") parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") - parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete " + parser.add_argument("--time-limit", default=None, help="Time limit (in seconds) to complete " "the computational portion of the benchmark (overrides CPU and GPU works).
" "Is only approximate since I/O time may make the overall time longer.") parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") @@ -379,15 +379,15 @@ def get_parser() -> argparse.ArgumentParser: return parser -def begin_flowcept(args): +def begin_flowcept(workflow_id, name, used): log_debug("Running with Flowcept.") from flowcept import Flowcept, FlowceptTask # TODO: parametrize to allow storing individual tasks - f = Flowcept(workflow_id=args.workflow_id, - bundle_exec_id=args.workflow_id, + f = Flowcept(workflow_id=workflow_id, + bundle_exec_id=workflow_id, start_persistence=False, save_workflow=False) f.start() - t = FlowceptTask(task_id=f"{args.workflow_id}_{args.name}", workflow_id=args.workflow_id, used={**args.__dict__}) + t = FlowceptTask(task_id=f"{workflow_id}_{name}", workflow_id=workflow_id, used=used) return f, t @@ -396,22 +396,22 @@ def end_flowcept(flowcept, flowcept_task): flowcept.stop() -def compute_num_chunks(args): +def compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks): # Compute the (feasible number of chunks) min_chunk_size_time = 1.0 # At least 1 second per chunk, if we're doing time-based # TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine min_chunk_size_cpu_work = 3000000 * min_chunk_size_time # 1s on my MacBook Pro min_chunk_size_gpu_work = 30000000 * min_chunk_size_time # unknown..... 
- if args.time: - num_chunks = min(int(args.num_chunks), int(float(args.time) / min_chunk_size_time)) + if time_limit: + num_chunks = min(int(num_chunks), int(float(time_limit) / min_chunk_size_time)) else: - if args.cpu_work: - num_chunks_cpu = min(int(args.num_chunks), int(float(args.cpu_work) / min_chunk_size_cpu_work)) + if cpu_work: + num_chunks_cpu = min(int(num_chunks), int(float(cpu_work) / min_chunk_size_cpu_work)) else: num_chunks_cpu = 1 - if args.gpu_work: - num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work)) + if gpu_work: + num_chunks_gpu = min(int(num_chunks), int(float(gpu_work) / min_chunk_size_gpu_work)) else: num_chunks_gpu = 1 num_chunks = min(num_chunks_cpu, num_chunks_gpu) @@ -425,36 +425,38 @@ def kill_current_handles(handles: list[ProcessHandle]): handle.terminate_along_with_children() -def main(): - """Main program.""" - parser = get_parser() - args = parser.parse_args() - core = None +def run(workflow_id, name, with_flowcept, silent, debug, rundir, path_lock, path_cores, + time_limit, cpu_work, percent_cpu, mem, gpu_work, num_chunks, + input_files, output_files): + """Main function.""" - if args.with_flowcept: - flowcept, flowcept_task = begin_flowcept(args) + if with_flowcept: + flowcept, flowcept_task = begin_flowcept(workflow_id, name, locals()) + else: + flowcept = None + flowcept_task = None - if args.silent: + if silent: logging.getLogger().setLevel(logging.NOTSET) - if args.debug: + if debug: logging.getLogger().setLevel(logging.DEBUG) - if args.rundir: - rundir = pathlib.Path(args.rundir) + if rundir: + rundir = pathlib.Path(rundir) else: rundir = pathlib.Path(os.getcwd()) - if args.path_lock and args.path_cores: - path_locked = pathlib.Path(args.path_lock) - path_cores = pathlib.Path(args.path_cores) + if path_lock and path_cores: + path_locked = pathlib.Path(path_lock) + path_cores = pathlib.Path(path_cores) core = lock_core(path_locked, path_cores) - - if not args.time and (not 
args.cpu_work and not args.gpu_work): - log_error("At least one of --time, --cpu-work, or --gpu-work should be provided.") - sys.exit(1) + else: + path_locked = None + path_cores = None + core = None # Compute the (feasible) number of chunks based on the arguments - num_chunks = compute_num_chunks(args) + num_chunks = compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks) log_debug(f"Executing benchmark with {num_chunks} chunks.") # At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2) @@ -470,17 +472,17 @@ def main(): N = num_chunks + 2 steps = [{"io_read_benchmark": IOReadBenchmark(), "io_write_benchmark": IOWriteBenchmark(), - "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * args.percent_cpu), - mem_threads=int(10 - 10 * args.percent_cpu), + "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * percent_cpu), + mem_threads=int(10 - 10 * percent_cpu), core=core, - total_mem=args.mem * 1000 * 1000 if args.mem else None), + total_mem=mem * 1000 * 1000 if mem else None), "gpu_benchmark": GPUBenchmark()} for i in range(N)] min_chunk_size_data = 1000 # 1KB per chunk at a minimum for each input / output file, otherwise the file # is read/written all at once at the beginning/end # Augment I/O read benchmarks for each input file - cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files) + cleaned_input = "{}" if input_files is None else re.sub(r'\\+', '', input_files) try: input_files = json.loads(cleaned_input) except json.JSONDecodeError as e: @@ -503,7 +505,7 @@ def main(): steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes) # Augment I/O write benchmarks for each output file - cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files) + cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files) try: output_files = json.loads(cleaned_output) except json.JSONDecodeError as e: @@ -526,25 
+528,25 @@ def main(): steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes) # Augment CPU benchmark with computation (if need be) - if args.cpu_work: - if args.time: + if cpu_work: + if time_limit: for step in range(1, N-1): steps[step]["cpu_benchmark"].set_infinite_work() else: for step in range(1, N-1): - chunk_work = int(args.cpu_work) // num_chunks + (int(args.cpu_work) % num_chunks > step - 1) + chunk_work = int(cpu_work) // num_chunks + (int(cpu_work) % num_chunks > step - 1) steps[step]["cpu_benchmark"].set_work(chunk_work) # Augment GPU benchmark with computation (if need be) - if args.gpu_work: - if args.time: + if gpu_work: + if time_limit: for step in range(1, N - 1): steps[step]["gpu_benchmark"].set_device() - steps[step]["gpu_benchmark"].set_work(int(args.gpu_work)) - steps[step]["gpu_benchmark"].set_time(float(args.time)) + steps[step]["gpu_benchmark"].set_work(int(gpu_work)) + steps[step]["gpu_benchmark"].set_time(float(time_limit)) else: for step in range(1, N - 1): - chunk_work = int(args.gpu_work) // num_chunks + (int(args.gpu_work) % num_chunks > step - 1) + chunk_work = int(gpu_work) // num_chunks + (int(gpu_work) % num_chunks > step - 1) steps[step]["gpu_benchmark"].set_device() steps[step]["gpu_benchmark"].set_work(chunk_work) @@ -565,9 +567,9 @@ def main(): current_proc_handles[:] = [io_read_process, cpu_benchmark_process, memory_benchmark_process, gpu_benchmark_process] # If time based, sleep the required amount of time and kill the process - if args.time: + if time_limit: if cpu_benchmark_process is not None or gpu_benchmark_process is not None: - time.sleep(float(args.time) / num_chunks) + time.sleep(float(time_limit) / num_chunks) if cpu_benchmark_process is not None: cpu_benchmark_process.terminate_along_with_children() if gpu_benchmark_process is not None: @@ -603,10 +605,22 @@ def main(): if core: unlock_core(path_locked, path_cores, core) - if args.with_flowcept: + if with_flowcept: 
end_flowcept(flowcept, flowcept_task) - log_info(f"{args.name} benchmark completed") + log_info(f"{name} benchmark completed") + +def main(): + # Parse command-line argument + parser = get_parser() + args = parser.parse_args() + + # Sanity checks + if not args.time_limit and (not args.cpu_work and not args.gpu_work): + log_error("At least one of --time-limit, --cpu-work, or --gpu-work should be provided.") + sys.exit(1) + run(**vars(args)) + if __name__ == "__main__": main() From 9a77ef8bd2467aacafa64ccd4400f6c8549dc4a4 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 20 Mar 2026 14:28:36 -1000 Subject: [PATCH 24/33] Made the Swift/T translator create a README file with instructions --- wfcommons/wfbench/translator/swift_t.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/wfcommons/wfbench/translator/swift_t.py b/wfcommons/wfbench/translator/swift_t.py index 610beb18..0383ebee 100644 --- a/wfcommons/wfbench/translator/swift_t.py +++ b/wfcommons/wfbench/translator/swift_t.py @@ -116,6 +116,9 @@ def translate(self, output_folder: pathlib.Path) -> None: self._copy_binary_files(output_folder) self._generate_input_files(output_folder) + # write README file + self._write_readme_file(output_folder) + def _find_categories_list(self, task_name: str, parent_task: Optional[str] = None) -> None: """" Find list of task categories ordered by task dependencies. @@ -238,3 +241,17 @@ def _add_tasks(self, category: str) -> None: f"{category}__out[0] = string2int(of_{self.cmd_counter});\n\n" self.cmd_counter += 1 + + def _write_readme_file(self, output_folder: pathlib.Path) -> None: + """ + Write the README file. + + :param output_folder: The path of the output folder. 
+ :type output_folder: pathlib.Path + """ + readme_file_path = output_folder.joinpath("README") + with open(readme_file_path, "w") as out: + out.write(f"Start a REDIS server: redis-server\n") + out.write(f"[Optional] Check that REDIS works: redis-cli ping (it should say \"PONG\")\n") + out.write(f"Run the workflow: swift-t workflow.swift\n") + From f6278049dc1280c0b66248cd43efd18f20693bab Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 20 Mar 2026 14:52:20 -1000 Subject: [PATCH 25/33] Modified swift-t translator to fork-exec wfbench (which is known to be slow) instead of copy-pasting the wfbench code into its code. Later, we'll try to use python_exec --- wfcommons/wfbench/translator/swift_t.py | 11 +- .../templates/swift_t/workflow.swift | 152 +++--------------- 2 files changed, 27 insertions(+), 136 deletions(-) diff --git a/wfcommons/wfbench/translator/swift_t.py b/wfcommons/wfbench/translator/swift_t.py index 0383ebee..407b4afa 100644 --- a/wfcommons/wfbench/translator/swift_t.py +++ b/wfcommons/wfbench/translator/swift_t.py @@ -74,7 +74,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self.logger.debug("Defining input files") in_count = 0 self.output_folder = output_folder - self.cpu_benchmark = output_folder.joinpath("./bin/cpu-benchmark").absolute() + self.wfbench = output_folder.joinpath("./bin/wfbench").absolute() self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else "" self.script += "string root_in_files[];\n" @@ -116,7 +116,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self._copy_binary_files(output_folder) self._generate_input_files(output_folder) - # write README file + # README file self._write_readme_file(output_folder) def _find_categories_list(self, task_name: str, parent_task: Optional[str] = None) -> None: @@ -221,7 +221,7 @@ def _add_tasks(self, category: str) -> None: self.script += f"foreach i in
[0:{num_tasks - 1}] {{\n" \ f" string of = sprintf(\"{self.output_folder.absolute()}/data/{category}_%i_output.txt\", i);\n" \ f" string task_id = \"{category}_\" + i;\n" \ - f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", task_id, {args});\n" \ + f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.wfbench}\", task_id, {args});\n" \ f" string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \ f" string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \ f" {category}__out[i] = string2int(of_{self.cmd_counter});\n" \ @@ -235,7 +235,7 @@ def _add_tasks(self, category: str) -> None: self.out_files.add(out_file) args = args.replace( ", of", f", \"{out_file}\"").replace("[i]", "[0]") - self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}_{self.cmd_counter}\", {args});\n" \ + self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.wfbench}\", \"{category}_{self.cmd_counter}\", {args});\n" \ f"string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \ f"string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \ f"{category}__out[0] = string2int(of_{self.cmd_counter});\n\n" @@ -253,5 +253,4 @@ def _write_readme_file(self, output_folder: pathlib.Path) -> None: with open(readme_file_path, "w") as out: out.write(f"Start a REDIS server: redis-server\n") out.write(f"[Optional] Check that REDIS works: redis-cli ping (it should say \"PONG\")\n") - out.write(f"Run the workflow: swift-t workflow.swift\n") - + out.write(f"Run the workflow: swift-t workflow.swift\n") \ No newline at end of file diff --git a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift index 6097cbc3..272d57b8 100644 --- a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift +++ b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift @@ -25,7 
+25,7 @@ logging.basicConfig( handlers=[logging.StreamHandler()] ) -workflow_id = "%s" +workflow_id = "%s" workflow_name = "%s" out_files = [%s] @@ -63,12 +63,12 @@ string command = """ import logging import os +import sys import pathlib import signal import socket import subprocess import time -from pathos.helpers import mp as multiprocessing __import__("logging").basicConfig( level=logging.INFO, @@ -77,14 +77,15 @@ __import__("logging").basicConfig( handlers=[logging.StreamHandler()] ) -cpu_benchmark = "%s" +wfbench = "%s" task_name = "%s" -files_list = ["%s"] +input_file = ["%s"] gpu_work = int(%i) cpu_work = int(%i) percent_cpu = %f cpu_threads = int(10 * percent_cpu) -output_data = {"%s": int(%i)} +output_file = "%s" +output_file_size = int(%i) dep = %i workflow_id = "%s" task_id = f"{workflow_id}_{task_name}" @@ -106,131 +107,22 @@ if 'workflow_id': __import__("logging").info(f"Starting {task_name} Benchmark on {socket.gethostname()}") -procs = [] -cpu_queue = multiprocessing.Queue() -__import__("logging").debug(f"Working directory: {os.getcwd()}") - -__import__("logging").debug("Starting IO benchmark...") -io_proc = None -termination_event = multiprocessing.Event() - -io_proc = multiprocessing.Process( - target=lambda inputs=files_list, outputs=output_data, cpu_queue=cpu_queue, - termination_event=termination_event: ( - memory_limit := 10 * 1024 * 1024, - [open(name, "wb").close() for name in outputs], - io_completed := 0, - bytes_read := {name: 0 for name in inputs}, - bytes_written := {name: 0 for name in outputs}, - input_sizes := {name: __import__("os").path.getsize(name) for name in inputs}, - [ - ( - cpu_percent := cpu_queue.get(timeout=1.0), - should_exit := termination_event.is_set(), - ( - while_loop_var := True, - [ - ( - new_val := ( - cpu_queue.get(timeout = 1.0) - if not cpu_queue.empty() else None - ), - cpu_percent := ( - max(cpu_percent, new_val) - if new_val is not None else cpu_percent - ), - while_loop_var := ( - new_val is not None and 
not cpu_queue.empty() - ) - ) - for _ in range(100) if while_loop_var - ], - bytes_to_read := { - name: max(0, int(size * (cpu_percent / 100) - bytes_read[name])) - for name, size in input_sizes.items() - }, - bytes_to_write := { - name: max(0, int(size * (cpu_percent / 100) - bytes_written[name])) - for name, size in outputs.items() - }, - __import__("logging").debug("Starting IO Read Benchmark..."), - in_file := list(bytes_to_read.keys())[0], - in_size := list(bytes_to_read.values())[0], - open(in_file, "rb").read(int(in_size)), - __import__("logging").debug("Completed IO Read Benchmark!"), - out_file := list(outputs.keys())[0], - out_size := list(outputs.values())[0], - __import__("logging").debug(f"Writing output file '{out_file}'"), - open(out_file, "ab").write(__import__("os").urandom(int(out_size))), - bytes_read.update({ - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - }), - bytes_written.update({ - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - }), - - __import__("logging").debug(f"Bytes Read: {bytes_read}"), - __import__("logging").debug(f"Bytes Written: {bytes_written}"), - io_completed := cpu_percent, - ) if cpu_percent is not None else time.sleep(0.1), - not (should_exit or io_completed >= 100) - ) - for _ in range(1000000) - if not (io_completed >= 100 or termination_event.is_set()) - ], - __import__("logging").info("IO benchmark completed") - ) -) -io_proc.start() -procs.append(io_proc) - -if cpu_work > 0: - __import__("logging").info(f"Starting CPU and Memory Benchmarks for {task_name}...") - - mem_threads = 10 - cpu_threads - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{cpu_benchmark}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", "0.05%%", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 
- cpu_procs.append(cpu_proc) - monitor_thread = multiprocessing.Process( - target=lambda proc=cpu_proc, queue=cpu_queue: - [ - queue.put(float(line.strip().split()[1].strip('%%'))) - for line in iter(proc.stdout.readline, "") - if line.strip() and line.strip().startswith("Progress:") - ] - ) - monitor_thread.start() - - if mem_threads > 0: - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - mem_procs.append(mem_proc) - - procs.extend(cpu_procs) - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - io_proc.join() - - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) - except subprocess.TimeoutExpired: - __import__("logging").debug("Memory process did not terminate; force-killing.") - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - __import__("logging").info("Completed CPU and Memory Benchmarks!") - +cmd = [ + sys.executable, wfbench, + "--name", task_name, + "--workflow_id", workflow_id, + "--percent-cpu", str(percent_cpu), + "--cpu-work", str(cpu_work), + "--output-files", f'{{"{output_file}": {output_file_size}}}', + "--input-files", str(input_file).replace("'", '"'), + "--with-flowcept", +] +if gpu_work: + cmd += ["--gpu-work", str(gpu_work)] + +logging.info(f"Launching wfbench for task {task_name}") +proc = subprocess.run(cmd) + __import__("logging").info(f"Benchmark {task_name} completed!") if 'workflow_id': From 03b058a43e0c1458add3e95bcd059454b8611578 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 20 Mar 2026 15:59:02 -1000 Subject: [PATCH 26/33] Made Swift/T translator use python_exec() --- tests/translators_loggers/Dockerfile.swiftt | 1 + .../test_translators_loggers.py | 22 +++++------ .../templates/swift_t/workflow.swift | 39 ++++++++++++------- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/tests/translators_loggers/Dockerfile.swiftt b/tests/translators_loggers/Dockerfile.swiftt index 98dc56a6..2bf9a12b 100644 
--- a/tests/translators_loggers/Dockerfile.swiftt +++ b/tests/translators_loggers/Dockerfile.swiftt @@ -56,6 +56,7 @@ RUN conda tos accept --override-channels --channel https://repo.anaconda.com/pkg conda create -n swiftt-env python=3.11 -y RUN conda run -n swiftt-env python --version RUN conda run -n swiftt-env pip install flowcept +RUN conda run -n swiftt-env pip install pandas filelock RUN conda run -n swiftt-env pip install py-cpuinfo psutil redis RUN conda run -n swiftt-env conda install -y -c conda-forge gcc zsh zlib pathos RUN conda run -n swiftt-env conda install -y -c swift-t swift-t diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 0ccc010c..50dc5b38 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -269,17 +269,17 @@ class TestTranslators: "backend", [ "swiftt", - "dask", - "parsl", - "nextflow", - "nextflow_subworkflow", - "airflow", - "bash", - "taskvine", - "makeflow", - "cwl", - "streamflow", - "pegasus", + "dask", + "parsl", + "nextflow", + "nextflow_subworkflow", + "airflow", + "bash", + "taskvine", + "makeflow", + "cwl", + "streamflow", + "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") diff --git a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift index 272d57b8..1b740f12 100644 --- a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift +++ b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift @@ -107,21 +107,30 @@ if 'workflow_id': __import__("logging").info(f"Starting {task_name} Benchmark on {socket.gethostname()}") -cmd = [ - sys.executable, wfbench, - "--name", task_name, - "--workflow_id", workflow_id, - "--percent-cpu", str(percent_cpu), - "--cpu-work", str(cpu_work), - "--output-files", f'{{"{output_file}": {output_file_size}}}', - "--input-files", 
str(input_file).replace("'", '"'), - "--with-flowcept", -] -if gpu_work: - cmd += ["--gpu-work", str(gpu_work)] - -logging.info(f"Launching wfbench for task {task_name}") -proc = subprocess.run(cmd) +from importlib.util import spec_from_file_location, module_from_spec +from importlib.machinery import SourceFileLoader +loader = SourceFileLoader("wfbench", wfbench) +spec = spec_from_file_location("wfbench", wfbench, loader=loader) +mod = module_from_spec(spec) +spec.loader.exec_module(mod) +mod.run( + name=task_name, + workflow_id=workflow_id, + percent_cpu=percent_cpu, + cpu_work=cpu_work, + mem=None, + gpu_work=gpu_work, + output_files=f'{{"{output_file}": {output_file_size}}}', + input_files=str(input_file).replace("'", '"'), + with_flowcept=True, + silent=False, + debug=False, + rundir=None, + path_lock=None, + path_cores=None, + time_limit=None, + num_chunks=10, +) __import__("logging").info(f"Benchmark {task_name} completed!") From 50bc15885deff0e226d7fdd560bbaaee8b3d7bd0 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Sat, 21 Mar 2026 09:53:46 -1000 Subject: [PATCH 27/33] test re-enabling --- tests/translators_loggers/test_translators_loggers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 50dc5b38..a48b2085 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -268,7 +268,7 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "swiftt", + "swiftt", "dask", "parsl", "nextflow", From 32963ae4bc81530adff4cb3e27cb200dafaf4d01 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Sat, 21 Mar 2026 10:29:30 -1000 Subject: [PATCH 28/33] test updates --- .../test_translators_loggers.py | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/tests/translators_loggers/test_translators_loggers.py 
b/tests/translators_loggers/test_translators_loggers.py index a48b2085..b33f80e7 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -129,7 +129,9 @@ def _additional_setup_swiftt(container): def run_workflow_dask(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./dask_workflow.py", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("benchmark completed") == num_tasks) # TODO: Look at the (I think) generated run.json file on the container? @@ -137,7 +139,9 @@ def run_workflow_parsl(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./parsl_workflow.py", user="wfcommons", stdout=True, stderr=True) ignored, output = container.exec_run(f"cat {str_dirpath}/runinfo/000/parsl.log", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert ("completed" in output.decode()) assert (output.decode().count("_complete_task") == num_tasks) @@ -146,7 +150,9 @@ def run_workflow_nextflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(f"nextflow run ./workflow.nf --pwd .", user="wfcommons", stdout=True, stderr=True) ignored, task_exit_codes = container.exec_run("find . 
-name .exitcode -exec cat {} \;", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (task_exit_codes.decode() == num_tasks * "0") def run_workflow_airflow(container, num_tasks, str_dirpath): @@ -155,14 +161,18 @@ def run_workflow_airflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd=["sh", "-c", "cd /home/wfcommons/ && sudo /bin/bash /run_a_workflow.sh Blast-Benchmark"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed") == num_tasks * 2) def run_workflow_bash(container, num_tasks, str_dirpath): # Run the workflow! exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("benchmark completed") == num_tasks) def run_workflow_taskvine(container, num_tasks, str_dirpath): @@ -170,7 +180,9 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && python3 ./taskvine_workflow.py"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed") == num_tasks) def run_workflow_makeflow(container, num_tasks, str_dirpath): @@ -178,7 +190,9 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose --monitor=./monitor_data/ ./workflow.makeflow"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) 
+ if exit_code != 0: + print(output.decode()) + assert False num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode())) assert (num_completed_jobs == num_tasks) @@ -188,8 +202,10 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) - # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", + if exit_code != 0: + print(output.decode()) + assert False + # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", # and there is a 2* because there is a message for the job and for the step) assert (output.decode().count("completed success") == 3 + 2 *num_tasks) @@ -199,8 +215,10 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) - # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") + if exit_code != 0: + print(output.decode()) + assert False + # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") assert (output.decode().count("COMPLETED Step") == num_tasks + 2) # Generate RO-Crate now that the workflow has completed (Fails for now) @@ -219,18 +237,20 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert(exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert("success" in output.decode()) def run_workflow_swiftt(container, num_tasks, str_dirpath): # Run the workflow! 
exit_code, output = container.exec_run(cmd="swift-t workflow.swift", user="wfcommons", stdout=True, stderr=True) - # sys.stderr.write(output.decode()) # Check sanity - assert(exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed!") == num_tasks) - pass run_workflow_methods = { "dask": run_workflow_dask, From f08f54e691034d27f989db9bb71fb82c97a3e2e0 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Sat, 21 Mar 2026 15:29:58 -1000 Subject: [PATCH 29/33] Removed all traces of cpu-benchmark.cpp --- Makefile | 40 --------- bin/cpu-benchmark.cpp | 89 ------------------- .../source/generating_workflow_benchmarks.rst | 4 +- setup.py | 43 ++++----- tests/test_helpers.py | 18 +--- .../test_translators_loggers.py | 2 +- .../wfbench/translator/abstract_translator.py | 1 - wfcommons/wfbench/translator/pegasus.py | 1 - wfcommons/wfbench/translator/taskvine.py | 1 - .../translator/templates/pegasus_template.py | 6 +- .../translator/templates/taskvine_template.py | 1 - wfcommons/wfinstances/logs/taskvine.py | 2 +- 12 files changed, 32 insertions(+), 176 deletions(-) delete mode 100644 Makefile delete mode 100644 bin/cpu-benchmark.cpp diff --git a/Makefile b/Makefile deleted file mode 100644 index c8505c7a..00000000 --- a/Makefile +++ /dev/null @@ -1,40 +0,0 @@ -# Makefile - -objects = bin/cpu-benchmark.o -CXX= g++ -CPPFLAGS= -std=c++11 -execname = bin/cpu-benchmark - -# compile -$(execname): $(objects) - $(CXX) $(CPPFLAGS) -o $(execname) $(objects) - -#clean Makefile -clean: - rm -rf $(objects) $(execname) - - -# #define variables -# objects= gpu-benchmark.o kernels.o -# NVCC= nvcc #cuda c compiler -# CPPFLAGS= -std=c++11 -# opt= -O2 #optimization flag -# LIBS= -# execname= gpu-benchmark - -# .PHONY: clean - -# #compile -# $(execname): $(objects) -# $(NVCC) $(CPPFLAGS) $(opt) -o $(execname) $(objects) $(LIBS) - -# kernels.o: kernels.cu -# $(NVCC) $(CPPFLAGS) $(opt) -c kernels.cu -# gpu-benchmark.o: gpu-benchmark.cu 
-# $(NVCC) $(CPPFLAGS) $(opt) -c gpu-benchmark.cu - - -# #clean Makefile -# clean: -# rm $(objects) -# #end of Makefile diff --git a/bin/cpu-benchmark.cpp b/bin/cpu-benchmark.cpp deleted file mode 100644 index 359d6175..00000000 --- a/bin/cpu-benchmark.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#define PRECISION 100000L - -/** - * This function computes pi using work trials for a Monte Carlo method. It's - * GOOD because it uses a good number generator. Unfortunately, this method - * leads to extra memory references that cause extra RAM references, and thus - * extra cache misses. - */ - -void compute_good_pi(long work) { - - std::uniform_real_distribution random_dist(-0.5, 0.5); - std::mt19937 rng; - rng.seed((long)&work); - - double good_pi = 0.0; - double x, y; - for (long sample = 0; sample < work; sample++) { - x = random_dist(rng); - y = random_dist(rng); - good_pi += (double)(std::sqrt(x * x + y * y) < 0.5); - - // Print progress every 1% of completion - if (sample % (work / 100) == 0) { - double progress = (double)sample / work * 100; - std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush; - } - } - std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed -} - -/** - * This function computes pi using work trials for a Monte Carlo method. It's - * TERRIBLE because it uses a custom, bad, number generator, which has too - * much bias to compute a good value a PI. The reason for using the generator - * is that it does not cause extra memory references, and thus keeps this benchmark - * 100% CPU intensive. 
- */ -void compute_terrible_pi(long work) { - - long rng = (long)&work; - double terrible_pi = 0.0; - double x_value, y_value; - for (long sample = 0; sample < work; sample++) { - rng = (((rng * 214013L + 2531011L) >> 16) & 32767); - x_value = -0.5 + (rng % PRECISION) / (double)PRECISION; - rng = (((rng * 214013L + 2531011L) >> 16) & 32767); - y_value = -0.5 + (rng % PRECISION) / (double)PRECISION; - terrible_pi += (double)(std::sqrt(x_value * x_value + y_value * y_value) < 0.5); - - // Print progress every 1% of completion - if (sample % (work / 100) == 0) { - double progress = (double)sample / work * 100; - std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush; - } - } - std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed -} - -int main(int argc, char **argv) { - - // Process command-line args - if (argc != 2) { - std::cerr << "Usage: " << argv[0] << " \n"; - exit(1); - } - - long work; - try { - work = std::stol(argv[1]); - } catch (std::invalid_argument &e) { - std::cerr << "Invalid argument: " << e.what() << "\n"; - exit(1); - } - - // Compute Pi using terrible method - compute_terrible_pi(1000000 * work); - std::cout << "Pi computation completed!\n"; - - exit(0); -} diff --git a/docs/source/generating_workflow_benchmarks.rst b/docs/source/generating_workflow_benchmarks.rst index a980e2e0..46067634 100644 --- a/docs/source/generating_workflow_benchmarks.rst +++ b/docs/source/generating_workflow_benchmarks.rst @@ -35,9 +35,7 @@ recipe to generate a task graph. Once the task graph has been generated, each ta is set to be an instance of the workflow task benchmark. For each task, the following values for the parameters of the workflow task benchmark can be specified: -- :code:`cpu_work`: CPU work per workflow task. The :code:`cpu-benchmark` executable - (compiled C++) calculates an increasingly precise value of π up until the specified - total amount of computation (cpu_work) has been performed. 
+- :code:`cpu_work`: CPU work per workflow task. - :code:`data`: Individual data volumes for each task in a way that is coherent with respect to task data dependencies (in the form of a dictionary of input size files per workflow task type). Alternatively, a total data footprint (in MB) diff --git a/setup.py b/setup.py index a04c14d5..3ac2be74 100644 --- a/setup.py +++ b/setup.py @@ -16,32 +16,33 @@ from setuptools.command.build_ext import build_ext class Build(build_ext): - """Customized setuptools build command - builds cpu-benchmark on build.""" + """Customized setuptools build command""" def run(self): - # Try to build the cpu-benchmark, but make it optional - # This allows installation on Windows where make/g++ may not be available - try: - result = subprocess.call(["make"], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) - if result != 0: - sys.stderr.write("Warning: 'make' build failed. cpu-benchmark will not be available.\n") - sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") - except (FileNotFoundError, OSError): - sys.stderr.write("Warning: 'make' is not installed. cpu-benchmark will not be available.\n") - sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") - super().run() + pass + # OLD CODE TO build a C++ executable + # # This allows installation on Windows where make/g++ may not be available + # try: + # result = subprocess.call(["make"], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) + # if result != 0: + # sys.stderr.write("Warning: 'make' build failed. cpu-benchmark will not be available.\n") + # sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") + # except (FileNotFoundError, OSError): + # sys.stderr.write("Warning: 'make' is not installed. cpu-benchmark will not be available.\n") + # sys.stderr.write("This is expected on Windows. 
To build cpu-benchmark, install make and g++.\n") + # super().run() # Conditionally include cpu-benchmark based on platform data_files = [] -if sys.platform != 'win32': - # On Unix-like systems (Linux, macOS, Docker), always try to include it - # The Build class will create it during the build process - data_files.append(('bin', ['bin/cpu-benchmark'])) -else: - # On Windows, only include if it exists (e.g., if user manually compiled it) - cpu_benchmark_path = 'bin/cpu-benchmark' - if os.path.exists(cpu_benchmark_path): - data_files.append(('bin', [cpu_benchmark_path])) +# if sys.platform != 'win32': +# # On Unix-like systems (Linux, macOS, Docker), always try to include it +# # The Build class will create it during the build process +# data_files.append(('bin', ['bin/cpu-benchmark'])) +# else: +# # On Windows, only include if it exists (e.g., if user manually compiled it) +# cpu_benchmark_path = 'bin/cpu-benchmark' +# if os.path.exists(cpu_benchmark_path): +# data_files.append(('bin', [cpu_benchmark_path])) setup( packages=find_packages(), diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 4e824d12..bc9ae020 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -57,13 +57,8 @@ def _install_WfCommons_on_container(container): # Cleanup files that came from the host exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", user="wfcommons", stdout=True, stderr=True) exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", user="wfcommons", stdout=True, stderr=True) - # Clean up and force a rebuild of cpu-benchmark (because it may be compiled for the wrong architecture) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", user="wfcommons", stdout=True, - stderr=True) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", user="wfcommons", stdout=True, - stderr=True) - # Install WfCommons on the container (to 
install wfbench and cpu-benchmark really) + # Install WfCommons on the container (to install wfbench really) exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages", user="wfcommons", workdir="/tmp/WfCommons", stdout=True, stderr=True) @@ -103,22 +98,17 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= # Installing WfCommons on container _install_WfCommons_on_container(container) - # Copy over the wfbench and cpu-benchmark executables to where they should go on the container + # Copy over the wfbench executable to where they should go on the container if bin_dir: - sys.stderr.write(f"[{backend}] Copying wfbench and cpu-benchmark...\n") + sys.stderr.write(f"[{backend}] Copying wfbench...\n") exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], user="wfcommons", stdout=True, stderr=True) if exit_code != 0: raise RuntimeError("Failed to copy wfbench script to the bin directory") - exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], - user="wfcommons", - stdout=True, stderr=True) - if exit_code != 0: - raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory") else: - sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n") + sys.stderr.write(f"[{backend}] Not Copying wfbench...\n") container.backend = backend return container diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index b33f80e7..20ca5e3f 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -348,7 +348,7 @@ def test_translator(self, backend) -> None: if backend == "pegasus": parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/") elif backend == "taskvine": - parser = TaskVineLogsParser(dirpath / 
"vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"]) + parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["stress-ng", "wfbench"]) elif backend == "makeflow": parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/") elif backend == "streamflow": diff --git a/wfcommons/wfbench/translator/abstract_translator.py b/wfcommons/wfbench/translator/abstract_translator.py index 47c6afb2..435c18ac 100644 --- a/wfcommons/wfbench/translator/abstract_translator.py +++ b/wfcommons/wfbench/translator/abstract_translator.py @@ -94,7 +94,6 @@ def _copy_binary_files(self, output_folder: pathlib.Path) -> None: bin_folder.mkdir(exist_ok=True) shutil.copy(shutil.which("wfbench"), bin_folder) - shutil.copy(shutil.which("cpu-benchmark"), bin_folder) def _generate_input_files(self, output_folder: pathlib.Path) -> None: """ diff --git a/wfcommons/wfbench/translator/pegasus.py b/wfcommons/wfbench/translator/pegasus.py index 33c9aabc..348fd5b5 100644 --- a/wfcommons/wfbench/translator/pegasus.py +++ b/wfcommons/wfbench/translator/pegasus.py @@ -65,7 +65,6 @@ def translate(self, output_folder: pathlib.Path, tasks_priorities: Optional[Dict " is_stageable=True)\n" \ "transformation.add_env(PATH='/usr/bin:/bin:.')\n" \ "transformation.add_profiles(Namespace.CONDOR, 'request_disk', '10')\n" \ - "transformation.add_requirement(t_cpu_benchmark)\n" \ "tc.add_transformations(transformation)\n\n" # adding tasks diff --git a/wfcommons/wfbench/translator/taskvine.py b/wfcommons/wfbench/translator/taskvine.py index 2e3bcbaa..e1179eb0 100644 --- a/wfcommons/wfbench/translator/taskvine.py +++ b/wfcommons/wfbench/translator/taskvine.py @@ -118,7 +118,6 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None) -> list[s f_counter = 1 task_script = f"t_{self.task_counter}.add_poncho_package(poncho_pkg)\n" \ f"t_{self.task_counter}.add_input(wfbench, 
'wfbench')\n" \ - f"t_{self.task_counter}.add_input(cpu_bench, 'cpu-benchmark')\n" \ f"t_{self.task_counter}.add_input(stress_ng, 'stress-ng')\n" input_spec = "\"[" for file in task.input_files: diff --git a/wfcommons/wfbench/translator/templates/pegasus_template.py b/wfcommons/wfbench/translator/templates/pegasus_template.py index fc956660..39bd4b21 100644 --- a/wfcommons/wfbench/translator/templates/pegasus_template.py +++ b/wfcommons/wfbench/translator/templates/pegasus_template.py @@ -22,9 +22,9 @@ def which(file): tc = TransformationCatalog() rc = ReplicaCatalog() -t_cpu_benchmark = Transformation('cpu-benchmark', site='local', -pfn = os.getcwd() + '/bin/cpu-benchmark', is_stageable=True) -tc.add_transformations(t_cpu_benchmark) +# t_cpu_benchmark = Transformation('cpu-benchmark', site='local', +# pfn = os.getcwd() + '/bin/cpu-benchmark', is_stageable=True) +# tc.add_transformations(t_cpu_benchmark) transformation_path = os.getcwd() + '/bin/wfbench' task_output_files = {} diff --git a/wfcommons/wfbench/translator/templates/taskvine_template.py b/wfcommons/wfbench/translator/templates/taskvine_template.py index 496ae82f..48b76ea2 100644 --- a/wfcommons/wfbench/translator/templates/taskvine_template.py +++ b/wfcommons/wfbench/translator/templates/taskvine_template.py @@ -46,7 +46,6 @@ def wait_for_tasks_completion(): # wfbench executable files wfbench = m.declare_file("bin/wfbench", cache="workflow") -cpu_bench = m.declare_file("bin/cpu-benchmark", cache="workflow") stress_ng = m.declare_file(shutil.which("stress-ng"), cache="workflow") # Generated code goes here diff --git a/wfcommons/wfinstances/logs/taskvine.py b/wfcommons/wfinstances/logs/taskvine.py index 2003d853..136872d9 100644 --- a/wfcommons/wfinstances/logs/taskvine.py +++ b/wfcommons/wfinstances/logs/taskvine.py @@ -45,7 +45,7 @@ class TaskVineLogsParser(LogsParser): are input to tasks. 
This argument is the list of names of files that should be ignored in the reconstructed instances, which typically do not include such files at task input. For instance, if reconstructing a workflow from an execution - of a WfBench-generated benchmark, one could pass ["wfbench", "cpu-benchmark", "stress-ng"] + of a WfBench-generated benchmark, one could pass ["wfbench", "stress-ng"] :type filenames_to_ignore: List[str] :param description: Workflow instance description. :type description: Optional[str] From 3aa7021b4f9a63fdaf93a70b0f0274b86624c26d Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 23 Mar 2026 08:45:56 -1000 Subject: [PATCH 30/33] added a sleep to let redis server time to start in the swift/t container --- tests/translators_loggers/test_translators_loggers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 20ca5e3f..29362636 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -101,6 +101,9 @@ def _additional_setup_swiftt(container): cmd=["bash", "-c", "redis-server"], user="wfcommons", detach=True, stdout=True, stderr=True) # Note that exit_code will always be None because of detach=True. + # Give redis time to start! 
+ time.sleep(1) + # Check that the redis-server is up exit_code, output = container.exec_run( cmd=["bash", "-c", "redis-cli ping"], user="wfcommons", stdout=True, stderr=True) From 0a99665505783e44c1cd6e135f915762ad9dd766 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 23 Mar 2026 09:28:13 -1000 Subject: [PATCH 31/33] small test fix/cleanup --- tests/translators_loggers/test_translators_loggers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 29362636..f29304f4 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -372,3 +372,6 @@ def test_translator(self, backend) -> None: # Shutdown the container (weirdly, container is already shutdown by now... not sure how) _shutdown_docker_container_and_remove_image(container) + # Remove the created local directory + _remove_local_dir_if_it_exists(str_dirpath) + From 225b6898bf3149e89c3e9bef894e817f2ed933ec Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Mon, 23 Mar 2026 17:19:45 -0400 Subject: [PATCH 32/33] cleanup Signed-off-by: Steven Hahn --- bin/wfbench | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/wfbench b/bin/wfbench index f7ae4fcd..2ff77fd6 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -18,6 +18,7 @@ import argparse import re import json import logging +import pandas as pd import psutil from io import StringIO From a486c9216d066b921ef8055ca4ed8040690b373e Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 27 Mar 2026 11:13:29 -1000 Subject: [PATCH 33/33] Update bin/wfbench Co-authored-by: Steven Hahn --- bin/wfbench | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index 2ff77fd6..f5f63eba 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -394,7 +394,7 @@ def compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks): num_chunks = min(int(num_chunks), 
int(float(time_limit) / min_chunk_size_time)) else: if cpu_work: - num_chunks_cpu = min(int(num_chunks), int(float(cpu_work) / min_chunk_size_cpu_work)) + num_chunks_cpu = min(num_chunks, cpu_work // min_chunk_size_cpu_work) else: num_chunks_cpu = 1 if gpu_work: