ROS2接口扩展 目标:了解更多方法来在ROS 2中实现自定义接口
虽然最好来说定义接口是在一个新的package中,比如上一节中的tutorial_interface中,但是有时也会把接口的定义和使用都再一个包里面
如何使用一个已存在的interface 在类中使用参数 Goal: Create and run a class with ROS parameters using Python (rclpy).
在制作自己的节点时,您有时需要添加可以从启动文件设置的参数。
本教程将向您展示如何在Python类中创建这些参数,以及如何将它们设置为在启动文件中。
1 ros2 pkg create --build-type ament_python python_parameters --dependencies rclpy
更新update.xml以及setup.py 1 2 3 <description>Python parameter tutorial</description> <maintainer email="you@email.com">Your Name</maintainer> <license>Apache License 2.0</license>
python代码 再dev_ws/src/python_parameters/python_parameters
目录下,新建python_parameters_node.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import rclpy import rclpy.node from rclpy.exceptions import ParameterNotDeclaredException from rcl_interfaces.msg import ParameterType class MinimalParam(rclpy.node.Node): def __init__(self): super().__init__('minimal_param_node') timer_period = 2 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.declare_parameter('my_parameter', 'world') def timer_callback(self): my_param = self.get_parameter('my_parameter').get_parameter_value().string_value self.get_logger().info('Hello %s!' % my_param) my_new_param = rclpy.parameter.Parameter( 'my_parameter', rclpy.Parameter.Type.STRING, 'world' ) all_new_parameters = [my_new_param] self.set_parameters(all_new_parameters) def main(): rclpy.init() node = MinimalParam() rclpy.spin(node) if __name__ == '__main__': main()
代码审查 注意:在获取或设置之前必须声明参数,或者将提出parameternotdeclaredException
异常。
1 2 3 4 import rclpy import rclpy.node from rclpy.exceptions import ParameterNotDeclaredException from rcl_interfaces.msg import ParameterType
下一段代码创建类和构造函数。timer被初始化(timer_period设置为2秒),这将导致timer_callback函数每两秒执行一次。构造函数的self.declare_parameter(‘my_parameter’, ‘world’)一行创建了一个名为my_parameter的形参,默认值为world。
1 2 3 4 5 6 7 class MinimalParam(rclpy.node.Node): def __init__(self): super().__init__('minimal_param_node') timer_period = 2 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.declare_parameter('my_parameter', 'world')
timer_callback函数的第一行从节点获取参数my_parameter,并将其存储在my_param中。接下来,get_logger函数确保打印了消息。然后,我们将参数’ my_parameter ‘设置回默认的字符串值’ world ‘。
1 2 3 4 5 6 7 8 9 10 11 12 def timer_callback(self): my_param = self.get_parameter('my_parameter').get_parameter_value().string_value self.get_logger().info('Hello %s!' % my_param) my_new_param = rclpy.parameter.Parameter( 'my_parameter', rclpy.Parameter.Type.STRING, 'world' ) all_new_parameters = [my_new_param] self.set_parameters(all_new_parameters)
timer_callback之后是初始化ROS 2的主函数。然后定义名为node的MinimalParam类的实例。最后,rclpy.spin开始处理来自节点的数据。
1 2 3 4 5 6 7 def main(): rclpy.init() node = MinimalParam() rclpy.spin(node) if __name__ == '__main__': main()
添加对参数的描述parameterDescriptor. 还可以为参数设置描述符。描述符允许您指定参数的类型和一些描述文本。为了让它工作,__init__
代码必须更改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 class MinimalParam(rclpy.node.Node): def __init__(self): super().__init__('minimal_param_node') timer_period = 2 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) from rcl_interfaces.msg import ParameterDescriptor my_parameter_descriptor = ParameterDescriptor(type=ParameterType.PARAMETER_STRING, description='This parameter is mine!') self.declare_parameter('my_parameter', 'default value for my_parameter', my_parameter_descriptor)
其余的代码保持不变。运行节点之后,可以运行ros2 param description /minimal_param_node my_parameter
查看类型和描述。
Add an entry point 1 2 3 4 5 entry_points={ 'console_scripts': [ 'param_talker = python_parameters.python_parameters_node:main', ], },
build and run 1 2 3 rosdep install -i --from-path src --rosdistro dashing -y cd ~/dev_ws colcon build
新打开一个终端,两个source之后
1 ros2 run python_parameters param_talker
会出现
1 2 3 4 [INFO] [minimal_param_node]: Hello world! [INFO] [minimal_param_node]: Hello world1! [INFO] [minimal_param_node]: Hello world1! [INFO] [minimal_param_node]: Hello world1!
两种方式去改变这个参数
通过命令行 我们可以通过命令行还修改参数的设置
首先查看现在有什么参数
1 2 3 4 ros2 param list /minimal_param_node: my_parameter use_sim_time
接下来使用命令行还修改参数
1 ros2 param set /minimal_param_node my_parameter earth
现在去查看另外一个终端 就会出现hello earth
通过launch文件 您还可以在启动文件中设置参数,但首先需要添加一个启动目录。在dev_ws/src/python_parameters/目录中,创建一个名为launch的新目录。在这里,创建一个名为python_parameters_launch.py的新文件
emulate_tty
, which prints output to the console, is not available in Dashing.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description(): return LaunchDescription([ Node( package='python_parameters', node_executable='param_talker', node_name='custom_parameter_node', output='screen', parameters=[ {'my_parameter': 'earth'} ] ) ])
在这里,您可以看到我们在启动Node Parameter_Node时将My_Parameter设置为earth。
现在打开setup.py文件。将导入语句添加到文件的顶部,并将另一条新语句添加到data_files参数中,以包含所有启动文件:
1 ros2 launch python_parameters python_parameters_launch.py
Summary 您创建了一个带有自定义参数的节点,该参数可以从启动文件或命令行设置。您编写了一个参数对话器的代码:一个Python节点声明,然后循环获取和设置一个字符串参数。您添加了入口点,以便可以构建和运行它,并使用ros2 param与参数对话器交互。
creating an action 1 2 cd dev_ws/src ros2 pkg create action_tutorials_interfaces
定义一个action action在.action文件中定义:
1 2 3 4 5 # Request --- # Result --- # Feedback
操作定义由三个用—-分隔的消息定义组成。
Request消息从操作客户端发送到启动新goal的操作服务器。
Result 从action服务端出到action客户端,当goal已经完成。
feedback 从动作服务器周期性地发送到具有关于目标的更新的动作客户端。
action的一个实例通常被称为goal。
假设我们想定义一个新的动作“Fibonacci”来计算Fibonacci序列。
1 2 3 4 5 6 7 8 cd action_tutorials_interfaces mkdir action vi Fibonacci.action int32 order --- int32[] sequence --- int32[] partial_sequence
目标请求是我们想要计算的斐波纳契序列的order,结果是最终sequence,反馈是到目前为止计算的Partial_sequence。
building an action 在代码中使用新的Fibonacci动作类型之前,必须将定义传递给rosidl code generation pipeline.
这是通过在action_tutorials_interfaces的ament_package()行之前添加以下几行CMakeLists.txt来完成的:
1 2 3 4 5 find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "action/Fibonacci.action" )
我们还应该将所需的依赖项添加到package.xml中:
注意,我们需要依赖于action_msgs,因为action操作定义包括其他元数据(例如目标ID)。
打开package.xml
1 2 3 4 5 <buildtool_depend>rosidl_default_generators</buildtool_depend> <depend>action_msgs</depend> <member_of_group>rosidl_interface_packages</member_of_group>
然后进行
1 ros2 action show action_tutorials_interfaces/action/Fibonacci
写一个action的客户端和服务端 action是ROS 2中异步通信的一种形式。action客户端向action服务器发送goal请求。action服务器向action客户端发送目标feedback和result。
写一个action的服务端 下面写一个服务端用之前定义好的action。为了教程的简单,我们会把服务端单独做成一个文件
打开home目录,建立文件fibonacci_action_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import rclpy from rclpy.action import ActionServer from rclpy.node import Node from action_tutorials_interfaces.action import Fibonacci class FibonacciActionServer(Node): def __init__(self): super().__init__('fibonacci_action_server') self._action_server = ActionServer( self, Fibonacci, 'fibonacci', self.execute_callback) def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') result = Fibonacci.Result() return result def main(args=None): rclpy.init(args=args) fibonacci_action_server = FibonacciActionServer() rclpy.spin(fibonacci_action_server) if __name__ == '__main__': main()
第8行定义了一个类FibonacciActionServer,它是Node的子类。通过调用节点的构造函数来初始化该类,将我们的节点命名为fibonacci_action_server:
1 super().__init__('fibonacci_action_server')
在构造函数中,我们还实例化了一个新的action server:
1 2 3 4 5 self._action_server = ActionServer( self, Fibonacci, 'fibonacci', self.execute_callback)
action server需要四个参数:
ROS 2节点将动作客户端添加到:self。
动作的类型:Fibonacci(以第5行导入)。action_tutorials_interfaces
动作名称:’fibonacci’。
用于执行接受的目标的回调函数:self.execute_callback。此回调必须返回操作类型的结果消息。
1 2 3 4 5 def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...')action_tutorials_interfaces result = Fibonacci.Result() return result python3 fibonacci_action_server.py
在另一个终端中,我们可以使用命令行界面来发送目标:
1 ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
将看见
1 2 3 4 5 6 7 8 9 10 Waiting for an action server to become available... Sending goal: order: 5 Goal accepted with ID: 0a7d8ac416a14368b467604786edb81a Result: sequence: [] Goal finished with status: ABORTED
再服务端,我们有
1 2 [INFO] [fibonacci_action_server]: Executing goal... [WARN] [fibonacci_action_server]: Goal state not set, assuming aborted. Goal ID: [ 10 125 138 196 22 161 67 104 180 103 96 71 134 237 184 26]
在运行操作服务器的终端中,您应该看到一条日志消息“Executing goal…”,后面是一个目标状态未设置的警告。默认情况下,如果在执行回调中没有设置目标句柄状态,它将假定为aborted状态。
我们可以使用目标句柄上的succeed()方法来指示目标成功:
1 2 3 4 5 def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') goal_handle.succeed() result = Fibonacci.Result() return result
现在,如果重新启动操作服务器并发送另一个goal,您应该会看到goal已经完成,状态为SUCCEEDED。
1 2 3 4 5 6 7 8 9 10 Waiting for an action server to become available... Sending goal: order: 5 Goal accepted with ID: 7ca482d0b76546cdbaaa4bf03647d8fa Result: sequence: [] Goal finished with status: SUCCEEDED
现在,让我们的目标执行实际计算并返回所请求的斐波那契序列:
1 2 3 4 5 6 7 8 9 10 11 12 13 def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') sequence = [0, 1] for i in range(1, goal_handle.request.order): sequence.append(sequence[i] + sequence[i-1]) goal_handle.succeed() result = Fibonacci.Result() result.sequence = sequence return result
在计算序列之后,我们在返回之前将其分配给result消息字段。
.再次重启操作服务器并发送另一个目标。您应该看到目标以正确的结果序列结束。
1 2 3 4 5 6 7 8 9 10 Waiting for an action server to become available... Sending goal: order: 5 Goal accepted with ID: 06a9917c3e664f6d8e39f51bc5f31802 Result: sequence: [0, 1, 1, 2, 3, 5] Goal finished with status: SUCCEEDED
Publishing feedback 关于action的一个很好的事情是在目标执行期间提供对动作客户端的反馈的能力。我们可以通过调用目标句柄的publish_feedback()方法使我们的action服务器发布action客户端的反馈。
我们将替换sequence变量,并使用feedback消息来存储序列。在for循环中每次更新反馈信息后,我们都会发布feedback信息并休眠以获得戏剧性效果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import time import rclpy from rclpy.action import ActionServer from rclpy.node import Node from action_tutorials_interfaces.action import Fibonacci class FibonacciActionServer(Node): def __init__(self): super().__init__('fibonacci_action_server') self._action_server = ActionServer( self, Fibonacci, 'fibonacci', self.execute_callback) def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') feedback_msg = Fibonacci.Feedback() feedback_msg.partial_sequence = [0, 1] for i in range(1, goal_handle.request.order): feedback_msg.partial_sequence.append( feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1]) self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence)) goal_handle.publish_feedback(feedback_msg) time.sleep(1) goal_handle.succeed() result = Fibonacci.Result() result.sequence = feedback_msg.partial_sequence return result
在重新启动action服务器后,我们可以通过使用命令行工具和—feedback选项确认反馈现在已经发布了:
写一个action客户端 也是单个文件,fibonacci_action_client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import rclpy from rclpy.action import ActionClient from rclpy.node import Node from action_tutorials_interfaces.action import Fibonacci class FibonacciActionClient(Node): def __init__(self): super().__init__('fibonacci_action_client') self._action_client = ActionClient(self, Fibonacci, 'fibonacci') def send_goal(self, order): goal_msg = Fibonacci.Goal() goal_msg.order = order self._action_client.wait_for_server() return self._action_client.send_goal_async(goal_msg) def main(args=None): rclpy.init(args=args) action_client = FibonacciActionClient() future = action_client.send_goal(10) rclpy.spin_until_future_complete(action_client, future) if __name__ == '__main__': main()
我们还在FibonAcciaInctionClient类中定义了一个方法send_goal:此方法等待可用的action服务器,然后向服务器发送目标。它返回一个future,这个future是我们未来可以获取结果的一个对象
获得结果 我们可以发送一个目标,但是我们如何知道它何时完成呢?我们可以通过几个步骤得到结果信息。首先,我们需要为发送的目标获取一个目标句柄。然后,我们可以使用目标句柄来请求结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import rclpy from rclpy.action import ActionClient from rclpy.node import Node from action_tutorials_interfaces.action import Fibonacci class FibonacciActionClient(Node): def __init__(self): super().__init__('fibonacci_action_client') self._action_client = ActionClient(self, Fibonacci, 'fibonacci') def send_goal(self, order): goal_msg = Fibonacci.Goal() goal_msg.order = order self._action_client.wait_for_server() self._send_goal_future = self._action_client.send_goal_async(goal_msg) self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)') self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.sequence)) rclpy.shutdown() def main(args=None): rclpy.init(args=args) action_client = FibonacciActionClient() action_client.send_goal(10) rclpy.spin(action_client) if __name__ == '__main__': main()
ActionClient.send_goal_async()方法为目标句柄返回一个future。首先,当future完成时注册一个回调函数:
1 self._send_goal_future.add_done_callback(self.goal_response_callback)
注意,当操作服务器接受或拒绝goal请求时,future就完成了。让我们更详细地看看goal_response_callback。
我们可以检查目标是否被拒绝,并提前返回,因为我们知道不会有结果:
1 2 3 4 5 6 7 def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)')
现在我们已经有了一个目标句柄,我们可以使用它来通过get_result_async()方法请求结果。
与发送目标类似,我们将得到一个future,当结果准备好时,它将完成。让我们注册一个回调,就像我们为目标响应所做的那样:
1 2 self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback)
在回调中,我们记录了结果序列并关闭了ROS 2以获得干净的退出:
1 2 3 4 def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.sequence)) rclpy.shutdown()
再一次尝试运行服务端和客户端的代码
客户端
1 2 [INFO] [fibonacci_action_client]: Goal accepted :) [INFO] [fibonacci_action_client]: Result: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
服务端
1 2 3 4 5 6 7 8 9 10 [INFO] [fibonacci_action_server]: Executing goal... [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
获得feedback 我们的action客户端可以发送目标。nice!但如果我们可以获得关于我们从Action Server发送的目标的一些反馈,那就太好了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import rclpy from rclpy.action import ActionClient from rclpy.node import Node from action_tutorials_interfaces.action import Fibonacci class FibonacciActionClient(Node): def __init__(self): super().__init__('fibonacci_action_client') self._action_client = ActionClient(self, Fibonacci, 'fibonacci') def send_goal(self, order): goal_msg = Fibonacci.Goal() goal_msg.order = order self._action_client.wait_for_server() self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback) self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)') self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.sequence)) rclpy.shutdown() def feedback_callback(self, feedback_msg): feedback = feedback_msg.feedback self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence)) def main(args=None): rclpy.init(args=args) action_client = FibonacciActionClient() action_client.send_goal(10) rclpy.spin(action_client) if __name__ == '__main__': main()
以下是反馈信息的回调函数:
1 2 3 def feedback_callback(self, feedback_msg): feedback = feedback_msg.feedback self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
在回调中,我们获得消息的反馈部分,并将partial_sequence字段打印到屏幕上。
我们需要使用Action Client注册回调。当我们发送目标时,这将通过此外将回调传递给行动客户端来实现:
1 self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
运行客户端
1 2 3 4 5 6 7 8 9 10 11 [INFO] [fibonacci_action_client]: Goal accepted :) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]) [INFO] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]) [INFO] [fibonacci_action_client]: Result: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
服务端
1 2 3 4 5 6 7 8 9 10 [INFO] [fibonacci_action_server]: Executing goal... [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
使用Launch启动/监视多个节点 ros2 启动系统 ROS 2中的启动系统负责帮助用户描述他们的系统配置,然后按照描述执行。系统的配置包括运行什么程序,在哪里运行它们,传递什么参数,以及ROS特定的约定,这些约定通过为组件提供不同的配置,使组件在整个系统中易于重用,它还负责监视已启动流程的状态,并报告和/或对这些流程状态的变化作出反应。
用Python编写的启动文件可以启动和停止不同的节点,也可以触发和处理各种事件。提供这个框架的包是launch_ros,它使用底层的非特定于ros的启动框架。
写一个启动文件 当我们进行了
1 ros2 pkg create --build-type = ament_python my_package --dependency [deps]
应该就会有如下文件结构, 再建一个了launch文件夹
1 2 3 4 5 6 src/ my_package/ launch/ setup.py setup.cfg package.xml
为了让colcon找到启动文件,我们需要使用setup的data_files参数通知Python的安装工具我们的启动文件。
在setup.py里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import os from glob import glob from setuptools import setup package_name = 'my_package' setup( # Other parameters ... data_files=[ # ... Other data files # Include all launch files. This is the most important line here! (os.path.join('share', package_name), glob('launch/*.launch.py')) ] )
写一个launch文件 在启动目录中,使用.launch.py后缀创建一个新的启动文件。例如my_script.launch.py。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import launch import launch.actions import launch.substitutions import launch_ros.actions def generate_launch_description(): return launch.LaunchDescription([ launch.actions.DeclareLaunchArgument( 'node_prefix', default_value=[launch.substitutions.EnvironmentVariable('USER'), '_'], description='Prefix for node names'), launch_ros.actions.Node( package='demo_nodes_cpp', node_executable='talker', output='screen', node_name=[launch.substitutions.LaunchConfiguration('node_prefix'), 'talker']), ])
.launch.py这个后缀其实并没有要求,_lannch.py
也是可以的。
launch文件应该定义一个generate_launch_description,并且返回一个LaunchDescription。这样ros2 launch才可以找到
使用 虽然可以将启动文件编写为独立的脚本,但ROS中典型的使用方式是由ROS 2工具调用启动文件。
再colcon build之后,应该可以调用
1 ros2 launch my_package script.launch.py