Skip to content

Runner Internals

Process Management

runner.process_manager

Process management utilities for orchestrating ROS 2 and simulator processes.

This module provides robust process lifecycle management with proper signal handling and process group management for clean shutdown of complex process trees.

ManagedProcess dataclass

Container for a managed subprocess with associated metadata.

Attributes:

Name Type Description
name str

Human-readable identifier for the process

popen Popen

The subprocess.Popen instance

log_path Path

Path to the log file capturing stdout/stderr

Source code in runner/process_manager.py
@dataclass
class ManagedProcess:
    """Container for a managed subprocess with associated metadata.

    Attributes:
        name: Human-readable identifier for the process
        popen: The subprocess.Popen instance
        log_path: Path to the log file capturing stdout/stderr
    """
    name: str
    popen: subprocess.Popen
    log_path: Path

ProcessManager

Manages the lifecycle of multiple subprocesses with centralized logging.

This class handles starting, monitoring, and gracefully stopping process trees. It uses process groups (via os.setsid) to ensure child processes are properly terminated when the parent is stopped.

Parameters:

Name Type Description Default
logs_dir str

Directory where process logs will be written

required
Source code in runner/process_manager.py
class ProcessManager:
    """Manages the lifecycle of multiple subprocesses with centralized logging.

    This class handles starting, monitoring, and gracefully stopping process trees.
    It uses process groups (via os.setsid) to ensure child processes are properly
    terminated when the parent is stopped.

    Args:
        logs_dir: Directory where process logs will be written
    """

    def __init__(self, logs_dir: str):
        self.logs_dir = Path(logs_dir)
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.procs: dict[str, ManagedProcess] = {}

    def start(self, name: str, cmd: list[str], env: dict[str, str] | None = None, cwd: str | None = None) -> None:
        """Start a new managed process.

        Args:
            name: Unique identifier for this process
            cmd: Command and arguments as a list
            env: Optional environment variables to merge with os.environ
            cwd: Optional working directory for the process

        Raises:
            RuntimeError: If a process with this name already exists
        """
        if name in self.procs:
            raise RuntimeError(f"process already exists: {name}")

        log_path = self.logs_dir / f"{name}.log"
        f = open(log_path, "wb")

        merged_env = os.environ.copy()
        if env:
            merged_env.update({str(k): str(v) for k, v in env.items()})

        # Create a process group so we can SIGINT the whole tree.
        popen = subprocess.Popen(
            cmd,
            stdout=f,
            stderr=subprocess.STDOUT,
            cwd=cwd,
            env=merged_env,
            preexec_fn=os.setsid,   # POSIX only, OK for Ubuntu
        )
        self.procs[name] = ManagedProcess(name=name, popen=popen, log_path=log_path)

    def is_running(self, name: str) -> bool:
        """Check if a managed process is still running.

        Args:
            name: Process identifier

        Returns:
            True if the process is running, False otherwise
        """
        p = self.procs[name].popen
        return p.poll() is None

    def stop(self, name: str, sigint_timeout_s: float = 8.0, sigterm_timeout_s: float = 4.0) -> None:
        """Gracefully stop a managed process with escalating signals.

        Attempts to stop the process using a three-stage approach:
        1. SIGINT (ROS 2 friendly) - wait up to sigint_timeout_s
        2. SIGTERM - wait up to sigterm_timeout_s
        3. SIGKILL - forceful termination

        Args:
            name: Process identifier
            sigint_timeout_s: Seconds to wait after SIGINT before escalating
            sigterm_timeout_s: Seconds to wait after SIGTERM before SIGKILL
        """
        mp = self.procs.get(name)
        if not mp:
            return
        p = mp.popen
        if p.poll() is not None:
            return

        pgid = os.getpgid(p.pid)

        # 1) SIGINT (ROS2-friendly)
        os.killpg(pgid, signal.SIGINT)
        t0 = time.time()
        while time.time() - t0 < sigint_timeout_s:
            if p.poll() is not None:
                return
            time.sleep(0.1)

        # 2) SIGTERM
        os.killpg(pgid, signal.SIGTERM)
        t1 = time.time()
        while time.time() - t1 < sigterm_timeout_s:
            if p.poll() is not None:
                return
            time.sleep(0.1)

        # 3) SIGKILL
        os.killpg(pgid, signal.SIGKILL)

    def stop_all(self) -> None:
        """Stop all managed processes.

        Iterates through all processes and attempts to stop them gracefully.
        Exceptions during individual process stops are silently ignored.
        """
        for name in list(self.procs.keys()):
            try:
                self.stop(name)
            except Exception:
                pass

is_running(name)

Check if a managed process is still running.

Parameters:

Name Type Description Default
name str

Process identifier

required

Returns:

Type Description
bool

True if the process is running, False otherwise

Source code in runner/process_manager.py
def is_running(self, name: str) -> bool:
    """Check if a managed process is still running.

    Args:
        name: Process identifier

    Returns:
        True if the process is running, False otherwise
    """
    p = self.procs[name].popen
    return p.poll() is None

start(name, cmd, env=None, cwd=None)

Start a new managed process.

Parameters:

Name Type Description Default
name str

Unique identifier for this process

required
cmd list[str]

Command and arguments as a list

required
env dict[str, str] | None

Optional environment variables to merge with os.environ

None
cwd str | None

Optional working directory for the process

None

Raises:

Type Description
RuntimeError

If a process with this name already exists

Source code in runner/process_manager.py
def start(self, name: str, cmd: list[str], env: dict[str, str] | None = None, cwd: str | None = None) -> None:
    """Start a new managed process.

    Args:
        name: Unique identifier for this process
        cmd: Command and arguments as a list
        env: Optional environment variables to merge with os.environ
        cwd: Optional working directory for the process

    Raises:
        RuntimeError: If a process with this name already exists
    """
    if name in self.procs:
        raise RuntimeError(f"process already exists: {name}")

    log_path = self.logs_dir / f"{name}.log"
    f = open(log_path, "wb")

    merged_env = os.environ.copy()
    if env:
        merged_env.update({str(k): str(v) for k, v in env.items()})

    # Create a process group so we can SIGINT the whole tree.
    popen = subprocess.Popen(
        cmd,
        stdout=f,
        stderr=subprocess.STDOUT,
        cwd=cwd,
        env=merged_env,
        preexec_fn=os.setsid,   # POSIX only, OK for Ubuntu
    )
    self.procs[name] = ManagedProcess(name=name, popen=popen, log_path=log_path)

stop(name, sigint_timeout_s=8.0, sigterm_timeout_s=4.0)

Gracefully stop a managed process with escalating signals.

Attempts to stop the process using a three-stage approach: 1. SIGINT (ROS 2 friendly) - wait up to sigint_timeout_s 2. SIGTERM - wait up to sigterm_timeout_s 3. SIGKILL - forceful termination

Parameters:

Name Type Description Default
name str

Process identifier

required
sigint_timeout_s float

Seconds to wait after SIGINT before escalating

8.0
sigterm_timeout_s float

Seconds to wait after SIGTERM before SIGKILL

4.0
Source code in runner/process_manager.py
def stop(self, name: str, sigint_timeout_s: float = 8.0, sigterm_timeout_s: float = 4.0) -> None:
    """Gracefully stop a managed process with escalating signals.

    Attempts to stop the process using a three-stage approach:
    1. SIGINT (ROS 2 friendly) - wait up to sigint_timeout_s
    2. SIGTERM - wait up to sigterm_timeout_s
    3. SIGKILL - forceful termination

    Args:
        name: Process identifier
        sigint_timeout_s: Seconds to wait after SIGINT before escalating
        sigterm_timeout_s: Seconds to wait after SIGTERM before SIGKILL
    """
    mp = self.procs.get(name)
    if not mp:
        return
    p = mp.popen
    if p.poll() is not None:
        return

    pgid = os.getpgid(p.pid)

    # 1) SIGINT (ROS2-friendly)
    os.killpg(pgid, signal.SIGINT)
    t0 = time.time()
    while time.time() - t0 < sigint_timeout_s:
        if p.poll() is not None:
            return
        time.sleep(0.1)

    # 2) SIGTERM
    os.killpg(pgid, signal.SIGTERM)
    t1 = time.time()
    while time.time() - t1 < sigterm_timeout_s:
        if p.poll() is not None:
            return
        time.sleep(0.1)

    # 3) SIGKILL
    os.killpg(pgid, signal.SIGKILL)

stop_all()

Stop all managed processes.

Iterates through all processes and attempts to stop them gracefully. Exceptions during individual process stops are silently ignored.

Source code in runner/process_manager.py
def stop_all(self) -> None:
    """Stop all managed processes.

    Iterates through all processes and attempts to stop them gracefully.
    Exceptions during individual process stops are silently ignored.
    """
    for name in list(self.procs.keys()):
        try:
            self.stop(name)
        except Exception:
            pass

Data Recording

runner.bag_recorder

Configuration and command building for ROS 2 bag recording.

This module provides utilities to configure and generate rosbag2 record commands with various options like compression, storage format, and topic filtering.

RosbagConfig dataclass

Configuration for rosbag2 recording.

Attributes:

Name Type Description
storage_id str

Storage backend ("sqlite3" or "mcap")

compression Optional[str]

Optional compression format ("zstd" or "lz4")

max_bag_size_mb Optional[int]

Maximum size per bag file in megabytes

include_hidden_topics bool

Whether to record hidden topics (e.g., /rosout)

qos_overrides_path Optional[str]

Path to QoS profile overrides YAML

topics list[str]

List of topic names to record

Source code in runner/bag_recorder.py
@dataclass
class RosbagConfig:
    """Configuration for rosbag2 recording.

    Attributes:
        storage_id: Storage backend ("sqlite3" or "mcap")
        compression: Optional compression format ("zstd" or "lz4")
        max_bag_size_mb: Maximum size per bag file in megabytes
        include_hidden_topics: Whether to record hidden topics (e.g., /rosout)
        qos_overrides_path: Path to QoS profile overrides YAML
        topics: List of topic names to record
    """
    storage_id: str = "sqlite3"
    compression: Optional[str] = None   # "zstd" | "lz4"
    max_bag_size_mb: Optional[int] = None
    include_hidden_topics: bool = False
    qos_overrides_path: Optional[str] = None
    topics: list[str] = None

build_rosbag_cmd(output_dir, cfg)

Build a ros2 bag record command from configuration.

Parameters:

Name Type Description Default
output_dir str

Directory where the bag will be saved

required
cfg RosbagConfig

RosbagConfig with recording options

required

Returns:

Type Description
list[str]

Command as a list of strings ready for subprocess.Popen

Example

cfg = RosbagConfig(topics=["/scan", "/odom"], compression="zstd") cmd = build_rosbag_cmd("/tmp/mybag", cfg)

cmd = ["ros2", "bag", "record", "-o", "/tmp/mybag", ...]

Source code in runner/bag_recorder.py
def build_rosbag_cmd(output_dir: str, cfg: RosbagConfig) -> list[str]:
    """Build a ros2 bag record command from configuration.

    Args:
        output_dir: Directory where the bag will be saved
        cfg: RosbagConfig with recording options

    Returns:
        Command as a list of strings ready for subprocess.Popen

    Example:
        >>> cfg = RosbagConfig(topics=["/scan", "/odom"], compression="zstd")
        >>> cmd = build_rosbag_cmd("/tmp/mybag", cfg)
        >>> # cmd = ["ros2", "bag", "record", "-o", "/tmp/mybag", ...]
    """
    cmd = ["ros2", "bag", "record"]
    cmd += ["-o", output_dir]
    cmd += ["--storage", cfg.storage_id]

    if cfg.compression:
        cmd += ["--compression-mode", "file", "--compression-format", cfg.compression]

    if cfg.max_bag_size_mb:
        # rosbag expects bytes
        cmd += ["--max-bag-size", str(int(cfg.max_bag_size_mb) * 1024 * 1024)]

    if cfg.include_hidden_topics:
        cmd += ["--include-hidden-topics"]

    if cfg.qos_overrides_path:
        cmd += ["--qos-profile-overrides-path", cfg.qos_overrides_path]

    # topics
    for t in (cfg.topics or []):
        cmd.append(t)

    return cmd

Probes

runner.probes.ros_probes

ROS 2 probes for verifying system readiness before benchmark execution.

This module provides a suite of probes to verify that ROS 2 nodes, topics, services, and transforms are available and functioning correctly before starting a benchmark run.

NodePresentProbe

Bases: Probe

Probe that verifies a ROS 2 node is running.

Parameters:

Name Type Description Default
node_name str

Name of the node to check for

required
timeout_s float

Maximum time to wait (default: 10.0)

10.0
Source code in runner/probes/ros_probes.py
class NodePresentProbe(Probe):
    """Probe that verifies a ROS 2 node is running.

    Args:
        node_name: Name of the node to check for
        timeout_s: Maximum time to wait (default: 10.0)
    """
    def __init__(self, node_name: str, timeout_s: float = 10.0):
        self.node_name = node_name
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node

        def present():
            names = node.get_node_names()  # without namespaces; we need full names:
            full = node.get_node_names_and_namespaces()
            full_names = [f"{ns.rstrip('/')}/{n}".replace("//", "/") for (n, ns) in full]
            return self.node_name in full_names or self.node_name in names

        ok = _spin_until(node, present, self.timeout_s)
        if ok:
            return ProbeResult(True, f"Node present: {self.node_name}")
        return ProbeResult(False, f"Node not present: {self.node_name} within {self.timeout_s}s")

ServiceAvailableProbe

Bases: Probe

Probe that verifies a ROS 2 service is available.

Parameters:

Name Type Description Default
service str

Service name (e.g., "/map_server/load_map")

required
srv_type str

Service type string (e.g., "nav2_msgs/srv/LoadMap")

required
timeout_s float

Maximum time to wait (default: 10.0)

10.0
Source code in runner/probes/ros_probes.py
class ServiceAvailableProbe(Probe):
    """Probe that verifies a ROS 2 service is available.

    Args:
        service: Service name (e.g., "/map_server/load_map")
        srv_type: Service type string (e.g., "nav2_msgs/srv/LoadMap")
        timeout_s: Maximum time to wait (default: 10.0)
    """
    def __init__(self, service: str, srv_type: str, timeout_s: float = 10.0):
        self.service = service
        self.srv_type = srv_type
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node
        try:
            mod_name, cls_name = TopicPublishTypedProbe._resolve_import(self.srv_type)
            # srv import path is pkg.srv
            mod_name = mod_name.replace(".msg", ".srv")
            mod = __import__(mod_name, fromlist=[cls_name])
            Srv = getattr(mod, cls_name)
        except Exception as e:
            return ProbeResult(False, f"Cannot import srv_type={self.srv_type}: {e}")

        client = node.create_client(Srv, self.service)

        ok = _spin_until(node, lambda: client.service_is_ready(), self.timeout_s)
        node.destroy_client(client)

        if ok:
            return ProbeResult(True, f"Service ready: {self.service}")
        return ProbeResult(False, f"Service not ready: {self.service} within {self.timeout_s}s")

TfAvailableProbe

Bases: Probe

Probe that verifies a TF transform is available.

Checks if a transform between two frames exists in the TF tree.

Parameters:

Name Type Description Default
from_frame str

Source frame (e.g., "odom")

required
to_frame str

Target frame (e.g., "base_link")

required
timeout_s float

Maximum time to wait (default: 10.0)

10.0
Source code in runner/probes/ros_probes.py
class TfAvailableProbe(Probe):
    """Probe that verifies a TF transform is available.

    Checks if a transform between two frames exists in the TF tree.

    Args:
        from_frame: Source frame (e.g., "odom")
        to_frame: Target frame (e.g., "base_link")
        timeout_s: Maximum time to wait (default: 10.0)
    """
    def __init__(self, from_frame: str, to_frame: str, timeout_s: float = 10.0):
        self.from_frame = from_frame
        self.to_frame = to_frame
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node
        buf = Buffer()
        listener = TransformListener(buf, node, spin_thread=False)

        def can():
            try:
                # allow a small timeout per check
                return buf.can_transform(self.from_frame, self.to_frame, rclpy.time.Time(), timeout=Duration(seconds=0.1))
            except Exception:
                return False

        ok = _spin_until(node, can, self.timeout_s)
        # listener is tied to node; no explicit destroy needed beyond letting it go

        if ok:
            return ProbeResult(True, f"TF available {self.from_frame}->{self.to_frame}")
        return ProbeResult(False, f"TF not available {self.from_frame}->{self.to_frame} within {self.timeout_s}s")

TopicHzTypedProbe

Bases: Probe

Probe that verifies a topic is publishing at a minimum frequency.

Measures the publication rate over a sliding window and ensures it meets the minimum Hz requirement.

Parameters:

Name Type Description Default
topic str

Topic name

required
msg_type str

Message type string

required
min_hz float

Minimum required frequency in Hz

required
window_s float

Time window for frequency calculation (default: 5.0)

5.0
timeout_s float

Maximum time to wait (default: 20.0)

20.0
Source code in runner/probes/ros_probes.py
class TopicHzTypedProbe(Probe):
    """Probe that verifies a topic is publishing at a minimum frequency.

    Measures the publication rate over a sliding window and ensures it meets
    the minimum Hz requirement.

    Args:
        topic: Topic name
        msg_type: Message type string
        min_hz: Minimum required frequency in Hz
        window_s: Time window for frequency calculation (default: 5.0)
        timeout_s: Maximum time to wait (default: 20.0)
    """
    def __init__(self, topic: str, msg_type: str, min_hz: float, window_s: float = 5.0, timeout_s: float = 20.0):
        self.topic = topic
        self.msg_type = msg_type
        self.min_hz = float(min_hz)
        self.window_s = float(window_s)
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node
        try:
            mod_name, cls_name = TopicPublishTypedProbe._resolve_import(self.msg_type)
            mod = __import__(mod_name, fromlist=[cls_name])
            Msg = getattr(mod, cls_name)
        except Exception as e:
            return ProbeResult(False, f"Cannot import msg_type={self.msg_type}: {e}")

        stamps = []

        def cb(_msg):
            stamps.append(time.time())

        sub = node.create_subscription(Msg, self.topic, cb, qos_profile_sensor_data)

        def enough():
            # keep only window
            now = time.time()
            while stamps and stamps[0] < now - self.window_s:
                stamps.pop(0)
            if len(stamps) < 2:
                return False
            hz = (len(stamps) - 1) / max(1e-6, (stamps[-1] - stamps[0]))
            return hz >= self.min_hz

        ok = _spin_until(node, enough, self.timeout_s)
        node.destroy_subscription(sub)

        if not stamps:
            return ProbeResult(False, f"No messages received on {self.topic}")
        now = time.time()
        w = [s for s in stamps if s >= now - self.window_s]
        hz = 0.0
        if len(w) >= 2:
            hz = (len(w) - 1) / max(1e-6, (w[-1] - w[0]))

        if ok:
            return ProbeResult(True, f"{self.topic} hz={hz:.2f} >= {self.min_hz}")
        return ProbeResult(False, f"{self.topic} hz={hz:.2f} < {self.min_hz}", {"hz": hz})

TopicPublishProbe

Bases: Probe

Deprecated probe - use TopicPublishTypedProbe instead.

This probe requires explicit message type specification.

Source code in runner/probes/ros_probes.py
class TopicPublishProbe(Probe):
    """Deprecated probe - use TopicPublishTypedProbe instead.

    This probe requires explicit message type specification.
    """
    def __init__(self, topic: str, min_messages: int = 1, timeout_s: float = 10.0):
        self.topic = topic
        self.min_messages = int(min_messages)
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node
        count = {"n": 0}

        # Use AnyMsg-style subscription: easiest is to use std_msgs/String? Not ok.
        # In practice, you should know msg types; for generic probing, use /rosapi is not available.
        # So we probe existence by graph + then count via ros2 topic echo is too heavy.
        # Better: implement per-known topics with types, or provide msg_type in config.
        # Here: require msg_type in params; we'll support that in a second probe below.
        return ProbeResult(
            ok=False,
            message=f"TopicPublishProbe requires msg_type. Use TopicPublishTypedProbe(topic=..., msg_type='sensor_msgs/msg/LaserScan')."
        )

TopicPublishTypedProbe

Bases: Probe

Probe that verifies a topic is publishing messages.

Subscribes to a topic and waits for a minimum number of messages to be received.

Parameters:

Name Type Description Default
topic str

Topic name (e.g., "/scan")

required
msg_type str

Message type string (e.g., "sensor_msgs/msg/LaserScan")

required
min_messages int

Minimum number of messages to receive (default: 1)

1
timeout_s float

Maximum time to wait in seconds (default: 10.0)

10.0
Source code in runner/probes/ros_probes.py
class TopicPublishTypedProbe(Probe):
    """Probe that verifies a topic is publishing messages.

    Subscribes to a topic and waits for a minimum number of messages to be received.

    Args:
        topic: Topic name (e.g., "/scan")
        msg_type: Message type string (e.g., "sensor_msgs/msg/LaserScan")
        min_messages: Minimum number of messages to receive (default: 1)
        timeout_s: Maximum time to wait in seconds (default: 10.0)
    """
    def __init__(self, topic: str, msg_type: str, min_messages: int = 1, timeout_s: float = 10.0):
        self.topic = topic
        self.msg_type = msg_type
        self.min_messages = int(min_messages)
        self.timeout_s = float(timeout_s)

    def run(self, ctx: ProbeContext) -> ProbeResult:
        node = ctx.node

        # dynamic import of message type string "pkg/msg/Type"
        try:
            mod_name, cls_name = self._resolve_import(self.msg_type)
            mod = __import__(mod_name, fromlist=[cls_name])
            Msg = getattr(mod, cls_name)
        except Exception as e:
            return ProbeResult(False, f"Cannot import msg_type={self.msg_type}: {e}")

        n = 0

        def cb(_msg):
            nonlocal n
            n += 1

        sub = node.create_subscription(Msg, self.topic, cb, 10)

        ok = _spin_until(node, lambda: n >= self.min_messages, self.timeout_s)
        node.destroy_subscription(sub)

        if ok:
            return ProbeResult(True, f"Received {n} msgs on {self.topic}")
        return ProbeResult(False, f"Timeout waiting {self.min_messages} msgs on {self.topic}", {"received": n})

    @staticmethod
    def _resolve_import(msg_type: str) -> tuple[str, str]:
        # accepts "sensor_msgs/msg/LaserScan" or "sensor_msgs.msg.LaserScan"
        if "/" in msg_type:
            pkg, _, name = msg_type.split("/", 2)
            if name.startswith("msg/"):
                name = name.split("/", 1)[1]
            return f"{pkg}.msg", name
        # dotted
        parts = msg_type.split(".")
        return ".".join(parts[:-1]), parts[-1]