Tutorial 2: Read Data from a Break Beam Sensor
Tutorial 2
Prerequisites: Introduction to Tutorials and Tutorial 1
Source Code: https://github.com/jaybrecht/ariac_tutorials/tree/tutorial_2
Switch Branch:
cd ~/ariac_ws/src/ariac_tutorials git switch tutorial_2
This tutorial covers the following:
Creating a configuration file for sensors and cameras,
Reading data published by the sensor,
Logging the outputs in the terminal.
Package Structure
Updates and additions that are specific to Tutorial 2 are highlighted in the tree below.
Updated/Created Files
Sensor Configuration File
A sensor configuration file for a given package must be created in the folder config
in the package. The file must be added to the CMakeLists.txt
file in the package to allow the competition software to find the file.
To learn more about sensor configuration files, see the Sensor configuration file section.
A break beam sensor was added to sensors.yaml
as seen in Listing 31.
sensors:
breakbeam_0:
type: break_beam
visualize_fov: true
pose:
xyz: [-0.36, 3.5, 0.88]
rpy: [0, 0, pi]
Competition Interface
The competition interface for Tutorial 2 is shown in Listing 32.
1import rclpy
2from rclpy.node import Node
3from rclpy.qos import qos_profile_sensor_data
4from rclpy.parameter import Parameter
5
6from ariac_msgs.msg import (
7 CompetitionState as CompetitionStateMsg,
8 BreakBeamStatus as BreakBeamStatusMsg,
9)
10
11from std_srvs.srv import Trigger
12
13
14class CompetitionInterface(Node):
15 '''
16 Class for a competition interface node.
17
18 Args:
19 Node (rclpy.node.Node): Parent class for ROS nodes
20
21 Raises:
22 KeyboardInterrupt: Exception raised when the user uses Ctrl+C to kill a process
23 '''
24 _competition_states = {
25 CompetitionStateMsg.IDLE: 'idle',
26 CompetitionStateMsg.READY: 'ready',
27 CompetitionStateMsg.STARTED: 'started',
28 CompetitionStateMsg.ORDER_ANNOUNCEMENTS_DONE: 'order_announcements_done',
29 CompetitionStateMsg.ENDED: 'ended',
30 }
31 '''Dictionary for converting CompetitionState constants to strings'''
32
33 def __init__(self):
34 super().__init__('competition_interface')
35
36 sim_time = Parameter(
37 "use_sim_time",
38 rclpy.Parameter.Type.BOOL,
39 True
40 )
41
42 self.set_parameters([sim_time])
43
44 # Service client for starting the competition
45 self._start_competition_client = self.create_client(Trigger, '/ariac/start_competition')
46
47 # Subscriber to the competition state topic
48 self._competition_state_sub = self.create_subscription(
49 CompetitionStateMsg,
50 '/ariac/competition_state',
51 self._competition_state_cb,
52 10)
53
54 # Store the state of the competition
55 self._competition_state: CompetitionStateMsg = None
56
57 # Subscriber to the break beam status topic
58 self._break_beam0_sub = self.create_subscription(
59 BreakBeamStatusMsg,
60 '/ariac/sensors/breakbeam_0/status',
61 self._breakbeam0_cb,
62 qos_profile_sensor_data)
63
64 # Store the number of parts that crossed the beam
65 self._conveyor_part_count = 0
66
67 # Store whether the beam is broken
68 self._object_detected = False
69
70 @property
71 def conveyor_part_count(self):
72 return self._conveyor_part_count
73
74 def _breakbeam0_cb(self, msg: BreakBeamStatusMsg):
75 '''Callback for the topic /ariac/sensors/breakbeam_0/status
76
77 Arguments:
78 msg -- BreakBeamStatusMsg message
79 '''
80 if not self._object_detected and msg.object_detected:
81 self._conveyor_part_count += 1
82
83 self._object_detected = msg.object_detected
84
85 def _competition_state_cb(self, msg: CompetitionStateMsg):
86 '''Callback for the topic /ariac/competition_state
87 Arguments:
88 msg -- CompetitionState message
89 '''
90 # Log if competition state has changed
91 if self._competition_state != msg.competition_state:
92 state = CompetitionInterface._competition_states[msg.competition_state]
93 self.get_logger().info(f'Competition state is: {state}', throttle_duration_sec=1.0)
94
95 self._competition_state = msg.competition_state
96
97 def start_competition(self):
98 '''Function to start the competition.
99 '''
100 self.get_logger().info('Waiting for competition to be ready')
101
102 if self._competition_state == CompetitionStateMsg.STARTED:
103 return
104 # Wait for competition to be ready
105 while self._competition_state != CompetitionStateMsg.READY:
106 try:
107 rclpy.spin_once(self)
108 except KeyboardInterrupt:
109 return
110
111 self.get_logger().info('Competition is ready. Starting...')
112
113 # Check if service is available
114 if not self._start_competition_client.wait_for_service(timeout_sec=3.0):
115 self.get_logger().error('Service \'/ariac/start_competition\' is not available.')
116 return
117
118 # Create trigger request and call starter service
119 request = Trigger.Request()
120 future = self._start_competition_client.call_async(request)
121
122 # Wait until the service call is completed
123 rclpy.spin_until_future_complete(self, future)
124
125 if future.result().success:
126 self.get_logger().info('Started competition.')
127 else:
128 self.get_logger().warn('Unable to start competition')
Code Explanation
Imports
from rclpy.qos import qos_profile_sensor_data
is the ROS 2 Quality of Service API. This is used to set the QoS profile for the floor robot gripper state subscriber.BreakBeamStatus
: Message class that stores the status of the break beam (ariac_msgs/msg/BreakBeamStatus).
Instance Variables
_break_beam0_sub
: Subscriber to the break beam status topic._conveyor_part_count
: Store the number of parts that crossed the beam._object_detected
: Store whether the beam is broken.
Instance Methods
conveyor_part_count(self)
is a getter to the_conveyor_part_count
attribute._breakbeam0_cb(self, msg)
is the callback function for the sensor topic. The callback increments the variable_conveyor_part_count
when the beam is broken and the variable_object_detected
is false. The variable_object_detected
is set to true when the beam is broken.
Executable
#!/usr/bin/env python3
'''
To test this script, run the following commands in separate terminals:
- ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competition_pkg:=ariac_tutorials
- ros2 run ariac_tutorials tutorial_2.py
'''
import rclpy
from ariac_tutorials.competition_interface import CompetitionInterface
def main(args=None):
rclpy.init(args=args)
interface = CompetitionInterface()
interface.start_competition()
while rclpy.ok():
try:
rclpy.spin_once(interface)
interface.get_logger().info(
f'Part Count: {interface.conveyor_part_count}',
throttle_duration_sec=2.0)
except KeyboardInterrupt:
break
interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code Explanation
This executable does the following:
Initialize the ROS client library.
Create an instance of the class
CompetitionInterface
as a ROS node.Start the competition.
Log the number of parts that crossed the beam every 2 seconds.
Build Instructions
To allow for the competition software to be able to find the sensor configuration, it must be installed to the share directory of the package. The Tutorial 2 executable also needs to be installed.
cmake_minimum_required(VERSION 3.8)
project(ariac_tutorials)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(ariac_msgs REQUIRED)
# Install the config directory to the package share directory
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}
)
# Install Python modules
ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_NAME})
# Install Python executables
install(PROGRAMS
scripts/tutorial_1.py
scripts/tutorial_2.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
Test the Sensor Configuration
To test the sensor was correctly added to the environment, run the following commands:
cd ~/ariac_ws
colcon build
. install/setup.bash
ros2 launch ariac_gazebo ariac.launch.py trial_name:=tutorial competitor_pkg:=ariac_tutorials
You should see a break beam sensor on the right side of the conveyor belt, as shown in the figure.

Run the Executable
In terminal 1, run the following commands:
cd ~/ariac_ws colcon build . install/setup.bash ros2 run ariac_tutorials tutorial_2.py
The node will wait until the competition is ready. In a second terminal, run the following:
In terminal 2, run the following commands:
cd ~/ariac_ws . install/setup.bash ros2 launch ariac_gazebo ariac.launch.py competitor_pkg:=ariac_tutorials trial_name:=tutorial
Once the environment is loaded and the competition state is ready, the interface node running in terminal 2 will start the competition and the sensor will start publishing data. In terminal 1 you should see the the part count output increasing as parts on the conveyor break the sensor beam, as shown in the figure below.