Tutorial 6: Enable/Disable a Gripper

Tutorial 6

This tutorial shows how to use service calls to enable and disable the gripper of the floor robot.

Package Structure

Updates and additions that are specific to Tutorial 6 are highlighted in the tree below.

ariac_tutorials
├── CMakeLists.txt
├── package.xml
├── config
│   └── sensors.yaml
├── ariac_tutorials
│   ├── __init__.py
│   ├── utils.py
│   └── competition_interface.py
└── scripts
    ├── tutorial_1.py
    ├── tutorial_2.py
    ├── tutorial_3.py
    ├── tutorial_4.py
    ├── tutorial_5.py
    └── tutorial_6.py

Updated/Created Files

Competition Interface

The competition interface for Tutorial 6 is shown in Listing 49.

Listing 49 competition_interface.py
  1import rclpy
  2from rclpy.time import Duration
  3from rclpy.node import Node
  4from rclpy.qos import qos_profile_sensor_data
  5from rclpy.parameter import Parameter
  6
  7from ariac_msgs.msg import (
  8    CompetitionState as CompetitionStateMsg,
  9    BreakBeamStatus as BreakBeamStatusMsg,
 10    AdvancedLogicalCameraImage as AdvancedLogicalCameraImageMsg,
 11    Part as PartMsg,
 12    PartPose as PartPoseMsg,
 13    Order as OrderMsg,
 14    AssemblyPart as AssemblyPartMsg,
 15    AGVStatus as AGVStatusMsg,
 16    AssemblyTask as AssemblyTaskMsg,
 17    VacuumGripperState,
 18)
 19
 20from ariac_msgs.srv import (
 21    MoveAGV,
 22    VacuumGripperControl
 23)
 24
 25from std_srvs.srv import Trigger
 26
 27from ariac_tutorials.utils import (
 28    multiply_pose,
 29    rpy_from_quaternion,
 30    rad_to_deg_str,
 31    AdvancedLogicalCameraImage,
 32    Order,
 33    KittingTask,
 34    CombinedTask,
 35    AssemblyTask,
 36    KittingPart
 37)
 38
 39class CompetitionInterface(Node):
 40    '''
 41    Class for a competition interface node.
 42
 43    Args:
 44        Node (rclpy.node.Node): Parent class for ROS nodes
 45
 46    Raises:
 47        KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
 48    '''
 49    _competition_states = {
 50        CompetitionStateMsg.IDLE: 'idle',
 51        CompetitionStateMsg.READY: 'ready',
 52        CompetitionStateMsg.STARTED: 'started',
 53        CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
 54        CompetitionStateMsg.ENDED: 'ended',
 55    }
 56    '''Dictionary for converting CompetitionState constants to strings'''
 57
 58    _part_colors = {
 59        PartMsg.RED: 'red',
 60        PartMsg.BLUE: 'blue',
 61        PartMsg.GREEN: 'green',
 62        PartMsg.ORANGE: 'orange',
 63        PartMsg.PURPLE: 'purple',
 64    }
 65    '''Dictionary for converting Part color constants to strings'''
 66
 67    _part_colors_emoji = {
 68        PartMsg.RED: '🟥',
 69        PartMsg.BLUE: '🟦',
 70        PartMsg.GREEN: '🟩',
 71        PartMsg.ORANGE: '🟧',
 72        PartMsg.PURPLE: '🟪',
 73    }
 74    '''Dictionary for converting Part color constants to emojis'''
 75
 76    _part_types = {
 77        PartMsg.BATTERY: 'battery',
 78        PartMsg.PUMP: 'pump',
 79        PartMsg.REGULATOR: 'regulator',
 80        PartMsg.SENSOR: 'sensor',
 81    }
 82    '''Dictionary for converting Part type constants to strings'''
 83
 84    _destinations = {
 85        AGVStatusMsg.KITTING: 'kitting station',
 86        AGVStatusMsg.ASSEMBLY_FRONT: 'front assembly station',
 87        AGVStatusMsg.ASSEMBLY_BACK: 'back assembly station',
 88        AGVStatusMsg.WAREHOUSE: 'warehouse',
 89    }
 90    '''Dictionary for converting AGVDestination constants to strings'''
 91
 92    _stations = {
 93        AssemblyTaskMsg.AS1: 'assembly station 1',
 94        AssemblyTaskMsg.AS2: 'assembly station 2',
 95        AssemblyTaskMsg.AS3: 'assembly station 3',
 96        AssemblyTaskMsg.AS4: 'assembly station 4',
 97    }
 98    '''Dictionary for converting AssemblyTask constants to strings'''
 99
100    _gripper_states = {
101        True: 'enabled',
102        False: 'disabled'
103    }
104    '''Dictionary for converting VacuumGripperState constants to strings'''
105
106    def __init__(self):
107        super().__init__('competition_interface')
108
109        sim_time = Parameter(
110            "use_sim_time",
111            rclpy.Parameter.Type.BOOL,
112            True
113        )
114
115        self.set_parameters([sim_time])
116
117        # Service client for starting the competition
118        self._start_competition_client = self.create_client(Trigger, '/ariac/start_competition')
119
120        # Subscriber to the competition state topic
121        self._competition_state_sub = self.create_subscription(
122            CompetitionStateMsg,
123            '/ariac/competition_state',
124            self._competition_state_cb,
125            10)
126
127        # Store the state of the competition
128        self._competition_state: CompetitionStateMsg = None
129
130        # Subscriber to the break beam status topic
131        self._break_beam0_sub = self.create_subscription(
132            BreakBeamStatusMsg,
133            '/ariac/sensors/breakbeam_0/status',
134            self._breakbeam0_cb,
135            qos_profile_sensor_data)
136
137        # Store the number of parts that crossed the beam
138        self._conveyor_part_count = 0
139
140        # Store whether the beam is broken
141        self._object_detected = False
142
143        # Subscriber to the logical camera topic
144        self._advanced_camera0_sub = self.create_subscription(
145            AdvancedLogicalCameraImageMsg,
146            '/ariac/sensors/advanced_camera_0/image',
147            self._advanced_camera0_cb,
148            qos_profile_sensor_data)
149
150        # Store each camera image as an AdvancedLogicalCameraImage object
151        self._camera_image: AdvancedLogicalCameraImage = None
152
153        # Subscriber to the order topic
154        self.orders_sub = self.create_subscription(
155            OrderMsg,
156            '/ariac/orders',
157            self._orders_cb,
158            10)
159
160        # Flag for parsing incoming orders
161        self._parse_incoming_order = False
162
163        # List of orders
164        self._orders = []
165
166        # Subscriber to the floor gripper state topic
167        self._floor_robot_gripper_state_sub = self.create_subscription(
168            VacuumGripperState,
169            '/ariac/floor_robot_gripper_state',
170            self._floor_robot_gripper_state_cb,
171            qos_profile_sensor_data)
172
173        # Service client for turning on/off the vacuum gripper on the floor robot
174        self._floor_gripper_enable = self.create_client(
175            VacuumGripperControl,
176            "/ariac/floor_robot_enable_gripper")
177
178        # Attribute to store the current state of the floor robot gripper
179        self._floor_robot_gripper_state = VacuumGripperState()
180
181    @property
182    def orders(self):
183        return self._orders
184
185    @property
186    def camera_image(self):
187        return self._camera_image
188
189    @property
190    def conveyor_part_count(self):
191        return self._conveyor_part_count
192
193    @property
194    def parse_incoming_order(self):
195        return self._parse_incoming_order
196
197    @parse_incoming_order.setter
198    def parse_incoming_order(self, value):
199        self._parse_incoming_order = value
200
201    def _orders_cb(self, msg: Order):
202        '''Callback for the topic /ariac/orders
203        Arguments:
204            msg -- Order message
205        '''
206        order = Order(msg)
207        self._orders.append(order)
208        if self._parse_incoming_order:
209            self.get_logger().info(self._parse_order(order))
210
211    def _advanced_camera0_cb(self, msg: AdvancedLogicalCameraImageMsg):
212        '''Callback for the topic /ariac/sensors/advanced_camera_0/image
213
214        Arguments:
215            msg -- AdvancedLogicalCameraImage message
216        '''
217        self._camera_image = AdvancedLogicalCameraImage(msg.part_poses,
218                                                        msg.tray_poses,
219                                                        msg.sensor_pose)
220
221    def _breakbeam0_cb(self, msg: BreakBeamStatusMsg):
222        '''Callback for the topic /ariac/sensors/breakbeam_0/status
223
224        Arguments:
225            msg -- BreakBeamStatusMsg message
226        '''
227        if not self._object_detected and msg.object_detected:
228            self._conveyor_part_count += 1
229
230        self._object_detected = msg.object_detected
231
232    def _competition_state_cb(self, msg: CompetitionStateMsg):
233        '''Callback for the topic /ariac/competition_state
234        Arguments:
235            msg -- CompetitionState message
236        '''
237        # Log if competition state has changed
238        if self._competition_state != msg.competition_state:
239            state = CompetitionInterface._competition_states[msg.competition_state]
240            self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
241
242        self._competition_state = msg.competition_state
243
244    def _floor_robot_gripper_state_cb(self, msg: VacuumGripperState):
245        '''Callback for the topic /ariac/floor_robot_gripper_state
246
247        Arguments:
248            msg -- VacuumGripperState message
249        '''
250        self._floor_robot_gripper_state = msg
251
252    def start_competition(self):
253        '''Function to start the competition.
254        '''
255        self.get_logger().info('Waiting for competition to be ready')
256
257        if self._competition_state == CompetitionStateMsg.STARTED:
258            return
259        # Wait for competition to be ready
260        while self._competition_state != CompetitionStateMsg.READY:
261            try:
262                rclpy.spin_once(self)
263            except KeyboardInterrupt:
264                return
265
266        self.get_logger().info('Competition is ready. Starting...')
267
268        # Check if service is available
269        if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
270            self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
271            return
272
273        # Create trigger request and call starter service
274        request = Trigger.Request()
275        future = self._start_competition_client.call_async(request)
276
277        # Wait until the service call is completed
278        rclpy.spin_until_future_complete(self, future)
279
280        if future.result().success:
281            self.get_logger().info('Started competition.')
282        else:
283            self.get_logger().warn('Unable to start competition')
284
285    def parse_advanced_camera_image(self, image: AdvancedLogicalCameraImage) -> str:
286        '''
287        Parse an AdvancedLogicalCameraImage message and return a string representation.
288        '''
289
290        if len(image._part_poses) == 0:
291            return 'No parts detected'
292
293        output = '\n\n'
294        for i, part_pose in enumerate(image._part_poses):
295            part_pose: PartPoseMsg
296            output += '==========================\n'
297            part_color = CompetitionInterface._part_colors[part_pose.part.color].capitalize()
298            part_color_emoji = CompetitionInterface._part_colors_emoji[part_pose.part.color]
299            part_type = CompetitionInterface._part_types[part_pose.part.type].capitalize()
300            output += f'Part {i+1}: {part_color_emoji} {part_color} {part_type}\n'
301            output += '--------------------------\n'
302            output += 'Camera Frame\n'
303            output += '--------------------------\n'
304
305            output += '  Position:\n'
306            output += f'    x: {part_pose.pose.position.x:.3f} (m)\n'
307            output += f'    y: {part_pose.pose.position.y:.3f} (m)\n'
308            output += f'    z: {part_pose.pose.position.z:.3f} (m)\n'
309
310            roll, pitch, yaw = rpy_from_quaternion(part_pose.pose.orientation)
311            output += '  Orientation:\n'
312            output += f'    roll: {rad_to_deg_str(roll)}\n'
313            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
314            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
315
316            part_world_pose = multiply_pose(image._sensor_pose, part_pose.pose)
317            output += '--------------------------\n'
318            output += 'World Frame\n'
319            output += '--------------------------\n'
320
321            output += '  Position:\n'
322            output += f'    x: {part_world_pose.position.x:.3f} (m)\n'
323            output += f'    y: {part_world_pose.position.y:.3f} (m)\n'
324            output += f'    z: {part_world_pose.position.z:.3f} (m)\n'
325
326            roll, pitch, yaw = rpy_from_quaternion(part_world_pose.orientation)
327            output += '  Orientation:\n'
328            output += f'    roll: {rad_to_deg_str(roll)}\n'
329            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
330            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
331
332            output += '==========================\n\n'
333
334        return output
335
336    def _parse_kitting_task(self, kitting_task: KittingTask):
337        '''
338        Parses a KittingTask object and returns a string representation.
339        Args:
340            kitting_task (KittingTask): KittingTask object to parse
341        Returns:
342            str: String representation of the KittingTask object
343        '''
344        output = 'Type: Kitting\n'
345        output += '==========================\n'
346        output += f'AGV: {kitting_task.agv_number}\n'
347        output += f'Destination: {CompetitionInterface._destinations[kitting_task.destination]}\n'
348        output += f'Tray ID: {kitting_task.tray_id}\n'
349        output += 'Products:\n'
350        output += '==========================\n'
351
352        quadrants = {1: "Quadrant 1: -",
353                    2: "Quadrant 2: -",
354                    3: "Quadrant 3: -",
355                    4: "Quadrant 4: -"}
356
357        for i in range(1, 5):
358            product: KittingPart
359            for product in kitting_task.parts:
360                if i == product.quadrant:
361                    part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
362                    part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
363                    part_type = CompetitionInterface._part_types[product.part.type].capitalize()
364                    quadrants[i] = f'Quadrant {i}: {part_color_emoji} {part_color} {part_type}'
365        output += f'\t{quadrants[1]}\n'
366        output += f'\t{quadrants[2]}\n'
367        output += f'\t{quadrants[3]}\n'
368        output += f'\t{quadrants[4]}\n'
369
370        return output
371
372    def _parse_assembly_task(self, assembly_task: AssemblyTask):
373        '''
374        Parses an AssemblyTask object and returns a string representation.
375
376        Args:
377            assembly_task (AssemblyTask): AssemblyTask object to parse
378
379        Returns:
380            str: String representation of the AssemblyTask object
381        '''
382        output = 'Type: Assembly\n'
383        output += '==========================\n'
384        if len(assembly_task.agv_numbers) == 1:
385            output += f'AGV: {assembly_task.agv_number[0]}\n'
386        elif len(assembly_task.agv_numbers) == 2:
387            output += f'AGV(s): [{assembly_task.agv_numbers[0]}, {assembly_task.agv_numbers[1]}]\n'
388        output += f'Station: {self._stations[assembly_task.station].title()}\n'
389        output += 'Products:\n'
390        output += '==========================\n'
391
392        product: AssemblyPartMsg
393        for product in assembly_task.parts:
394            part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
395            part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
396            part_type = CompetitionInterface._part_types[product.part.type].capitalize()
397
398            output += f'Part: {part_color_emoji} {part_color} {part_type}\n'
399
400            output += '  Position:\n'
401            output += f'    x: {product.assembled_pose.pose.position.x:.3f} (m)\n'
402            output += f'    y: {product.assembled_pose.pose.position.y:.3f} (m)\n'
403            output += f'    z: {product.assembled_pose.pose.position.z:.3f} (m)\n'
404
405            roll, pitch, yaw = rpy_from_quaternion(product.assembled_pose.pose.orientation)
406            output += '  Orientation:\n'
407            output += f'    roll: {rad_to_deg_str(roll)}\n'
408            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
409            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
410
411            output += f'  Install direction:\n'
412            output += f'    x: {product.install_direction.x:.1f}\n'
413            output += f'    y: {product.install_direction.y:.1f}\n'
414            output += f'    z: {product.install_direction.z:.1f}\n'
415
416        return output
417
418    def _parse_combined_task(self, combined_task: CombinedTask):
419        '''
420        Parses a CombinedTask object and returns a string representation.
421
422        Args:
423            combined_task (CombinedTask): CombinedTask object to parse
424
425        Returns:
426            str: String representation of the CombinedTask object
427        '''
428
429        output = 'Type: Combined\n'
430        output += '==========================\n'
431        output += f'Station: {self._stations[combined_task.station].title()}\n'
432        output += 'Products:\n'
433        output += '==========================\n'
434
435        product: AssemblyPartMsg
436        for product in combined_task.parts:
437            part_color = CompetitionInterface._part_colors[product.part.color].capitalize()
438            part_color_emoji = CompetitionInterface._part_colors_emoji[product.part.color]
439            part_type = CompetitionInterface._part_types[product.part.type].capitalize()
440
441            output += f'Part: {part_color_emoji} {part_color} {part_type}\n'
442
443            output += '  Position:\n'
444            output += f'    x: {product.assembled_pose.pose.position.x:.3f} (m)\n'
445            output += f'    y: {product.assembled_pose.pose.position.y:.3f} (m)\n'
446            output += f'    z: {product.assembled_pose.pose.position.z:.3f} (m)\n'
447
448            roll, pitch, yaw = rpy_from_quaternion(product.assembled_pose.pose.orientation)
449            output += '  Orientation:\n'
450            output += f'    roll: {rad_to_deg_str(roll)}\n'
451            output += f'    pitch: {rad_to_deg_str(pitch)}\n'
452            output += f'    yaw: {rad_to_deg_str(yaw)}\n'
453
454            output += f'  Install direction:\n'
455            output += f'    x: {product.install_direction.x:.1f}\n'
456            output += f'    y: {product.install_direction.y:.1f}\n'
457            output += f'    z: {product.install_direction.z:.1f}\n'
458
459        return output
460
461    def _parse_order(self, order: Order):
462        '''Parse an order message and return a string representation.
463        Args:
464            order (Order) -- Order message
465        Returns:
466            String representation of the order message
467        '''
468        output = '\n\n==========================\n'
469        output += f'Received Order: {order.order_id}\n'
470        output += f'Priority: {order.order_priority}\n'
471
472        if order.order_type == OrderMsg.KITTING:
473            output += self._parse_kitting_task(order.order_task)
474        elif order.order_type == OrderMsg.ASSEMBLY:
475            output += self._parse_assembly_task(order.order_task)
476        elif order.order_type == OrderMsg.COMBINED:
477            output += self._parse_combined_task(order.order_task)
478        else:
479            output += 'Type: Unknown\n'
480        return output
481
482    def lock_agv_tray(self, num):
483        '''
484        Lock the tray of an AGV and parts on the tray. This will prevent tray and parts from moving during transport.
485        Args:
486            num (int):  AGV number
487        Raises:
488            KeyboardInterrupt: Exception raised when the user presses Ctrl+C
489        '''
490
491        # Create a client to send a request to the `/ariac/agv{num}_lock_tray` service
492        tray_locker = self.create_client(
493            Trigger,
494            f'/ariac/agv{num}_lock_tray'
495        )
496
497        # Build the request
498        request = Trigger.Request()
499        # Send the request
500        future = tray_locker.call_async(request)
501
502        # Wait for the response
503        try:
504            rclpy.spin_until_future_complete(self, future)
505        except KeyboardInterrupt as kb_error:
506            raise KeyboardInterrupt from kb_error
507
508        # Check the response
509        if future.result().success:
510            self.get_logger().info(f'Locked AGV{num}\'s tray')
511        else:
512            self.get_logger().warn('Unable to lock tray')
513
514    def move_agv_to_station(self, num, station):
515        '''
516        Move an AGV to an assembly station.
517        Args:
518            num (int): AGV number
519            station (int): Assembly station number
520        Raises:
521            KeyboardInterrupt: Exception raised when the user presses Ctrl+C
522        '''
523
524        # Create a client to send a request to the `/ariac/move_agv` service.
525        mover = self.create_client(
526            MoveAGV,
527            f'/ariac/move_agv{num}')
528
529        # Create a request object.
530        request = MoveAGV.Request()
531
532        # Set the request location.
533        if station in [AssemblyTaskMsg.AS1, AssemblyTaskMsg.AS3]:
534            request.location = MoveAGV.Request.ASSEMBLY_FRONT
535        else:
536            request.location = MoveAGV.Request.ASSEMBLY_BACK
537
538        # Send the request.
539        future = mover.call_async(request)
540
541        # Wait for the server to respond.
542        try:
543            rclpy.spin_until_future_complete(self, future)
544        except KeyboardInterrupt as kb_error:
545            raise KeyboardInterrupt from kb_error
546
547        # Check the result of the service call.
548        if future.result().success:
549            self.get_logger().info(f'Moved AGV{num} to {self._stations[station]}')
550        else:
551            self.get_logger().warn(future.result().message)
552
553    def set_floor_robot_gripper_state(self, state):
554        '''Set the gripper state of the floor robot.
555
556        Arguments:
557            state -- True to enable, False to disable
558
559        Raises:
560            KeyboardInterrupt: Exception raised when the user presses Ctrl+C
561        '''
562        if self._floor_robot_gripper_state.enabled == state:
563            self.get_logger().warn(f'Gripper is already {self._gripper_states[state]}')
564            return
565
566        request = VacuumGripperControl.Request()
567        request.enable = state
568
569        future = self._floor_gripper_enable.call_async(request)
570
571        try:
572            rclpy.spin_until_future_complete(self, future)
573        except KeyboardInterrupt as kb_error:
574            raise KeyboardInterrupt from kb_error
575
576        if future.result().success:
577            self.get_logger().info(f'Changed gripper state to {self._gripper_states[state]}')
578        else:
579            self.get_logger().warn('Unable to change gripper state')
580
581    def wait(self, duration):
582        '''Wait for a specified duration.
583
584        Arguments:
585            duration -- Duration to wait in seconds
586
587        Raises:
588            KeyboardInterrupt: Exception raised when the user presses Ctrl+C
589        '''
590        start = self.get_clock().now()
591
592        while self.get_clock().now() <= start + Duration(seconds=duration):
593            try:
594                rclpy.spin_once(self)
595            except KeyboardInterrupt as kb_error:
596                raise KeyboardInterrupt from kb_error

Code Explanation

The competition interface from Tutorial 5 was augmented with the components described below.

  • Imports

    • Duration: A wait(self, duration) method is implemented using the Duration class. The wait(self, duration) method is used to wait for a specified duration while the gripper state is being changed.

    • VacuumGripperState: Message class for the vacuum gripper state (ariac_msgs/msg/VacuumGripperState). This message is used to determine if the gripper is enabled or disabled, if an object is attached, and the type of the gripper.

    • VacuumGripperControl: Service type for controlling the vacuum gripper (ariac_msgs/srv/VacuumGripperControl).

  • Class Variables

    • _gripper_states: A dictionary for converting the VacuumGripperState constants to strings. This is used for logging the gripper state.

  • Instance Variables

    • _floor_robot_gripper_state_sub: Subscriber to the floor robot gripper state topic.

    • _floor_gripper_enable: Service client for turning on/off the vacuum gripper on the floor robot.

    • _floor_robot_gripper_state: Attribute to store the current state of the floor robot gripper.

  • Instance Methods

    • _floor_robot_gripper_state_cb(self, msg: VacuumGripperState): Callback for the topic /ariac/floor_robot_gripper_state. This is used to store the current state of the floor robot gripper.

    • set_floor_robot_gripper_state(self, state): Function to set the gripper state of the floor robot. This function calls the ROS service to change the gripper state.

    • wait(self, duration): Function to wait for a specified duration. This function is used to wait for the gripper state to change.

Executable

Listing 50 tutorial_6.py
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
- ros2 run ariac_tutorials tutorial_6.py
'''

import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface


def main(args=None):
    rclpy.init(args=args)
    interface = CompetitionInterface()
    interface.start_competition()

    while rclpy.ok():
        try:
            interface.set_floor_robot_gripper_state(True)
            interface.wait(3)
            interface.set_floor_robot_gripper_state(False)
            interface.wait(3)
        except KeyboardInterrupt:
            break

    interface.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Code Explanation

This executable does the following:

  • Initialize the ROS client library.

  • Create an instance of the class CompetitionInterface as a ROS node.

  • Start the competition.

  • In a while loop:

    • Enable the gripper of the floor robot.

    • Wait for 3 seconds.

    • Disable the gripper of the floor robot.

    • Wait for 3 seconds.

Build Instructions

Listing 51 CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
find_package(orocos_kdl REQUIRED)

# Install the config directory to the package share directory
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}
)

# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})

# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
scripts/tutorial_2.py
scripts/tutorial_3.py
scripts/tutorial_4.py
scripts/tutorial_5.py
scripts/tutorial_6.py
DESTINATION lib/${PROJECT_NAME}
)

ament_package()

Run the Executable

  • In terminal 1, run the following commands:

    cd ~/ariac_ws
    colcon build
    . install/setup.bash
    ros2 run ariac_tutorials tutorial_6.py
    

    The node will wait until the competition is ready.

  • In terminal 2, run the following commands:

    cd ~/ariac_ws
    . install/setup.bash
    ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
    

Outputs

Listing 52 terminal 1 output
[INFO] [1679048497.138846958] [competition_interface]: Waiting for competition to be ready
[INFO] [1679048497.139894604] [competition_interface]: Competition state is: ready
[INFO] [1679048497.140293729] [competition_interface]: Competition is ready. Starting...
[INFO] [1679048497.142822117] [competition_interface]: Started competition.
[INFO] [1679048497.145127615] [competition_interface]: Changed gripper state to enabled
[INFO] [1679048501.986702439] [competition_interface]: Changed gripper state to disabled
[INFO] [1679048507.031545831] [competition_interface]: Changed gripper state to enabled