Skip to content

Evaluation Module

evaluation.metrics

Skeleton evaluation script for frontier-based exploration runs.

What it does: - Loads a GT occupancy grid map from a YAML + PGM. - Reads a ros2 bag (rosbag2) with topics: - /map (nav_msgs/OccupancyGrid) - /odom (nav_msgs/Odometry) - Computes: - time_to_50, time_to_80, time_to_90 percent coverage - final occupancy IoU vs GT - total path length from /odom

Requirements
  • ROS 2 Python environment (source your ROS 2 + workspace)
  • rosbag2_py
  • numpy
  • opencv-python
  • pyyaml

This is a skeleton: adjust topic names, thresholds, and map alignment to your setup.

align_est_map_to_gt(gt_map, gt_res, gt_origin, est_msg)

Projette une OccupancyGrid 'est_msg' dans le repère de la carte GT en utilisant les origines + résolution (on suppose pas de rotation).

Retourne une grille de même taille que gt_map, remplie avec: - valeurs de la carte estimée là où c'est défini - -1 ailleurs

Source code in evaluation/metrics.py
def align_est_map_to_gt(gt_map, gt_res, gt_origin, est_msg):
    """
    Projette une OccupancyGrid 'est_msg' dans le repère de la carte GT
    en utilisant les origines + résolution (on suppose pas de rotation).

    Retourne une grille de même taille que gt_map, remplie avec:
        - valeurs de la carte estimée là où c'est défini
        - -1 ailleurs
    """
    gt_h, gt_w = gt_map.shape

    est_res = est_msg.info.resolution
    est_w = est_msg.info.width
    est_h = est_msg.info.height

    # on suppose résolutions identiques (slam_toolbox + map_saver)
    if abs(est_res - gt_res) > 1e-6:
        print(f"[WARN] est_res={est_res} != gt_res={gt_res}, on applique un facteur d'échelle.")
    scale = est_res / gt_res

    gt_origin_x, gt_origin_y, _ = gt_origin
    est_origin_x = est_msg.info.origin.position.x
    est_origin_y = est_msg.info.origin.position.y

    print(f"[DEBUG] GT map: {gt_w}x{gt_h}, res={gt_res}, origin=({gt_origin_x}, {gt_origin_y})")
    print(f"[DEBUG] Est map: {est_w}x{est_h}, res={est_res}, origin=({est_origin_x}, {est_origin_y})")

    est_flat = np.array(est_msg.data, dtype=np.int8)
    est_grid = est_flat.reshape((est_h, est_w))

    # Count known cells in estimated map
    known_cells = np.sum(est_grid != -1)
    free_cells = np.sum(est_grid == 0)
    occupied_cells = np.sum(est_grid > 50)
    print(f"[DEBUG] Est map stats: {known_cells} known ({free_cells} free, {occupied_cells} occupied)")

    # carte de sortie dans le repère GT
    est_on_gt = np.full_like(gt_map, -1, dtype=np.int8)

    cells_mapped = 0

    # IMPORTANT: The GT map generator uses np.flipud() when saving the PGM
    # This means when we load it with cv2.imread, it's ALREADY in ROS convention
    # (row 0 = bottom, increasing row = increasing Y in world coordinates)
    # So we DON'T need to flip the Y coordinate - use same logic as X

    # boucle sur les cellules de la carte estimée
    for i in range(est_h):
        for j in range(est_w):
            val = est_grid[i, j]
            if val == -1:
                continue  # inconnue, on ignore

            # coord monde (centre de cellule)
            # In ROS OccupancyGrid, row i corresponds to y = origin_y + i * resolution
            world_x = est_origin_x + (j + 0.5) * est_res
            world_y = est_origin_y + (i + 0.5) * est_res

            # Transform to GT map indices
            # Both X and Y use the same logic since GT is already in ROS convention
            gt_j = int((world_x - gt_origin_x) / gt_res)
            gt_i = int((world_y - gt_origin_y) / gt_res)

            if 0 <= gt_i < gt_h and 0 <= gt_j < gt_w:
                est_on_gt[gt_i, gt_j] = val
                cells_mapped += 1

    print(f"[DEBUG] Cells successfully mapped to GT: {cells_mapped}")

    return est_on_gt

compute_accessible_coverage(gt_map, est_map, gt_res, gt_origin, est_origin, est_width, est_height, est_res)

Coverage of the ACTUAL accessible area (Reachability analysis on GT map). Defines 'Accessible' as the connected component of free space in the GT map that corresponds to the area strictly explored/seen by the robot.

This handles cases where the GT map has a large "outside" free area that is unreachable (separated by walls), preventing the "largest component" heuristic from picking the wrong area.

Parameters:

Name Type Description Default
gt_map

Ground truth occupancy grid (0=Free, 100=Occ, -1=Unk)

required
est_map

Estimated map aligned to GT (0=Free, 100=Occ, -1=Unk)

required
Source code in evaluation/metrics.py
def compute_accessible_coverage(gt_map, est_map, gt_res, gt_origin, est_origin, est_width, est_height, est_res):
    """
    Coverage of the ACTUAL accessible area (Reachability analysis on GT map).
    Defines 'Accessible' as the connected component of free space in the GT map
    that corresponds to the area strictly explored/seen by the robot.

    This handles cases where the GT map has a large "outside" free area that is 
    unreachable (separated by walls), preventing the "largest component" heuristic 
    from picking the wrong area.

    Args:
        gt_map: Ground truth occupancy grid (0=Free, 100=Occ, -1=Unk)
        est_map: Estimated map aligned to GT (0=Free, 100=Occ, -1=Unk)
    """
    # 1. Identify Accessible Space candidates
    gt_free = (gt_map == 0).astype(np.uint8)
    if gt_free.sum() == 0:
        return 0.0

    # Label connected components (8-connectivity)
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(gt_free, connectivity=8)

    # Background is label 0. labels 1..N are components.
    if num_labels <= 1:
        return 0.0

    # 2. Find the component that matches the robot's location/map
    # We look for the component that has the most overlap with the ESTIMATED Free Space.
    est_free = (est_map == 0)

    best_label = -1
    max_overlap = -1

    # Heuristic: If est_map is empty (no free space), fall back to largest component
    if est_free.sum() == 0:
        # Fallback: largest area
        best_label = 1
        max_overlap = stats[1, cv2.CC_STAT_AREA]
        for i in range(2, num_labels):
            area = stats[i, cv2.CC_STAT_AREA]
            if area > max_overlap:
                max_overlap = area
                best_label = i
    else:
        # Check overlap for each component
        # Optimization: We can just use the labels imagemasked by est_free to count occurances
        # BUT: est_free might cross walls due to SLAM errors.
        # We want the component with the *highest number of shared pixels*.

        # Fast way using numpy bincount on the masked labels
        # labels[est_free] gives all label IDs under the free pixels of estimated map
        masked_labels = labels[est_free]
        if masked_labels.size > 0:
            # bincount accounts for 0 (background) too, so logic holds
            counts = np.bincount(masked_labels, minlength=num_labels)
            # We ignore label 0 (background)
            counts[0] = 0
            best_label = np.argmax(counts)

            # If for some reason overlap is 0 (unlikely if est_free > 0), fallback
            if counts[best_label] == 0:
                 best_label = np.argmax(stats[1:, cv2.CC_STAT_AREA]) + 1
        else:
             best_label = np.argmax(stats[1:, cv2.CC_STAT_AREA]) + 1

    accessible_mask = (labels == best_label)
    total_accessible = accessible_mask.sum()

    if total_accessible == 0:
        return 0.0

    # 3. Compute Coverage
    # Check which pixels in the accessible mask are "known" in the estimated map
    est_known = (est_map != -1)

    covered_accessible = (accessible_mask & est_known).sum()

    return covered_accessible / total_accessible

compute_coverage(gt_map, est_map)

Coverage: fraction of free GT cells that are known (free or occupied) in est_map. This measures coverage of the ENTIRE GT map.

Source code in evaluation/metrics.py
def compute_coverage(gt_map, est_map):
    """
    Coverage: fraction of free GT cells that are known (free or occupied) in est_map.
    This measures coverage of the ENTIRE GT map.
    """
    gt_free = (gt_map == 0)
    est_known = (est_map != -1)

    if gt_free.sum() == 0:
        return 0.0

    covered = (gt_free & est_known).sum()
    return covered / gt_free.sum()

compute_iou(gt_map, est_map)

IoU over OCCUPIED cells (100 vs >50 threshold).

Source code in evaluation/metrics.py
def compute_iou(gt_map, est_map):
    """
    IoU over OCCUPIED cells (100 vs >50 threshold).
    """
    gt_occ = (gt_map > 50)
    est_occ = (est_map > 50)

    intersection = (gt_occ & est_occ).sum()
    union = (gt_occ | est_occ).sum()
    return float(intersection) / float(union) if union > 0 else 0.0

compute_path_length(odom_msgs, bag_path=None)

Integrate path length from /odom messages.

Source code in evaluation/metrics.py
def compute_path_length(odom_msgs, bag_path=None):
    """
    Integrate path length from /odom messages.
    """
    if not odom_msgs:
        return 0.0

    prev_x = None
    prev_y = None
    length = 0.0

    for t, msg in odom_msgs:
        x = msg.pose.pose.position.x
        y = msg.pose.pose.position.y

        if prev_x is not None:
            dx = x - prev_x
            dy = y - prev_y
            length += math.sqrt(dx*dx + dy*dy)

        prev_x, prev_y = x, y

    return length

compute_ssim(gt_map, est_map)

Compute Structural Similarity Index (SSIM) between maps. Only considers common known areas.

Source code in evaluation/metrics.py
def compute_ssim(gt_map, est_map):
    """
    Compute Structural Similarity Index (SSIM) between maps.
    Only considers common known areas.
    """
    if not SKIMAGE_AVAILABLE:
        return 0.0

    # 1. Prepare normalized maps (0-1)
    # Treat unknown (-1) as 0.5 for a "neutral" background or mask them
    mask = (gt_map != -1) & (est_map != -1)
    if not np.any(mask):
        return 0.0

    # Convert to float [0, 1]
    gt_norm = np.clip(gt_map.astype(np.float32) / 100.0, 0, 1)
    est_norm = np.clip(est_map.astype(np.float32) / 100.0, 0, 1)

    # Compute SSIM on the whole image but focus on the relevant part
    score, _ = ssim(gt_norm, est_norm, full=True, data_range=1.0)

    # Optional: localized SSIM only on known cells

    return float(score)

compute_time_to_coverage(gt_map, gt_res, gt_origin, map_msgs, thresholds)

Compute time to reach each coverage threshold using un alignement correct est->GT pour chaque carte.

Source code in evaluation/metrics.py
def compute_time_to_coverage(gt_map, gt_res, gt_origin, map_msgs, thresholds):
    """
    Compute time to reach each coverage threshold using
    un alignement correct est->GT pour chaque carte.
    """
    times_to_cov = {th: None for th in thresholds}
    reached = {th: False for th in thresholds}

    for t, msg in map_msgs:
        if msg.info.width == 0 or msg.info.height == 0:
            continue

        est_on_gt = align_est_map_to_gt(gt_map, gt_res, gt_origin, msg)
        cov = compute_coverage(gt_map, est_on_gt)

        for th in thresholds:
            if not reached[th] and cov >= th:
                times_to_cov[th] = t
                reached[th] = True

    return times_to_cov

compute_wall_thickness(map_array, resolution)

Estimate the average thickness of walls in the map (in meters). Uses Distance Transform on occupied cells.

Source code in evaluation/metrics.py
def compute_wall_thickness(map_array, resolution):
    """
    Estimate the average thickness of walls in the map (in meters).
    Uses Distance Transform on occupied cells.
    """
    if not SKIMAGE_AVAILABLE:
        return 0.0

    # Occupied mask
    occ = (map_array > 50).astype(np.uint8)
    if not np.any(occ):
        return 0.0

    # Skeletonize to find center lines
    try:
        skel = skeletonize(occ).astype(np.uint8)
    except:
        return 0.0

    if not np.any(skel):
        return 0.0

    # Distance transform from background (non-wall)
    dist = cv2.distanceTransform(occ, cv2.DIST_L2, 3)

    # The thickness is roughly twice the distance from the center line to the edge
    thickness_cells = 2.0 * np.mean(dist[skel > 0])
    return thickness_cells * resolution

detect_anomalies(odom_msgs, ate_rmse=None)

Heuristic-based failure detection. Returns: (list of warning strings, is_failure boolean)

Source code in evaluation/metrics.py
def detect_anomalies(odom_msgs, ate_rmse=None):
    """
    Heuristic-based failure detection.
    Returns: (list of warning strings, is_failure boolean)
    """
    warnings = []
    is_failure = False

    if not odom_msgs:
        return (["No odometry data found."], True)

    # 1. Minimum Movement Check
    total_dist = compute_path_length(odom_msgs)
    if total_dist < 0.2: # Less than 20cm
        warnings.append("Robot barely moved (Stuck/Process fail)")
        is_failure = True

    # 2. Velocity / Jump Check
    # TB3 max speed is ~0.26 m/s. Let's flag anything > 1.5 m/s as a TF Jump
    max_jump = 0.0
    for i in range(1, len(odom_msgs)):
        t1, p1 = odom_msgs[i-1]
        t2, p2 = odom_msgs[i]
        dt = t2 - t1
        if dt <= 0: continue

        dx = p2.pose.pose.position.x - p1.pose.pose.position.x
        dy = p2.pose.pose.position.y - p1.pose.pose.position.y
        dist = math.sqrt(dx*dx + dy*dy)
        vel = dist / dt

        if vel > 1.5: 
            max_jump = max(max_jump, vel)

    if max_jump > 0:
        warnings.append(f"Major TF Jump detected! (Max speed pulse: {max_jump:.2f} m/s)")
        is_failure = True

    # 3. Accuracy Fail
    if ate_rmse is not None and ate_rmse > 1.0:
        warnings.append(f"Massive drift detected! (ATE RMSE: {ate_rmse:.3f} m)")
        is_failure = True

    return warnings, is_failure

get_trajectory(odom_msgs)

Extract x, y points from a list of (timestamp, msg) odom messages. Returns: (list_x, list_y)

Source code in evaluation/metrics.py
def get_trajectory(odom_msgs):
    """
    Extract x, y points from a list of (timestamp, msg) odom messages.
    Returns: (list_x, list_y)
    """
    tx, ty = [], []
    for _, msg in odom_msgs:
        tx.append(msg.pose.pose.position.x)
        ty.append(msg.pose.pose.position.y)
    return tx, ty

load_gt_map(yaml_path)

Load ground-truth map from .yaml + .pgm. Returns: gt_occ_grid: np.ndarray of shape (H, W) with int8 data (0..100, -1 for unknown) resolution: float (m/cell) origin: (x, y, yaw) of map in world frame

Source code in evaluation/metrics.py
def load_gt_map(yaml_path: str):
    """
    Load ground-truth map from <name>.yaml + <name>.pgm.
    Returns:
        gt_occ_grid: np.ndarray of shape (H, W) with int8 data (0..100, -1 for unknown)
        resolution: float (m/cell)
        origin: (x, y, yaw) of map in world frame
    """
    with open(yaml_path, "r") as f:
        info = yaml.safe_load(f)

    pgm_path = info["image"]
    if not os.path.isabs(pgm_path):
        pgm_path = os.path.join(os.path.dirname(yaml_path), pgm_path)

    # load pgm as grayscale
    img = cv2.imread(pgm_path, cv2.IMREAD_UNCHANGED)
    if img is None:
        raise RuntimeError(f"Failed to load PGM map from: {pgm_path}")

    # YAML fields (nav2/slam_toolbox convention)
    resolution = float(info["resolution"])
    origin = info["origin"]   # [x, y, yaw]
    negate = int(info.get("negate", 0))
    occupied_thresh = float(info.get("occupied_thresh", 0.65))
    free_thresh = float(info.get("free_thresh", 0.196))

    # Convert PGM to occupancy values [0..100] + -1 for unknown
    # Here's a simple heuristic; you may want to tune it to your pipeline.
    img_norm = img.astype(np.float32) / 255.0
    if negate:
        img_norm = 1.0 - img_norm


    gt_occ_grid = np.full(img_norm.shape, -1, dtype=np.int8)
    # Standard PGM: White (255) is Free, Black (0) is Occupied
    # High value > occupied_thresh (0.65) -> Free (0)
    # Low value < free_thresh (0.196) -> Occupied (100)

    # Note: ROS map_server docs say "occupied_thresh" determines occupied, 
    # but usually pixels < free_thresh are free?
    # Actually map_server:
    # "Pixels with brightness greater than free_thresh are considered free... (Wait no)"
    # Standard: p > occ_th -> Occupied. p < free_th -> Free.
    # BUT: 
    # If negate=0 (Black is occupied).
    # Then p (brightness) is LOW for occupied.
    # So if p < threshold -> Occupied.
    # The variable "occupied_thresh" in YAML usually means probability?
    # No, YAML has "occupied_thresh" (default 0.65). 
    # Interpretation depends on mode.
    # 
    # Let's assume standard "Black is Walls".
    # Black = 0. White = 1.
    # Walls are 0.
    # So if img_norm < free_thresh -> Occupied.

    gt_occ_grid[img_norm > occupied_thresh] = 0    # White -> Free
    gt_occ_grid[img_norm < free_thresh] = 100      # Black -> Occupied



    # Flip the map to match ROS convention (Top-Down image -> Bottom-Up grid)
    # ROS map origin is bottom-left, but image origin is top-left.
    gt_occ_grid = np.flipud(gt_occ_grid)

    return gt_occ_grid, resolution, origin

occupancy_arrays_from_msgs(map_msgs, gt_map, gt_res, gt_origin)

Utilise le dernier /map et l'aligne correctement sur la GT en utilisant les origines + résolution. Returns: (est_on_gt, last_map_msg)

Source code in evaluation/metrics.py
def occupancy_arrays_from_msgs(map_msgs, gt_map, gt_res, gt_origin):
    """
    Utilise le dernier /map et l'aligne correctement sur la GT
    en utilisant les origines + résolution.
    Returns: (est_on_gt, last_map_msg)
    """
    if not map_msgs:
        raise RuntimeError("No /map messages found in bag.")

    _, last_map = map_msgs[-1]
    est_on_gt = align_est_map_to_gt(gt_map, gt_res, gt_origin, last_map)
    return est_on_gt

open_bag_reader(bag_path)

Open a ros2 bag for reading, auto-detecting storage format.

Source code in evaluation/metrics.py
def open_bag_reader(bag_path: str) -> SequentialReader:
    """Open a ros2 bag for reading, auto-detecting storage format."""
    # Try different storage formats
    storage_formats = ["sqlite3", "mcap"]

    for storage_id in storage_formats:
        try:
            storage_options = StorageOptions(
                uri=bag_path,
                storage_id=storage_id
            )
            converter_options = ConverterOptions(
                input_serialization_format="cdr",
                output_serialization_format="cdr",
            )
            reader = SequentialReader()
            reader.open(storage_options, converter_options)
            return reader
        except Exception as e:
            if storage_id == storage_formats[-1]:
                # Last format failed, raise error
                raise RuntimeError(
                    f"Failed to open rosbag at '{bag_path}'. "
                    f"Tried formats: {storage_formats}. "
                    f"Make sure the path points to a valid ROS2 bag directory. "
                    f"Last error: {str(e)}"
                )
            # Try next format
            continue

read_messages_by_topic(bag_path, topics_of_interest)

Read all messages from a ros2 bag for given topics.

Returns:

Name Type Description
dict

{ topic_name: [(timestamp_sec, msg), ...] }

Source code in evaluation/metrics.py
def read_messages_by_topic(bag_path: str, topics_of_interest):
    """
    Read all messages from a ros2 bag for given topics.

    Returns:
        dict: { topic_name: [(timestamp_sec, msg), ...] }
    """
    try:
        reader = open_bag_reader(bag_path)
    except Exception as e:
        raise RuntimeError(f"Cannot open rosbag: {str(e)}")

    # Map topic name -> type string
    topic_types = {}
    for topic in reader.get_all_topics_and_types():
        topic_types[topic.name] = topic.type

    # Prepare message classes only for the topics we care about
    msg_classes = {}
    for tname in topics_of_interest:
        if tname in topic_types:
            msg_classes[tname] = get_message(topic_types[tname])

    messages = {t: [] for t in topics_of_interest}

    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name not in topics_of_interest:
            continue

        msg_cls = msg_classes.get(topic_name)
        if msg_cls is None:
            # topic not in our interest list or type unknown
            continue

        # ✅ correct way: use rclpy.serialization
        msg = deserialize_message(data, msg_cls)

        # t is nanoseconds since bag start (int) in rosbag2; convert to seconds
        timestamp_sec = float(t) * 1e-9
        messages[topic_name].append((timestamp_sec, msg))

    # sort by time
    for t in messages:
        messages[t].sort(key=lambda x: x[0])

    return messages

ros_time_to_sec(t)

Convert builtin_interfaces/Time to seconds (float).

Source code in evaluation/metrics.py
def ros_time_to_sec(t: RosTime) -> float:
    """Convert builtin_interfaces/Time to seconds (float)."""
    return float(t.sec) + float(t.nanosec) * 1e-9

save_map_image(map_array, output_path, title=None)

Save the occupancy grid as an image. map_array: 2D numpy array (0-100, -1). output_path: path to save the image.

Source code in evaluation/metrics.py
def save_map_image(map_array, output_path, title=None):
    """
    Save the occupancy grid as an image.
    map_array: 2D numpy array (0-100, -1).
    output_path: path to save the image.
    """
    # Normalize for visualization:
    # -1 (unknown) -> 127 (gray)
    # 0 (free) -> 255 (white)
    # 100 (occupied) -> 0 (black)

    vis_map = np.full(map_array.shape, 127, dtype=np.uint8)
    vis_map[map_array == 0] = 255
    vis_map[map_array > 50] = 0


    # Flip if needed (usually map_array is already in image coords if loaded via cv2, 
    # but if it came from ROS msg it might be bottom-up. 
    # align_est_map_to_gt returns map in GT indices. GT loader uses cv2 no flip?
    # Actually load_gt_map uses cv2.imread which is top-down.
    # But usually ROS maps need flipud to be viewed correctly as image files.
    vis_map = np.flipud(vis_map)

    if title:
        # Add a title bar? slightly complex for cv2 only, let's keep it raw for now.
        pass

    cv2.imwrite(str(output_path), vis_map)