Tutorial 4: Read an Order
Tutorial 4
Prerequisites: Introduction to Tutorials and Tutorial 3
Source Code: https://github.com/jaybrecht/ariac_tutorials/tree/tutorial_4
Switch Branch:
cd ~/ariac_ws/src/ariac_tutorials git switch tutorial_4
This tutorial shows how to read each order published by the ARIAC manager. The following steps are performed:
Receive order messages,
Store each order internally as an instance of a class,
Display each order on the standard output.
Package Structure
Updates and additions that are specific to Tutorial 4 are highlighted in the tree below.
Updated/Created Files
Competition Interface
The competition interface for Tutorial 4 is shown in Listing 41.
1import rclpy
2from rclpy.node import Node
3from rclpy.qos import qos_profile_sensor_data
4from rclpy.parameter import Parameter
5
6from ariac_msgs.msg import (
7 CompetitionState as CompetitionStateMsg,
8 BreakBeamStatus as BreakBeamStatusMsg,
9 AdvancedLogicalCameraImage as AdvancedLogicalCameraImageMsg,
10 Part as PartMsg,
11 PartPose as PartPoseMsg,
12 Order as OrderMsg,
13 AssemblyPart as AssemblyPartMsg,
14 AGVStatus as AGVStatusMsg,
15 AssemblyTask as AssemblyTaskMsg,
16)
17
18from std_srvs.srv import Trigger
19
20from ariac_tutorials.utils import (
21 multiply_pose,
22 rpy_from_quaternion,
23 rad_to_deg_str,
24 AdvancedLogicalCameraImage,
25 Order,
26 KittingTask,
27 CombinedTask,
28 AssemblyTask,
29 KittingPart,
30)
31
32
33class CompetitionInterface(Node):
34 '''
35 Class for a competition interface node.
36
37 Args:
38 Node (rclpy.node.Node): Parent class for ROS nodes
39
40 Raises:
41 KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
42 '''
43 _competition_states = {
44 CompetitionStateMsg.IDLE: 'idle',
45 CompetitionStateMsg.READY: 'ready',
46 CompetitionStateMsg.STARTED: 'started',
47 CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
48 CompetitionStateMsg.ENDED: 'ended',
49 }
50 '''Dictionary for converting CompetitionState constants to strings'''
51
52 _part_colors = {
53 PartMsg.RED: 'red',
54 PartMsg.BLUE: 'blue',
55 PartMsg.GREEN: 'green',
56 PartMsg.ORANGE: 'orange',
57 PartMsg.PURPLE: 'purple',
58 }
59 '''Dictionary for converting Part color constants to strings'''
60
61 _part_colors_emoji = {
62 PartMsg.RED: '🟥',
63 PartMsg.BLUE: '🟦',
64 PartMsg.GREEN: '🟩',
65 PartMsg.ORANGE: '🟧',
66 PartMsg.PURPLE: '🟪',
67 }
68 '''Dictionary for converting Part color constants to emojis'''
69
70 _part_types = {
71 PartMsg.BATTERY: 'battery',
72 PartMsg.PUMP: 'pump',
73 PartMsg.REGULATOR: 'regulator',
74 PartMsg.SENSOR: 'sensor',
75 }
76 '''Dictionary for converting Part type constants to strings'''
77
78 _destinations = {
79 AGVStatusMsg.KITTING: 'kitting station',
80 AGVStatusMsg.ASSEMBLY_FRONT: 'front assembly station',
81 AGVStatusMsg.ASSEMBLY_BACK: 'back assembly station',
82 AGVStatusMsg.WAREHOUSE: 'warehouse',
83 }
84 '''Dictionary for converting AGVDestination constants to strings'''
85
86 _stations = {
87 AssemblyTaskMsg.AS1: 'assembly station 1',
88 AssemblyTaskMsg.AS2: 'assembly station 2',
89 AssemblyTaskMsg.AS3: 'assembly station 3',
90 AssemblyTaskMsg.AS4: 'assembly station 4',
91 }
92 '''Dictionary for converting AssemblyTask constants to strings'''
93
94 def __init__(self):
95 super().__init__('competition_interface')
96
97 sim_time = Parameter(
98 "use_sim_time",
99 rclpy.Parameter.Type.BOOL,
100 True
101 )
102
103 self.set_parameters([sim_time])
104
105 # Service client for starting the competition
106 self._start_competition_client = self.create_client(Trigger, '/ariac/start_competition')
107
108 # Subscriber to the competition state topic
109 self._competition_state_sub = self.create_subscription(
110 CompetitionStateMsg,
111 '/ariac/competition_state',
112 self._competition_state_cb,
113 10)
114
115 # Store the state of the competition
116 self._competition_state: CompetitionStateMsg = None
117
118 # Subscriber to the break beam status topic
119 self._break_beam0_sub = self.create_subscription(
120 BreakBeamStatusMsg,
121 '/ariac/sensors/breakbeam_0/status',
122 self._breakbeam0_cb,
123 qos_profile_sensor_data)
124
125 # Store the number of parts that crossed the beam
126 self._conveyor_part_count = 0
127
128 # Store whether the beam is broken
129 self._object_detected = False
130
131 # Subscriber to the logical camera topic
132 self._advanced_camera0_sub = self.create_subscription(
133 AdvancedLogicalCameraImageMsg,
134 '/ariac/sensors/advanced_camera_0/image',
135 self._advanced_camera0_cb,
136 qos_profile_sensor_data)
137
138 # Store each camera image as an AdvancedLogicalCameraImage object
139 self._camera_image: AdvancedLogicalCameraImage = None
140
141 # Subscriber to the order topic
142 self.orders_sub = self.create_subscription(
143 OrderMsg,
144 '/ariac/orders',
145 self._orders_cb,
146 10)
147
148 # Flag for parsing incoming orders
149 self._parse_incoming_order = False
150
151 # List of orders
152 self._orders = []
153
154 @property
155 def orders(self):
156 return self._orders
157
158 @property
159 def camera_image(self):
160 return self._camera_image
161
162 @property
163 def conveyor_part_count(self):
164 return self._conveyor_part_count
165
166 @property
167 def parse_incoming_order(self):
168 return self._parse_incoming_order
169
170 @parse_incoming_order.setter
171 def parse_incoming_order(self, value):
172 self._parse_incoming_order = value
173
174 def _orders_cb(self, msg: Order):
175 '''Callback for the topic /ariac/orders
176 Arguments:
177 msg -- Order message
178 '''
179 order = Order(msg)
180 self._orders.append(order)
181 if self._parse_incoming_order:
182 self.get_logger().info(self._parse_order(order))
183
184 def _advanced_camera0_cb(self, msg: AdvancedLogicalCameraImageMsg):
185 '''Callback for the topic /ariac/sensors/advanced_camera_0/image
186
187 Arguments:
188 msg -- AdvancedLogicalCameraImage message
189 '''
190 self._camera_image = AdvancedLogicalCameraImage(msg.part_poses,
191 msg.tray_poses,
192 msg.sensor_pose)
193
194 def _breakbeam0_cb(self, msg: BreakBeamStatusMsg):
195 '''Callback for the topic /ariac/sensors/breakbeam_0/status
196
197 Arguments:
198 msg -- BreakBeamStatusMsg message
199 '''
200 if not self._object_detected and msg.object_detected:
201 self._conveyor_part_count += 1
202
203 self._object_detected = msg.object_detected
204
205 def _competition_state_cb(self, msg: CompetitionStateMsg):
206 '''Callback for the topic /ariac/competition_state
207 Arguments:
208 msg -- CompetitionState message
209 '''
210 # Log if competition state has changed
211 if self._competition_state != msg.competition_state:
212 state = CompetitionInterface._competition_states[msg.competition_state]
213 self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
214
215 self._competition_state = msg.competition_state
216
217 def start_competition(self):
218 '''Function to start the competition.
219 '''
220 self.get_logger().info('Waiting for competition to be ready')
221
222 if self._competition_state == CompetitionStateMsg.STARTED:
223 return
224 # Wait for competition to be ready
225 while self._competition_state != CompetitionStateMsg.READY:
226 try:
227 rclpy.spin_once(self)
228 except KeyboardInterrupt:
229 return
230
231 self.get_logger().info('Competition is ready. Starting...')
232
233 # Check if service is available
234 if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
235 self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
236 return
237
238 # Create trigger request and call starter service
239 request = Trigger.Request()
240 future = self._start_competition_client.call_async(request)
241
242 # Wait until the service call is completed
243 rclpy.spin_until_future_complete(self, future)
244
245 if future.result().success:
246 self.get_logger().info('Started competition.')
247 else:
248 self.get_logger().warn('Unable to start competition')
249
250 def parse_advanced_camera_image(self, image: AdvancedLogicalCameraImage) -> str:
251 '''
252 Parse an AdvancedLogicalCameraImage message and return a string representation.
253 '''
254
255 if len(image._part_poses) == 0:
256 return 'No parts detected'
257
258 output = '\n\n'
259 for i, part_pose in enumerate(image._part_poses):
260 part_pose: PartPoseMsg
261 output += '==========================\n'
262 part_color = CompetitionInterface._part_colors[part_pose.part.color].capitalize()
263 part_color_emoji = CompetitionInterface._part_colors_emoji[part_pose.part.color]
264 part_type = CompetitionInterface._part_types[part_pose.part.type].capitalize()
265 output += f'Part {i+1}: {part_color_emoji} {part_color} {part_type}\n'
266 output += '--------------------------\n'
267 output += 'Camera Frame\n'
268 output += '--------------------------\n'
269
270 output += ' Position:\n'
271 output += f' x: {part_pose.pose.position.x:.3f} (m)\n'
272 output += f' y: {part_pose.pose.position.y:.3f} (m)\n'
273 output += f' z: {part_pose.pose.position.z:.3f} (m)\n'
274
275 roll, pitch, yaw = rpy_from_quaternion(part_pose.pose.orientation)
276 output += ' Orientation:\n'
277 output += f' roll: {rad_to_deg_str(roll)}\n'
278 output += f' pitch: {rad_to_deg_str(pitch)}\n'
279 output += f' yaw: {rad_to_deg_str(yaw)}\n'
280
281 part_world_pose = multiply_pose(image._sensor_pose, part_pose.pose)
282 output += '--------------------------\n'
283 output += 'World Frame\n'
284 output += '--------------------------\n'
285
286 output += ' Position:\n'
287 output += f' x: {part_world_pose.position.x:.3f} (m)\n'
288 output += f' y: {part_world_pose.position.y:.3f} (m)\n'
289 output += f' z: {part_world_pose.position.z:.3f} (m)\n'
290
291 roll, pitch, yaw = rpy_from_quaternion(part_world_pose.orientation)
292 output += ' Orientation:\n'
293 output += f' roll: {rad_to_deg_str(roll)}\n'
294 output += f' pitch: {rad_to_deg_str(pitch)}\n'
295 output += f' yaw: {rad_to_deg_str(yaw)}\n'
296
297 output += '==========================\n\n'
298
299 return output
300
301 def _parse_kitting_task(self, kitting_task: KittingTask):
302 '''
303 Parses a KittingTask object and returns a string representation.
304
305 Args:
306 kitting_task (KittingTask): KittingTask object to parse
307
308 Returns:
309 str: String representation of the KittingTask object
310 '''
311 output = 'Type: Kitting\n'
312 output += '==========================\n'
313 output += f'AGV: {kitting_task.agv_number}\n'
314 output += f'Destination: {CompetitionInterface._destinations[kitting_task.destination]}\n'
315 output += f'Tray ID: {kitting_task.tray_id}\n'
316 output += 'Products:\n'
317 output += '==========================\n'
318
319 quadrants = {1: "Quadrant 1: -",
320 2: "Quadrant 2: -",
321 3: "Quadrant 3: -",
322 4: "Quadrant 4: -"}
323
324 for i in range(1, 5):
325 product: KittingPart
326 for product in kitting_task.parts:
327 if i == product.quadrant:
328 part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
329 part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
330 part_type = CompetitionInterface._part_types[product.part.type].capitalize()
331 quadrants[i] = f'Quadrant {i}: {part_color_emoji} {part_color} {part_type}'
332 output += f'\t{quadrants[1]}\n'
333 output += f'\t{quadrants[2]}\n'
334 output += f'\t{quadrants[3]}\n'
335 output += f'\t{quadrants[4]}\n'
336
337 return output
338
339 def _parse_assembly_task(self, assembly_task: AssemblyTask):
340 '''
341 Parses an AssemblyTask object and returns a string representation.
342
343 Args:
344 assembly_task (AssemblyTask): AssemblyTask object to parse
345
346 Returns:
347 str: String representation of the AssemblyTask object
348 '''
349 output = 'Type: Assembly\n'
350 output += '==========================\n'
351 if len(assembly_task.agv_numbers) == 1:
352 output += f'AGV: {assembly_task.agv_number[0]}\n'
353 elif len(assembly_task.agv_numbers) == 2:
354 output += f'AGV(s): [{assembly_task.agv_numbers[0]}, {assembly_task.agv_numbers[1]}]\n'
355 output += f'Station: {self._stations[assembly_task.station].title()}\n'
356 output += 'Products:\n'
357 output += '==========================\n'
358
359 product: AssemblyPartMsg
360 for product in assembly_task.parts:
361 part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
362 part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
363 part_type = CompetitionInterface._part_types[product.part.type].capitalize()
364
365 output += f'Part: {part_color_emoji} {part_color} {part_type}\n'
366
367 output += ' Position:\n'
368 output += f' x: {product.assembled_pose.pose.position.x:.3f} (m)\n'
369 output += f' y: {product.assembled_pose.pose.position.y:.3f} (m)\n'
370 output += f' z: {product.assembled_pose.pose.position.z:.3f} (m)\n'
371
372 roll, pitch, yaw = rpy_from_quaternion(product.assembled_pose.pose.orientation)
373 output += ' Orientation:\n'
374 output += f' roll: {rad_to_deg_str(roll)}\n'
375 output += f' pitch: {rad_to_deg_str(pitch)}\n'
376 output += f' yaw: {rad_to_deg_str(yaw)}\n'
377
378 output += f' Install direction:\n'
379 output += f' x: {product.install_direction.x:.1f}\n'
380 output += f' y: {product.install_direction.y:.1f}\n'
381 output += f' z: {product.install_direction.z:.1f}\n'
382
383 return output
384
385 def _parse_combined_task(self, combined_task: CombinedTask):
386 '''
387 Parses a CombinedTask object and returns a string representation.
388
389 Args:
390 combined_task (CombinedTask): CombinedTask object to parse
391
392 Returns:
393 str: String representation of the CombinedTask object
394 '''
395
396 output = 'Type: Combined\n'
397 output += '==========================\n'
398 output += f'Station: {self._stations[combined_task.station].title()}\n'
399 output += 'Products:\n'
400 output += '==========================\n'
401
402 product: AssemblyPartMsg
403 for product in combined_task.parts:
404 part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
405 part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
406 part_type = CompetitionInterface._part_types[product.part.type].capitalize()
407
408 output += f'Part: {part_color_emoji} {part_color} {part_type}\n'
409
410 output += ' Position:\n'
411 output += f' x: {product.assembled_pose.pose.position.x:.3f} (m)\n'
412 output += f' y: {product.assembled_pose.pose.position.y:.3f} (m)\n'
413 output += f' z: {product.assembled_pose.pose.position.z:.3f} (m)\n'
414
415 roll, pitch, yaw = rpy_from_quaternion(product.assembled_pose.pose.orientation)
416 output += ' Orientation:\n'
417 output += f' roll: {rad_to_deg_str(roll)}\n'
418 output += f' pitch: {rad_to_deg_str(pitch)}\n'
419 output += f' yaw: {rad_to_deg_str(yaw)}\n'
420
421 output += f' Install direction:\n'
422 output += f' x: {product.install_direction.x:.1f}\n'
423 output += f' y: {product.install_direction.y:.1f}\n'
424 output += f' z: {product.install_direction.z:.1f}\n'
425
426 return output
427
428 def _parse_order(self, order: Order):
429 '''Parse an order message and return a string representation.
430
431 Args:
432 order (Order) -- Order message
433
434 Returns:
435 String representation of the order message
436 '''
437 output = '\n\n==========================\n'
438 output += f'Received Order: {order.order_id}\n'
439 output += f'Priority: {order.order_priority}\n'
440
441 if order.order_type == OrderMsg.KITTING:
442 output += self._parse_kitting_task(order.order_task)
443 elif order.order_type == OrderMsg.ASSEMBLY:
444 output += self._parse_assembly_task(order.order_task)
445 elif order.order_type == OrderMsg.COMBINED:
446 output += self._parse_combined_task(order.order_task)
447 else:
448 output += 'Type: Unknown\n'
449 return output
Code Explanation
The competition interface from Tutorial 3 was augmented with the components described below.
Imports
Order
: Message class that contains the order information (ariac_msgs/msg/Order).AssemblyPart
: Message class that contains the assembly part information (ariac_msgs/msg/AssemblyPart).AssemblyTask
: Message class that contains the assembly task information (ariac_msgs/msg/AssemblyTask).AGVStatus
: Message class that contains the AGV status information ( ariac_msgs/msg/AGVStatus).
Class Variables
_destinations
is a dictionary that maps the integer values of the AGV destination to their string representations._stations
is a dictionary that maps the integer values of the assembly stations to their string representations.
Instance Variables
_orders_sub
: ROS subscriber to the topic /ariac/orders.self._parse_incoming_order
: Flag for logging an order in the terminal. If the flag is set toTrue
, the order is logged in the terminal. If the flag is set toFalse
, the order is not logged in the terminal.self._orders
: List of orders. Each order announced by the competition interface is stored in this list.
Instance Methods
orders(self)
: Getter for the list of ordersself._orders
parse_incoming_order(self)
Getter for the flagself._parse_incoming_order
parse_incoming_order(self, value)
Setter for the flagself._parse_incoming_order
_orders_cb(self, msg: OrderMsg)
: Callback method for the subscriber to the topic /ariac/orders. It parses the order and stores it in the list of ordersself._orders
_parse_order(self, order: Order)
: Parses an order message and returns a string representation. This method calls the appropriate parsing method based on the type of the order._parse_kitting_task(self, kitting_task: KittingTask)
: Parses aKittingTask
object and returns a string representation._parse_assembly_task(self, assembly_task: AssemblyTask)
: Parses anAssemblyTask
object and returns a string representation._parse_combined_task(self, combined_task: CombinedTask)
: Parses aCombinedTask
object and returns a string representation.
Executable
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
- ros2 run ariac_tutorials tutorial_4.py
'''
import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface
def main(args=None):
rclpy.init(args=args)
interface = CompetitionInterface()
interface.start_competition()
# The following line enables order displays in the terminal.
# Set to False to disable.
interface.parse_incoming_order = True
while rclpy.ok():
try:
rclpy.spin_once(interface)
except KeyboardInterrupt:
break
interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code Explanation
This executable does the following:
Initialize the ROS client library.
Create an instance of the class
CompetitionInterface
as a ROS node.Start the competition.
Log each published message to the terminal.
Build Instructions
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
find_package(orocos_kdl REQUIRED)
# Install the config directory to the package share directory
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}
)
# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})
# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
scripts/tutorial_2.py
scripts/tutorial_3.py
scripts/tutorial_4.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
Run the Executable
In terminal 1, run the following commands:
cd ~/ariac_ws colcon build . install/setup.bash ros2 run ariac_tutorials tutorial_4.py
The node will wait until the competition is ready.
In terminal 2, run the following commands:
cd ~/ariac_ws . install/setup.bash ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
Once the environment is loaded and the competition state is ready, the interface node running in terminal 1 will start the competition and published orders will be displayed on the standard output.