当前位置:   article > 正文

ROS2 Publisher-Subscriber源码解析_ros2 源码解析

ros2 源码解析
  1. #include <chrono>
  2. #include <memory>
  3. #include "rclcpp/rclcpp.hpp"
  4. #include "std_msgs/msg/string.hpp"
  5. using namespace std::chrono_literals;
  6. class MinimalPublisher : public rclcpp::Node
  7. {
  8. public:
  9. MinimalPublisher()
  10. : Node("minimal_publisher"), count_(0)
  11. {
  12. publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
  13. auto timer_callback =
  14. [this]() -> void {
  15. auto message = std_msgs::msg::String();
  16. message.data = "Hello, world! " + std::to_string(this->count_++);
  17. RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
  18. this->publisher_->publish(message);
  19. };
  20. timer_ = this->create_wall_timer(500ms, timer_callback);
  21. }
  22. private:
  23. rclcpp::TimerBase::SharedPtr timer_;
  24. rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  25. size_t count_;
  26. };
  27. int main(int argc, char * argv[])
  28. {
  29. rclcpp::init(argc, argv);
  30. rclcpp::spin(std::make_shared<MinimalPublisher>());
  31. rclcpp::shutdown();
  32. return 0;
  33. }

MinimalPublisher必须继承rclcpp::Node类,这样才能让executor去spin它。MinimalPublisher声明了一个rclcpp::Publisher<std_msgs::msg::String>::SharedPtr成员,在构造函数中调用了rclcpp::Node::create_publisher<>进行了初始化。create_publisher的定义看一下

  1. template <
  2. typename MessageT,
  3. typename AllocatorT = std::allocator<void>,
  4. typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
  5. std::shared_ptr<PublisherT>
  6. create_publisher(
  7. const std::string &topic_name,
  8. const rclcpp::QoS &qos,
  9. const PublisherOptionsWithAllocator<AllocatorT> &options =
  10. PublisherOptionsWithAllocator<AllocatorT>());
  1. emplate<typename MessageT, typename AllocatorT, typename PublisherT>
  2. std::shared_ptr<PublisherT>
  3. Node::create_publisher(
  4. const std::string & topic_name,
  5. const rclcpp::QoS & qos,
  6. const PublisherOptionsWithAllocator<AllocatorT> & options)
  7. {
  8. // first to check whitelist of topic name
  9. if (!rclcpp::node_interfaces::topology_validate_topic_name(topic_name)) {
  10. throw rclcpp::exceptions::InvalidTopicNameError(
  11. topic_name.c_str(),
  12. "not in the whitelist",
  13. 0);
  14. }
  15. return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(
  16. *this,
  17. extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
  18. qos,
  19. options);
  20. }
  1. template<
  2. typename MessageT,
  3. typename AllocatorT = std::allocator<void>,
  4. typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
  5. typename NodeT>
  6. std::shared_ptr<PublisherT>
  7. create_publisher(
  8. NodeT & node,
  9. const std::string & topic_name,
  10. const rclcpp::QoS & qos,
  11. const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
  12. rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
  13. )
  14. )
  15. {
  16. return detail::create_publisher<MessageT, AllocatorT, PublisherT>(
  17. node, node, topic_name, qos, options);
  18. }
  1. template<
  2. typename MessageT,
  3. typename AllocatorT = std::allocator<void>,
  4. typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
  5. typename NodeParametersT,
  6. typename NodeTopicsT>
  7. std::shared_ptr<PublisherT>
  8. create_publisher(
  9. NodeParametersT & node_parameters,
  10. NodeTopicsT & node_topics,
  11. const std::string & topic_name,
  12. const rclcpp::QoS & qos,
  13. const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
  14. rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
  15. )
  16. )
  17. {
  18. auto node_topics_interface = rclcpp::node_interfaces::get_node_topics_interface(node_topics);
  19. const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
  20. rclcpp::detail::declare_qos_parameters(
  21. options.qos_overriding_options, node_parameters,
  22. node_topics_interface->resolve_topic_name(topic_name),
  23. qos, rclcpp::detail::PublisherQosParametersTraits{}) :
  24. qos;
  25. // Create the publisher.
  26. auto pub = node_topics_interface->create_publisher(
  27. topic_name,
  28. rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
  29. actual_qos
  30. );
  31. // Add the publisher to the node topics interface.
  32. node_topics_interface->add_publisher(pub, options.callback_group);
  33. return std::dynamic_pointer_cast<PublisherT>(pub);
  34. }

create_publisher借助一个rclcpp::PublisherFactory中的函数成员来构造PublisherBase

node_topics.cpp

  1. rclcpp::PublisherBase::SharedPtr
  2. NodeTopics::create_publisher(
  3. const std::string & topic_name,
  4. const rclcpp::PublisherFactory & publisher_factory,
  5. const rclcpp::QoS & qos)
  6. {
  7. rclcpp::QoS & qos_adapter = const_cast<rclcpp::QoS &>(qos);
  8. qos_adapter.reliable();
  9. // Create the MessageT specific Publisher using the factory, but return it as PublisherBase.
  10. return publisher_factory.create_typed_publisher(node_base_, topic_name, qos_adapter);
  11. }

create_typed_publisher成员是一个lambda函数构造的函数对象,这个函数为:

  1. [options](
  2. rclcpp::node_interfaces::NodeBaseInterface * node_base,
  3. const std::string & topic_name,
  4. const rclcpp::QoS & qos
  5. ) -> std::shared_ptr<PublisherT>
  6. {
  7. auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
  8. // This is used for setting up things like intra process comms which
  9. // require this->shared_from_this() which cannot be called from
  10. // the constructor.
  11. publisher->post_init_setup(node_base, topic_name, qos, options);
  12. return publisher;
  13. }

node 初始化过程:

  1. ret = rcl_node_init(
  2. rcl_node.get(),
  3. node_name.c_str(), namespace_.c_str(),
  4. context_->get_rcl_context().get(), &rcl_node_options);
  1. node->impl->rmw_node_handle = rmw_create_node(
  2. &(node->context->impl->rmw_context),
  3. name, local_namespace_);
  1. rmw_node_t *
  2. rmw_api_connextdds_create_node(
  3. rmw_context_t * context,
  4. const char * name,
  5. const char * ns)
  1. rmw_ret_t
  2. rmw_context_impl_t::initialize_node(
  3. const char * const node_namespace,
  4. const char * const node_name,
  5. const bool localhost_only)
  1. rmw_ret_t
  2. rmw_context_impl_t::initialize_participant(
  3. const char * const node_namespace,
  4. const char * const node_name,
  5. const bool localhost_only)
  1. RMW_Connext_Node *
  2. RMW_Connext_Node::create(
  3. rmw_context_impl_t * const ctx)

并初始化和更新了graph_cache

  1. rmw_ret_t
  2. rmw_context_impl_t::enable_participant()
  3. {
  4. if (DDS_RETCODE_OK !=
  5. DDS_Entity_enable(
  6. DDS_DomainParticipant_as_entity(this->participant)))
  7. {
  8. RMW_CONNEXT_LOG_ERROR_SET("failed to enable participant")
  9. return RMW_RET_ERROR;
  10. }
  11. if (DDS_RETCODE_OK !=
  12. DDS_Entity_enable(DDS_Subscriber_as_entity(this->dds_sub)))
  13. {
  14. RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
  15. return RMW_RET_ERROR;
  16. }
  17. if (DDS_RETCODE_OK !=
  18. DDS_Entity_enable(DDS_Publisher_as_entity(this->dds_pub)))
  19. {
  20. RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
  21. return RMW_RET_ERROR;
  22. }
  23. if (RMW_RET_OK != rmw_connextdds_graph_enable(this)) {
  24. RMW_CONNEXT_LOG_ERROR("failed to enable graph cache")
  25. return RMW_RET_ERROR;
  26. }
  27. return RMW_RET_OK;
  28. }

并在此时启动了discover线程

  1. rmw_ret_t
  2. rmw_connextdds_graph_enable(rmw_context_impl_t * const ctx)
  3. {
  4. auto pub = reinterpret_cast<RMW_Connext_Publisher *>(ctx->common.pub->data);
  5. if (RMW_RET_OK != pub->enable()) {
  6. return RMW_RET_ERROR;
  7. }
  8. auto sub = reinterpret_cast<RMW_Connext_Subscriber *>(ctx->common.sub->data);
  9. if (RMW_RET_OK != sub->enable()) {
  10. return RMW_RET_ERROR;
  11. }
  12. if (RMW_RET_OK != rmw_connextdds_enable_builtin_readers(ctx)) {
  13. return RMW_RET_ERROR;
  14. }
  15. rmw_ret_t ret = rmw_connextdds_discovery_thread_start(ctx);
  16. if (RMW_RET_OK != ret) {
  17. RMW_CONNEXT_LOG_ERROR("failed to start discovery thread")
  18. return ret;
  19. }
  20. return RMW_RET_OK;
  21. }
  1. rmw_ret_t
  2. rmw_connextdds_discovery_thread_start(rmw_context_impl_t * ctx)
  3. {
  4. rmw_dds_common::Context * const common_ctx = &ctx->common;
  5. RMW_CONNEXT_LOG_DEBUG("starting discovery thread...")
  6. common_ctx->listener_thread_gc =
  7. rmw_connextdds_create_guard_condition(true /* internal */);
  8. if (nullptr == common_ctx->listener_thread_gc) {
  9. RMW_CONNEXT_LOG_ERROR(
  10. "failed to create discovery thread condition")
  11. return RMW_RET_ERROR;
  12. }
  13. common_ctx->thread_is_running.store(true);
  14. try {
  15. common_ctx->listener_thread =
  16. std::thread(rmw_connextdds_discovery_thread, ctx);
  17. const char* THREAD_NAME = "ROSdiscovery";
  18. //pthread_setname_np(common_ctx->listener_thread.native_handle(), THREAD_NAME);
  19. ros::common::utils::SetThreadName(common_ctx->listener_thread.native_handle(), THREAD_NAME);
  20. RMW_CONNEXT_LOG_DEBUG("discovery thread started")
  21. return RMW_RET_OK;
  22. } catch (const std::exception & exc) {
  23. RMW_CONNEXT_LOG_ERROR_A_SET("Failed to create std::thread: %s", exc.what())
  24. } catch (...) {
  25. RMW_CONNEXT_LOG_ERROR_SET("Failed to create std::thread")
  26. }
  27. /* We'll get here only on error, so clean up things accordingly */
  28. common_ctx->thread_is_running.store(false);
  29. if (RMW_RET_OK !=
  30. rmw_connextdds_destroy_guard_condition(
  31. common_ctx->listener_thread_gc))
  32. {
  33. RMW_CONNEXT_LOG_ERROR(
  34. "Failed to destroy discovery thread guard condition")
  35. }
  36. return RMW_RET_ERROR;
  37. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/727224
推荐阅读
  

闽ICP备14008679号