Tutorial 3: Read Data from an Advanced Logical Camera

Tutorial 3

This tutorial covers the following topics:

  • Add a camera to the environment,

  • Receive messages from the camera,

  • Store camera data internally as an instance of a class,

  • Display the stored data on the standard output.

Package Structure

Updates and additions that are specific to Tutorial 3 are highlighted in the tree below.

ariac_tutorials
├── CMakeLists.txt
├── package.xml
├── config
│   └── sensors.yaml
├── ariac_tutorials
│   ├── __init__.py
│   ├── utils.py
│   └── competition_interface.py
└── nodes
    ├── tutorial_1.py
    ├── tutorial_2.py
    └── tutorial_3.py

Updated/Created Files

Sensor Configuration File

An advanced logical camera is added to sensors.yaml above the right bins (see lines 8-13 in Listing 35).

Listing 35 sensors.yaml
 1sensors:
 2  breakbeam_0:
 3    type: break_beam
 4    visualize_fov: true
 5    pose:
 6      xyz: [-0.36, 3.5, 0.88]
 7      rpy: [0, 0, pi]
 8  advanced_camera_0:
 9    type: advanced_logical_camera
10    visualize_fov: true
11    pose:
12      xyz: [-2.286, 2.96, 1.8]
13      rpy: [pi, pi/2, 0]

Competition Interface

The competition interface for Tutorial 3 is shown in Listing 36.

Listing 36 competition_interface.py
  1import rclpy
  2from rclpy.node import Node
  3from rclpy.qos import qos_profile_sensor_data
  4from rclpy.parameter import Parameter
  5
  6from ariac_msgs.msg import (
  7    CompetitionState as CompetitionStateMsg,
  8    BreakBeamStatus as BreakBeamStatusMsg,
  9    AdvancedLogicalCameraImage as AdvancedLogicalCameraImageMsg,
 10    Part as PartMsg,
 11    PartPose as PartPoseMsg,
 12)
 13
 14from std_srvs.srv import Trigger
 15
 16from ariac_tutorials.utils import (
 17    multiply_pose,
 18    rpy_from_quaternion,
 19    rad_to_deg_str,
 20    AdvancedLogicalCameraImage
 21)
 22
 23
 24class CompetitionInterface(Node):
 25    '''
 26    Class for a competition interface node.
 27
 28    Args:
 29        Node (rclpy.node.Node): Parent class for ROS nodes
 30
 31    Raises:
 32        KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
 33    '''
 34    _competition_states = {
 35        CompetitionStateMsg.IDLE: 'idle',
 36        CompetitionStateMsg.READY: 'ready',
 37        CompetitionStateMsg.STARTED: 'started',
 38        CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
 39        CompetitionStateMsg.ENDED: 'ended',
 40    }
 41    '''Dictionary for converting CompetitionState constants to strings'''
 42
 43    _part_colors = {
 44        PartMsg.RED: 'red',
 45        PartMsg.BLUE: 'blue',
 46        PartMsg.GREEN: 'green',
 47        PartMsg.ORANGE: 'orange',
 48        PartMsg.PURPLE: 'purple',
 49    }
 50    '''Dictionary for converting Part color constants to strings'''
 51
 52    _part_colors_emoji = {
 53        PartMsg.RED: '🟥',
 54        PartMsg.BLUE: '🟦',
 55        PartMsg.GREEN: '🟩',
 56        PartMsg.ORANGE: '🟧',
 57        PartMsg.PURPLE: '🟪',
 58    }
 59    '''Dictionary for converting Part color constants to emojis'''
 60
 61    _part_types = {
 62        PartMsg.BATTERY: 'battery',
 63        PartMsg.PUMP: 'pump',
 64        PartMsg.REGULATOR: 'regulator',
 65        PartMsg.SENSOR: 'sensor',
 66    }
 67    '''Dictionary for converting Part type constants to strings'''
 68
 69    def __init__(self):
 70        super().__init__('competition_interface')
 71
 72        sim_time = Parameter(
 73            "use_sim_time",
 74            rclpy.Parameter.Type.BOOL,
 75            True
 76        )
 77
 78        self.set_parameters([sim_time])
 79
 80        # Service client for starting the competition
 81        self._start_competition_client = self.create_client(Trigger, '/ariac/start_competition')
 82
 83        # Subscriber to the competition state topic
 84        self._competition_state_sub = self.create_subscription(
 85            CompetitionStateMsg,
 86            '/ariac/competition_state',
 87            self._competition_state_cb,
 88            10)
 89
 90        # Store the state of the competition
 91        self._competition_state: CompetitionStateMsg = None
 92
 93        # Subscriber to the break beam status topic
 94        self._break_beam0_sub = self.create_subscription(
 95            BreakBeamStatusMsg,
 96            '/ariac/sensors/breakbeam_0/status',
 97            self._breakbeam0_cb,
 98            qos_profile_sensor_data)
 99
100        # Store the number of parts that crossed the beam
101        self._conveyor_part_count = 0
102
103        # Store whether the beam is broken
104        self._object_detected = False
105
106        # Subscriber to the logical camera topic
107        self._advanced_camera0_sub = self.create_subscription(
108            AdvancedLogicalCameraImageMsg,
109            '/ariac/sensors/advanced_camera_0/image',
110            self._advanced_camera0_cb,
111            qos_profile_sensor_data)
112
113        # Store each camera image as an AdvancedLogicalCameraImage object
114        self._camera_image: AdvancedLogicalCameraImage = None
115
116    @property
117    def camera_image(self):
118        return self._camera_image
119
120    @property
121    def conveyor_part_count(self):
122        return self._conveyor_part_count
123
124    def _advanced_camera0_cb(self, msg: AdvancedLogicalCameraImageMsg):
125        '''Callback for the topic /ariac/sensors/advanced_camera_0/image
126
127        Arguments:
128            msg -- AdvancedLogicalCameraImage message
129        '''
130        self._camera_image = AdvancedLogicalCameraImage(msg.part_poses,
131                                                        msg.tray_poses,
132                                                        msg.sensor_pose)
133
134    def _breakbeam0_cb(self, msg: BreakBeamStatusMsg):
135        '''Callback for the topic /ariac/sensors/breakbeam_0/status
136
137        Arguments:
138            msg -- BreakBeamStatusMsg message
139        '''
140        if not self._object_detected and msg.object_detected:
141            self._conveyor_part_count += 1
142
143        self._object_detected = msg.object_detected
144
145    def _competition_state_cb(self, msg: CompetitionStateMsg):
146        '''Callback for the topic /ariac/competition_state
147        Arguments:
148            msg -- CompetitionState message
149        '''
150        # Log if competition state has changed
151        if self._competition_state != msg.competition_state:
152            state = CompetitionInterface._competition_states[msg.competition_state]
153            self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
154
155        self._competition_state = msg.competition_state
156
157    def start_competition(self):
158        '''Function to start the competition.
159        '''
160        self.get_logger().info('Waiting for competition to be ready')
161
162        if self._competition_state == CompetitionStateMsg.STARTED:
163            return
164        # Wait for competition to be ready
165        while self._competition_state != CompetitionStateMsg.READY:
166            try:
167                rclpy.spin_once(self)
168            except KeyboardInterrupt:
169                return
170
171        self.get_logger().info('Competition is ready. Starting...')
172
173        # Check if service is available
174        if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
175            self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
176            return
177
178        # Create trigger request and call starter service
179        request = Trigger.Request()
180        future = self._start_competition_client.call_async(request)
181
182        # Wait until the service call is completed
183        rclpy.spin_until_future_complete(self, future)
184
185        if future.result().success:
186            self.get_logger().info('Started competition.')
187        else:
188            self.get_logger().warn('Unable to start competition')
189
190    def parse_advanced_camera_image(self, image: AdvancedLogicalCameraImage) -> str:
191        '''
192        Parse an AdvancedLogicalCameraImage message and return a string representation.
193        '''
194
195        if len(image._part_poses) == 0:
196            return 'No parts detected'
197
198        output = '\n\n'
199        for i, part_pose in enumerate(image._part_poses):
200            part_pose: PartPoseMsg
201            output += '==========================\n'
202            part_color = CompetitionInterface._part_colors[part_pose.part.color].capitalize()
203            part_color_emoji = CompetitionInterface._part_colors_emoji[part_pose.part.color]
204            part_type = CompetitionInterface._part_types[part_pose.part.type].capitalize()
205            output += f'Part {i+1}: {part_color_emoji} {part_color} {part_type}\n'
206            output += '--------------------------\n'
207            output += 'Camera Frame\n'
208            output += '--------------------------\n'
209
210            output += '  Position:\n'
211            output += f'    x: {part_pose.pose.position.x:.3f} (m)\n'
212            output += f'    y: {part_pose.pose.position.y:.3f} (m)\n'
213            output += f'    z: {part_pose.pose.position.z:.3f} (m)\n'
214
215            roll, pitch, yaw = rpy_from_quaternion(part_pose.pose.orientation)
216            output += '  Orientation:\n'
217            output += f'    roll: {rad_to_deg_str(roll)}\n'
218            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
219            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
220
221            part_world_pose = multiply_pose(image._sensor_pose, part_pose.pose)
222            output += '--------------------------\n'
223            output += 'World Frame\n'
224            output += '--------------------------\n'
225
226            output += '  Position:\n'
227            output += f'    x: {part_world_pose.position.x:.3f} (m)\n'
228            output += f'    y: {part_world_pose.position.y:.3f} (m)\n'
229            output += f'    z: {part_world_pose.position.z:.3f} (m)\n'
230
231            roll, pitch, yaw = rpy_from_quaternion(part_world_pose.orientation)
232            output += '  Orientation:\n'
233            output += f'    roll: {rad_to_deg_str(roll)}\n'
234            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
235            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
236
237            output += '==========================\n\n'
238
239        return output

Code Explanation

The competition interface from Tutorial 2 was augmented with the components described below.

  • Imports

    • AdvancedLogicalCameraImage: Message class that stores the part poses and sensor pose of the advanced logical camera (ariac_msgs/msg/AdvancedLogicalCameraImage).

    • Part: Message class that stores the part type and color (ariac_msgs/msg/Part).

    • PartPose: Message class that stores a Part and its Pose (ariac_msgs/msg/PartPose).

      • Note: These message classes are imported as aliases since the package consists of Python classes with the same name.

    • The module utils contains reusable functions and classes.

    • The function multiply_pose() is used to compute the pose of the parts detected by the camera in the world frame.

    • The function rpy_from_quaternion() is used to convert a quaternion to euler angles roll, pitch, yaw.

    • The function rad_to_deg_str() is used to convert a value in radians to a string in degrees.

    • The class AdvancedLogicalCameraImage is a Python class which is used to store the message published on the camera topic. Although a class is not strictly necessary, it makes the code more readable and easier to maintain.

  • Class Variables

    • _part_colors is a dictionary that maps the integer values of the part color to their string representations.

    • _part_types is a dictionary that maps the integer values of the part type to their string representations.

    • _part_colors_emoji is a dictionary that maps the integer values of the part color to their emoji representations.

      • Note: These dictionaries are mainly used to display the part color and type in a human-readable format.

  • Instance Variables

    • _advanced_camera0_sub is a subscriber to the camera topic. The callback function advanced_camera0_cb() is called when a message is published on the camera topic.

    • _camera_image is an object of the class AdvancedLogicalCameraImage that stores the latest message published on the camera topic.

  • Instance Methods

    • camera_image(self) is a getter to the _camera_image attribute and is provided to access the latest message published on the camera topic.

    • _advanced_camera0_cb(self, msg) is the callback function for the camera topic. It stores the message in the _camera_image attribute.

    • parse_advanced_camera_image(self) parses the message stored in _camera_image and returns a string representation of the message. This method is used to display the part color, type, and pose in a human-readable format. The output is printed in the following format:

      • Emoji for the part color using the class attribute part_colors_emoji_.

      • Part color using the class attribute part_colors_.

      • Part type using the class attribute part_types_.

      • Part pose in the camera frame: This is the pose returned by the camera.

      • Part pose in the world frame: This is calculated by multiplying the camera pose with the part pose in the camera frame. This multiplication is done using the method multiply_pose().

Create the Executable

Listing 37 tutorial_3.py
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
- ros2 run ariac_tutorials tutorial_3.py
'''

import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface


def main(args=None):
    rclpy.init(args=args)
    interface = CompetitionInterface()
    interface.start_competition()

    while rclpy.ok():
        try:
            rclpy.spin_once(interface)
            image = interface.camera_image
            if image is not None:
                interface.get_logger().info(interface.parse_advanced_camera_image(image), throttle_duration_sec=5.0)
        except KeyboardInterrupt:
            break

    interface.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Code Explanation

This executable does the following:

  • Initialize the ROS client library.

  • Create an instance of the class CompetitionInterface as a ROS node.

  • Start the competition.

  • Log the content of interface.camera_image every 5 seconds.

Build Instructions

Listing 38 CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
find_package(orocos_kdl REQUIRED)

# Install the config directory to the package share directory
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}
)

# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})

# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
scripts/tutorial_2.py
scripts/tutorial_3.py
DESTINATION lib/${PROJECT_NAME}
)

ament_package()

Package Manifest

This tutorial adds a dependency to the PyKDL module. This is added to the package manifest.

Listing 39 package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
    <name>ariac_tutorials</name>
    <version>0.0.0</version>
    <description>Tutorial 3</description>
    <maintainer email="[email protected]">Justin Albrecht</maintainer>
    <license>Apache License 2.0</license>

    <buildtool_depend>ament_cmake</buildtool_depend>

    <depend>rclcpp</depend>
    <depend>rclpy</depend>
    <depend>ariac_msgs</depend>
    <depend>geometry_msgs</depend>
    <depend>orocos_kdl</depend>
    <depend>python3-pykdl</depend>

    <export>
        <build_type>ament_cmake</build_type>
    </export>
</package>

Test the Sensor Configuration

To test the camera was correctly added to the environment, run the following commands:

cd ~/ariac_ws
colcon build
. install/setup.bash
ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials

You should see the camera above bins 1-4 as shown in the figure below.

../_images/advanced_camera_0.png

Run the Executable

  • In terminal 1, run the following commands:

    cd ~/ariac_ws
    rosdep install --from-paths src -y --ignore-src
    colcon build
    . install/setup.bash
    ros2 run ariac_tutorials tutorial_3.py
    

    The rosdep command needs to be run again because the package.xml was updated with a new dependency.

  • In terminal 2, run the following commands:

    cd ~/ariac_ws
    . install/setup.bash
    ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
    

    Once the environment is loaded and the competition state is ready, the interface node running in terminal 1 will start the competition and the sensor will start publishing data. Each part detected by the camera will be logged to the terminal.

Outputs

Listing 40 terminal 1 output
[INFO] [1679430757.556470727] [competition_interface]: Waiting for competition to be ready
[INFO] [1679430770.831452522] [competition_interface]: Competition state is: idle
[INFO] [1679430778.086868765] [competition_interface]: Competition state is: ready
[INFO] [1679430778.087507486] [competition_interface]: Competition is ready. Starting...
[INFO] [1679430778.090600012] [competition_interface]: Started competition.
[INFO] [1679430778.252110253] [competition_interface]:

==========================
Part 1: 🟪 Purple Pump
--------------------------
Camera Frame
--------------------------
Position:
    x: 1.077 (m)
    y: 0.515 (m)
    z: -0.206 (m)
Orientation:
    roll: 0°
    pitch: -90°
    yaw: 0°
--------------------------
World Frame
--------------------------
Position:
    x: -2.080 (m)
    y: 2.445 (m)
    z: 0.723 (m)
Orientation:
    roll: 0°
    pitch: 0°
    yaw: 180°
==========================

==========================
Part 2: 🟪 Purple Pump
--------------------------
Camera Frame
--------------------------
Position:
    x: 1.077 (m)
    y: 0.155 (m)
    z: -0.206 (m)
Orientation:
    roll: 179°
    pitch: -90°
    yaw: -178°
--------------------------
World Frame
--------------------------
Position:
    x: -2.080 (m)
    y: 2.805 (m)
    z: 0.723 (m)
Orientation:
    roll: 0°
    pitch: 0°
    yaw: 180°
==========================

==========================
Part 3: 🟪 Purple Pump
--------------------------
Camera Frame
--------------------------
Position:
    x: 1.077 (m)
    y: 0.515 (m)
    z: -0.566 (m)
Orientation:
    roll: 177°
    pitch: -90°
    yaw: -177°
--------------------------
World Frame
--------------------------
Position:
    x: -1.720 (m)
    y: 2.445 (m)
    z: 0.723 (m)
Orientation:
    roll: 0°
    pitch: 0°
    yaw: 180°
==========================

==========================
Part 4: 🟪 Purple Pump
--------------------------
Camera Frame
--------------------------
Position:
    x: 1.077 (m)
    y: 0.155 (m)
    z: -0.566 (m)
Orientation:
    roll: 0°
    pitch: -90°
    yaw: 0°
--------------------------
World Frame
--------------------------
Position:
    x: -1.720 (m)
    y: 2.805 (m)
    z: 0.723 (m)
Orientation:
    roll: 0°
    pitch: 0°
    yaw: 180°
==========================