Tutorial 1: Create a Competition Package
Tutorial 1
Prerequisites: Introduction to Tutorials
Source Code: https://github.com/jaybrecht/ariac_tutorials/tree/tutorial_1
Switch Branch:
cd ~/ariac_ws/src/ariac_tutorials git switch tutorial_1
This tutorial details the steps necessary to create a competition package that is able to interface with the ARIAC competition. This competition package will use a python node to listen to the competition state and call a ROS service to start the competition when ready.
Package Structure
Updated/Created Files
The following sections show the modified files in the package with highlighted sections for additions since the previous tutorial. For Tutorial 1, all the files are new so there are no highlights.
Competition Interface
The competition interface for Tutorial 1 is shown in Listing 25. The majority of the code for the tutorials will be located inside of the CompetitionInterface
class which is inherited from the rclpy Node class. This class is part of a python module with the same name as the ROS package (ariac_tutorials
).
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from ariac_msgs.msg import (
CompetitionState as CompetitionStateMsg,
)
from std_srvs.srv import Trigger
class CompetitionInterface(Node):
'''
Class for a competition interface node.
Args:
Node (rclpy.node.Node): Parent class for ROS nodes
Raises:
KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
'''
_competition_states = {
CompetitionStateMsg.IDLE: 'idle',
CompetitionStateMsg.READY: 'ready',
CompetitionStateMsg.STARTED: 'started',
CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
CompetitionStateMsg.ENDED: 'ended',
}
'''Dictionary for converting CompetitionState constants to strings'''
def __init__(self):
super().__init__('competition_interface')
sim_time = Parameter(
"use_sim_time",
rclpy.Parameter.Type.BOOL,
True
)
self.set_parameters([sim_time])
# Service client for starting the competition
self._start_competition_client = self.create_client(
Trigger, '/ariac/start_competition')
# Subscriber to the competition state topic
self._competition_state_sub = self.create_subscription(
CompetitionStateMsg,
'/ariac/competition_state',
self._competition_state_cb,
10)
# Store the state of the competition
self._competition_state: CompetitionStateMsg = None
def _competition_state_cb(self, msg: CompetitionStateMsg):
'''Callback for the topic /ariac/competition_state
Arguments:
msg -- CompetitionState message
'''
# Log if competition state has changed
if self._competition_state != msg.competition_state:
state = CompetitionInterface._competition_states[msg.competition_state]
self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
self._competition_state = msg.competition_state
def start_competition(self):
'''Function to start the competition.
'''
self.get_logger().info('Waiting for competition to be ready')
if self._competition_state == CompetitionStateMsg.STARTED:
return
# Wait for competition to be ready
while self._competition_state != CompetitionStateMsg.READY:
try:
rclpy.spin_once(self)
except KeyboardInterrupt:
return
self.get_logger().info('Competition is ready. Starting...')
# Check if service is available
if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
return
# Create trigger request and call starter service
request = Trigger.Request()
future = self._start_competition_client.call_async(request)
# Wait until the service call is completed
rclpy.spin_until_future_complete(self, future)
if future.result().success:
self.get_logger().info('Started competition.')
else:
self.get_logger().warn('Unable to start competition')
Code Explanation
Imports:
ariac_msgs.msg
: The ROS2 message API for the ARIAC messages.CompetitionState
: The competition state message (ariac_msgs/msg/CompetitionState).
std_srvs.srv
: The ROS2 service API for the standard services.
Class Variables
self._competition_states
: A dictionary used for logging purposes.
Instance Variables
self._start_competition_client
ROS client for the service /ariac/start_competition.self._competition_state_sub
ROS subscriber for the topic /ariac/competition_state.self._competition_state
a variable to store the current competition state.
Instance Methods
_competition_state_cb(self, msg: CompetitionStateMsg)
: Callback for the topic /ariac/competition_state. This method stores the competition state in the variable_competition_state
.start_competition(self)
: Method to start the competition. This method waits for the competition to be ready by checking the value of_competition_state
and then calls the service /ariac/start_competition through the client_start_competition_client
.
Executable
The entry point to the node is located in the executable tutorial_1.py
. Each tutorial will have a separate executable but will share the same interface.
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial
- ros2 run ariac_tutorials tutorial_1.py
'''
import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface
def main(args=None):
rclpy.init(args=args)
interface = CompetitionInterface()
interface.start_competition()
interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code Explanation
This executable does the following:
Initialize the ROS client library.
Create an instance of the class
CompetitionInterface
as a ROS node.Start the competition.
Build Instructions
CMakeLists.txt
defines the build instructions that are used for this package when colcon build
is run for the workspace. The necessary ROS dependencies are located, the ariac_tutorials
python module is installed, and the tutorial_1.py
executable is installed.
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})
# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
Package Manifest
The manifest is used for metadata about the ROS package. It is also used by rosdep to ensure all necessary packages are installed.
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ariac_tutorials</name>
<version>0.0.0</version>
<description>Tutorial 1</description>
<maintainer email="[email protected]">Justin Albrecht</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend>
<depend>rclpy</depend>
<depend>ariac_msgs</depend>
<depend>geometry_msgs</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
Important
Make sure to update the description, maintainer(s) and license when creating your team’s competition package.
Running the Executable
In terminal 1, run the following commands:
cd ~/ariac_ws colcon build . install/setup.bash ros2 run ariac_tutorials tutorial_1.py
You should see this output:
[INFO] [1679025057.998334513] [competition_interface]: Waiting for competition to be ready
The node waits until the competition is ready.
In terminal 2, run the following commands:
cd ~/ariac_ws . install/setup.bash ros2 launch ariac_gazebo ariac.launch.py competitor_pkg:=ariac_tutorials trial_name:=tutorial
This should start gazebo. Once the environment is loaded and the competition state is ready, the interface node running in terminal 1 will start the competition. This will activate all sensors, enable the robot controllers, start the conveyor belt, and start order announcements.