Tutorial 3: Read Data from an Advanced Logical Camera
Tutorial 3
Prerequisites: Introduction to Tutorials and Tutorial 2
Source Code: https://github.com/jaybrecht/ariac_tutorials/tree/tutorial_3
Switch Branch:
cd ~/ariac_ws/src/ariac_tutorials git switch tutorial_3
This tutorial covers the following topics:
Add a camera to the environment,
Receive messages from the camera,
Store camera data internally as an instance of a class,
Display the stored data on the standard output.
Package Structure
Updates and additions that are specific to Tutorial 3 are highlighted in the tree below.
Updated/Created Files
Sensor Configuration File
An advanced logical camera is added to sensors.yaml
above the right bins (see lines 8-13 in Listing 35).
1sensors:
2 breakbeam_0:
3 type: break_beam
4 visualize_fov: true
5 pose:
6 xyz: [-0.36, 3.5, 0.88]
7 rpy: [0, 0, pi]
8 advanced_camera_0:
9 type: advanced_logical_camera
10 visualize_fov: true
11 pose:
12 xyz: [-2.286, 2.96, 1.8]
13 rpy: [pi, pi/2, 0]
Competition Interface
The competition interface for Tutorial 3 is shown in Listing 36.
1import rclpy
2from rclpy.node import Node
3from rclpy.qos import qos_profile_sensor_data
4from rclpy.parameter import Parameter
5
6from ariac_msgs.msg import (
7 CompetitionState as CompetitionStateMsg,
8 BreakBeamStatus as BreakBeamStatusMsg,
9 AdvancedLogicalCameraImage as AdvancedLogicalCameraImageMsg,
10 Part as PartMsg,
11 PartPose as PartPoseMsg,
12)
13
14from std_srvs.srv import Trigger
15
16from ariac_tutorials.utils import (
17 multiply_pose,
18 rpy_from_quaternion,
19 rad_to_deg_str,
20 AdvancedLogicalCameraImage
21)
22
23
24class CompetitionInterface(Node):
25 '''
26 Class for a competition interface node.
27
28 Args:
29 Node (rclpy.node.Node): Parent class for ROS nodes
30
31 Raises:
32 KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
33 '''
34 _competition_states = {
35 CompetitionStateMsg.IDLE: 'idle',
36 CompetitionStateMsg.READY: 'ready',
37 CompetitionStateMsg.STARTED: 'started',
38 CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
39 CompetitionStateMsg.ENDED: 'ended',
40 }
41 '''Dictionary for converting CompetitionState constants to strings'''
42
43 _part_colors = {
44 PartMsg.RED: 'red',
45 PartMsg.BLUE: 'blue',
46 PartMsg.GREEN: 'green',
47 PartMsg.ORANGE: 'orange',
48 PartMsg.PURPLE: 'purple',
49 }
50 '''Dictionary for converting Part color constants to strings'''
51
52 _part_colors_emoji = {
53 PartMsg.RED: '🟥',
54 PartMsg.BLUE: '🟦',
55 PartMsg.GREEN: '🟩',
56 PartMsg.ORANGE: '🟧',
57 PartMsg.PURPLE: '🟪',
58 }
59 '''Dictionary for converting Part color constants to emojis'''
60
61 _part_types = {
62 PartMsg.BATTERY: 'battery',
63 PartMsg.PUMP: 'pump',
64 PartMsg.REGULATOR: 'regulator',
65 PartMsg.SENSOR: 'sensor',
66 }
67 '''Dictionary for converting Part type constants to strings'''
68
69 def __init__(self):
70 super().__init__('competition_interface')
71
72 sim_time = Parameter(
73 "use_sim_time",
74 rclpy.Parameter.Type.BOOL,
75 True
76 )
77
78 self.set_parameters([sim_time])
79
80 # Service client for starting the competition
81 self._start_competition_client = self.create_client(Trigger, '/ariac/start_competition')
82
83 # Subscriber to the competition state topic
84 self._competition_state_sub = self.create_subscription(
85 CompetitionStateMsg,
86 '/ariac/competition_state',
87 self._competition_state_cb,
88 10)
89
90 # Store the state of the competition
91 self._competition_state: CompetitionStateMsg = None
92
93 # Subscriber to the break beam status topic
94 self._break_beam0_sub = self.create_subscription(
95 BreakBeamStatusMsg,
96 '/ariac/sensors/breakbeam_0/status',
97 self._breakbeam0_cb,
98 qos_profile_sensor_data)
99
100 # Store the number of parts that crossed the beam
101 self._conveyor_part_count = 0
102
103 # Store whether the beam is broken
104 self._object_detected = False
105
106 # Subscriber to the logical camera topic
107 self._advanced_camera0_sub = self.create_subscription(
108 AdvancedLogicalCameraImageMsg,
109 '/ariac/sensors/advanced_camera_0/image',
110 self._advanced_camera0_cb,
111 qos_profile_sensor_data)
112
113 # Store each camera image as an AdvancedLogicalCameraImage object
114 self._camera_image: AdvancedLogicalCameraImage = None
115
116 @property
117 def camera_image(self):
118 return self._camera_image
119
120 @property
121 def conveyor_part_count(self):
122 return self._conveyor_part_count
123
124 def _advanced_camera0_cb(self, msg: AdvancedLogicalCameraImageMsg):
125 '''Callback for the topic /ariac/sensors/advanced_camera_0/image
126
127 Arguments:
128 msg -- AdvancedLogicalCameraImage message
129 '''
130 self._camera_image = AdvancedLogicalCameraImage(msg.part_poses,
131 msg.tray_poses,
132 msg.sensor_pose)
133
134 def _breakbeam0_cb(self, msg: BreakBeamStatusMsg):
135 '''Callback for the topic /ariac/sensors/breakbeam_0/status
136
137 Arguments:
138 msg -- BreakBeamStatusMsg message
139 '''
140 if not self._object_detected and msg.object_detected:
141 self._conveyor_part_count += 1
142
143 self._object_detected = msg.object_detected
144
145 def _competition_state_cb(self, msg: CompetitionStateMsg):
146 '''Callback for the topic /ariac/competition_state
147 Arguments:
148 msg -- CompetitionState message
149 '''
150 # Log if competition state has changed
151 if self._competition_state != msg.competition_state:
152 state = CompetitionInterface._competition_states[msg.competition_state]
153 self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
154
155 self._competition_state = msg.competition_state
156
157 def start_competition(self):
158 '''Function to start the competition.
159 '''
160 self.get_logger().info('Waiting for competition to be ready')
161
162 if self._competition_state == CompetitionStateMsg.STARTED:
163 return
164 # Wait for competition to be ready
165 while self._competition_state != CompetitionStateMsg.READY:
166 try:
167 rclpy.spin_once(self)
168 except KeyboardInterrupt:
169 return
170
171 self.get_logger().info('Competition is ready. Starting...')
172
173 # Check if service is available
174 if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
175 self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
176 return
177
178 # Create trigger request and call starter service
179 request = Trigger.Request()
180 future = self._start_competition_client.call_async(request)
181
182 # Wait until the service call is completed
183 rclpy.spin_until_future_complete(self, future)
184
185 if future.result().success:
186 self.get_logger().info('Started competition.')
187 else:
188 self.get_logger().warn('Unable to start competition')
189
190 def parse_advanced_camera_image(self, image: AdvancedLogicalCameraImage) -> str:
191 '''
192 Parse an AdvancedLogicalCameraImage message and return a string representation.
193 '''
194
195 if len(image._part_poses) == 0:
196 return 'No parts detected'
197
198 output = '\n\n'
199 for i, part_pose in enumerate(image._part_poses):
200 part_pose: PartPoseMsg
201 output += '==========================\n'
202 part_color = CompetitionInterface._part_colors[part_pose.part.color].capitalize()
203 part_color_emoji = CompetitionInterface._part_colors_emoji[part_pose.part.color]
204 part_type = CompetitionInterface._part_types[part_pose.part.type].capitalize()
205 output += f'Part {i+1}: {part_color_emoji} {part_color} {part_type}\n'
206 output += '--------------------------\n'
207 output += 'Camera Frame\n'
208 output += '--------------------------\n'
209
210 output += ' Position:\n'
211 output += f' x: {part_pose.pose.position.x:.3f} (m)\n'
212 output += f' y: {part_pose.pose.position.y:.3f} (m)\n'
213 output += f' z: {part_pose.pose.position.z:.3f} (m)\n'
214
215 roll, pitch, yaw = rpy_from_quaternion(part_pose.pose.orientation)
216 output += ' Orientation:\n'
217 output += f' roll: {rad_to_deg_str(roll)}\n'
218 output += f' pitch: {rad_to_deg_str(pitch)}\n'
219 output += f' yaw: {rad_to_deg_str(yaw)}\n'
220
221 part_world_pose = multiply_pose(image._sensor_pose, part_pose.pose)
222 output += '--------------------------\n'
223 output += 'World Frame\n'
224 output += '--------------------------\n'
225
226 output += ' Position:\n'
227 output += f' x: {part_world_pose.position.x:.3f} (m)\n'
228 output += f' y: {part_world_pose.position.y:.3f} (m)\n'
229 output += f' z: {part_world_pose.position.z:.3f} (m)\n'
230
231 roll, pitch, yaw = rpy_from_quaternion(part_world_pose.orientation)
232 output += ' Orientation:\n'
233 output += f' roll: {rad_to_deg_str(roll)}\n'
234 output += f' pitch: {rad_to_deg_str(pitch)}\n'
235 output += f' yaw: {rad_to_deg_str(yaw)}\n'
236
237 output += '==========================\n\n'
238
239 return output
Code Explanation
The competition interface from Tutorial 2 was augmented with the components described below.
Imports
AdvancedLogicalCameraImage
: Message class that stores the part poses and sensor pose of the advanced logical camera (ariac_msgs/msg/AdvancedLogicalCameraImage).Part
: Message class that stores the part type and color (ariac_msgs/msg/Part).PartPose
: Message class that stores aPart
and itsPose
(ariac_msgs/msg/PartPose).Note: These message classes are imported as aliases since the package consists of Python classes with the same name.
The module
utils
contains reusable functions and classes.The function
multiply_pose()
is used to compute the pose of the parts detected by the camera in the world frame.The function
rpy_from_quaternion()
is used to convert a quaternion to euler angles roll, pitch, yaw.The function
rad_to_deg_str()
is used to convert a value in radians to a string in degrees.The class
AdvancedLogicalCameraImage
is a Python class which is used to store the message published on the camera topic. Although a class is not strictly necessary, it makes the code more readable and easier to maintain.
Class Variables
_part_colors
is a dictionary that maps the integer values of the part color to their string representations._part_types
is a dictionary that maps the integer values of the part type to their string representations._part_colors_emoji
is a dictionary that maps the integer values of the part color to their emoji representations.Note: These dictionaries are mainly used to display the part color and type in a human-readable format.
Instance Variables
_advanced_camera0_sub
is a subscriber to the camera topic. The callback functionadvanced_camera0_cb()
is called when a message is published on the camera topic._camera_image
is an object of the classAdvancedLogicalCameraImage
that stores the latest message published on the camera topic.
Instance Methods
camera_image(self)
is a getter to the_camera_image
attribute and is provided to access the latest message published on the camera topic._advanced_camera0_cb(self, msg)
is the callback function for the camera topic. It stores the message in the_camera_image
attribute.parse_advanced_camera_image(self)
parses the message stored in_camera_image
and returns a string representation of the message. This method is used to display the part color, type, and pose in a human-readable format. The output is printed in the following format:Emoji for the part color using the class attribute
part_colors_emoji_
.Part color using the class attribute
part_colors_
.Part type using the class attribute
part_types_
.Part pose in the camera frame: This is the pose returned by the camera.
Part pose in the world frame: This is calculated by multiplying the camera pose with the part pose in the camera frame. This multiplication is done using the method
multiply_pose()
.
Create the Executable
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
- ros2 run ariac_tutorials tutorial_3.py
'''
import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface
def main(args=None):
rclpy.init(args=args)
interface = CompetitionInterface()
interface.start_competition()
while rclpy.ok():
try:
rclpy.spin_once(interface)
image = interface.camera_image
if image is not None:
interface.get_logger().info(interface.parse_advanced_camera_image(image), throttle_duration_sec=5.0)
except KeyboardInterrupt:
break
interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code Explanation
This executable does the following:
Initialize the ROS client library.
Create an instance of the class
CompetitionInterface
as a ROS node.Start the competition.
Log the content of
interface.camera_image
every 5 seconds.
Build Instructions
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
find_package(orocos_kdl REQUIRED)
# Install the config directory to the package share directory
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}
)
# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})
# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
scripts/tutorial_2.py
scripts/tutorial_3.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
Package Manifest
This tutorial adds a dependency to the PyKDL module. This is added to the package manifest.
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ariac_tutorials</name>
<version>0.0.0</version>
<description>Tutorial 3</description>
<maintainer email="[email protected]">Justin Albrecht</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend>
<depend>rclpy</depend>
<depend>ariac_msgs</depend>
<depend>geometry_msgs</depend>
<depend>orocos_kdl</depend>
<depend>python3-pykdl</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
Test the Sensor Configuration
To test the camera was correctly added to the environment, run the following commands:
cd ~/ariac_ws
colcon build
. install/setup.bash
ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
You should see the camera above bins 1-4 as shown in the figure below.

Run the Executable
In terminal 1, run the following commands:
cd ~/ariac_ws rosdep install --from-paths src -y --ignore-src colcon build . install/setup.bash ros2 run ariac_tutorials tutorial_3.py
The rosdep command needs to be run again because the package.xml was updated with a new dependency.
In terminal 2, run the following commands:
cd ~/ariac_ws . install/setup.bash ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
Once the environment is loaded and the competition state is ready, the interface node running in terminal 1 will start the competition and the sensor will start publishing data. Each part detected by the camera will be logged to the terminal.