赞
踩
- #include <chrono>
- #include <memory>
- #include "rclcpp/rclcpp.hpp"
- #include "std_msgs/msg/string.hpp"
-
- using namespace std::chrono_literals;
- class MinimalPublisher : public rclcpp::Node
- {
- public:
- MinimalPublisher()
- : Node("minimal_publisher"), count_(0)
- {
- publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
- auto timer_callback =
- [this]() -> void {
- auto message = std_msgs::msg::String();
- message.data = "Hello, world! " + std::to_string(this->count_++);
- RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
- this->publisher_->publish(message);
- };
- timer_ = this->create_wall_timer(500ms, timer_callback);
- }
-
- private:
- rclcpp::TimerBase::SharedPtr timer_;
- rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
- size_t count_;
- };
-
- int main(int argc, char * argv[])
- {
- rclcpp::init(argc, argv);
- rclcpp::spin(std::make_shared<MinimalPublisher>());
- rclcpp::shutdown();
- return 0;
- }

MinimalPublisher必须继承rclcpp::Node类,这样才能让executor去spin它。MinimalPublisher声明了一个rclcpp::Publisher<std_msgs::msg::String>::SharedPtr成员,在构造函数中调用了rclcpp::Node::create_publisher<>进行了初始化。create_publisher的定义看一下
- template <
- typename MessageT,
- typename AllocatorT = std::allocator<void>,
- typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
- std::shared_ptr<PublisherT>
- create_publisher(
- const std::string &topic_name,
- const rclcpp::QoS &qos,
- const PublisherOptionsWithAllocator<AllocatorT> &options =
- PublisherOptionsWithAllocator<AllocatorT>());
- emplate<typename MessageT, typename AllocatorT, typename PublisherT>
- std::shared_ptr<PublisherT>
- Node::create_publisher(
- const std::string & topic_name,
- const rclcpp::QoS & qos,
- const PublisherOptionsWithAllocator<AllocatorT> & options)
- {
- // first to check whitelist of topic name
- if (!rclcpp::node_interfaces::topology_validate_topic_name(topic_name)) {
- throw rclcpp::exceptions::InvalidTopicNameError(
- topic_name.c_str(),
- "not in the whitelist",
- 0);
- }
- return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(
- *this,
- extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
- qos,
- options);
- }

- template<
- typename MessageT,
- typename AllocatorT = std::allocator<void>,
- typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
- typename NodeT>
- std::shared_ptr<PublisherT>
- create_publisher(
- NodeT & node,
- const std::string & topic_name,
- const rclcpp::QoS & qos,
- const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
- rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
- )
- )
- {
- return detail::create_publisher<MessageT, AllocatorT, PublisherT>(
- node, node, topic_name, qos, options);
- }

- template<
- typename MessageT,
- typename AllocatorT = std::allocator<void>,
- typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
- typename NodeParametersT,
- typename NodeTopicsT>
- std::shared_ptr<PublisherT>
- create_publisher(
- NodeParametersT & node_parameters,
- NodeTopicsT & node_topics,
- const std::string & topic_name,
- const rclcpp::QoS & qos,
- const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
- rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
- )
- )
- {
- auto node_topics_interface = rclcpp::node_interfaces::get_node_topics_interface(node_topics);
- const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
- rclcpp::detail::declare_qos_parameters(
- options.qos_overriding_options, node_parameters,
- node_topics_interface->resolve_topic_name(topic_name),
- qos, rclcpp::detail::PublisherQosParametersTraits{}) :
- qos;
-
- // Create the publisher.
- auto pub = node_topics_interface->create_publisher(
- topic_name,
- rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
- actual_qos
- );
-
- // Add the publisher to the node topics interface.
- node_topics_interface->add_publisher(pub, options.callback_group);
-
- return std::dynamic_pointer_cast<PublisherT>(pub);
- }

create_publisher借助一个rclcpp::PublisherFactory中的函数成员来构造PublisherBase
node_topics.cpp
- rclcpp::PublisherBase::SharedPtr
- NodeTopics::create_publisher(
- const std::string & topic_name,
- const rclcpp::PublisherFactory & publisher_factory,
- const rclcpp::QoS & qos)
- {
- rclcpp::QoS & qos_adapter = const_cast<rclcpp::QoS &>(qos);
- qos_adapter.reliable();
-
- // Create the MessageT specific Publisher using the factory, but return it as PublisherBase.
- return publisher_factory.create_typed_publisher(node_base_, topic_name, qos_adapter);
- }
create_typed_publisher成员是一个lambda函数构造的函数对象,这个函数为:
- [options](
- rclcpp::node_interfaces::NodeBaseInterface * node_base,
- const std::string & topic_name,
- const rclcpp::QoS & qos
- ) -> std::shared_ptr<PublisherT>
- {
- auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
- // This is used for setting up things like intra process comms which
- // require this->shared_from_this() which cannot be called from
- // the constructor.
- publisher->post_init_setup(node_base, topic_name, qos, options);
- return publisher;
- }
node 初始化过程:
- ret = rcl_node_init(
- rcl_node.get(),
- node_name.c_str(), namespace_.c_str(),
- context_->get_rcl_context().get(), &rcl_node_options);
- node->impl->rmw_node_handle = rmw_create_node(
- &(node->context->impl->rmw_context),
- name, local_namespace_);
- rmw_node_t *
- rmw_api_connextdds_create_node(
- rmw_context_t * context,
- const char * name,
- const char * ns)
- rmw_ret_t
- rmw_context_impl_t::initialize_node(
- const char * const node_namespace,
- const char * const node_name,
- const bool localhost_only)
- rmw_ret_t
- rmw_context_impl_t::initialize_participant(
- const char * const node_namespace,
- const char * const node_name,
- const bool localhost_only)
- RMW_Connext_Node *
- RMW_Connext_Node::create(
- rmw_context_impl_t * const ctx)
并初始化和更新了graph_cache
- rmw_ret_t
- rmw_context_impl_t::enable_participant()
- {
- if (DDS_RETCODE_OK !=
- DDS_Entity_enable(
- DDS_DomainParticipant_as_entity(this->participant)))
- {
- RMW_CONNEXT_LOG_ERROR_SET("failed to enable participant")
- return RMW_RET_ERROR;
- }
-
- if (DDS_RETCODE_OK !=
- DDS_Entity_enable(DDS_Subscriber_as_entity(this->dds_sub)))
- {
- RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
- return RMW_RET_ERROR;
- }
-
- if (DDS_RETCODE_OK !=
- DDS_Entity_enable(DDS_Publisher_as_entity(this->dds_pub)))
- {
- RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
- return RMW_RET_ERROR;
- }
-
- if (RMW_RET_OK != rmw_connextdds_graph_enable(this)) {
- RMW_CONNEXT_LOG_ERROR("failed to enable graph cache")
- return RMW_RET_ERROR;
- }
-
- return RMW_RET_OK;
- }

并在此时启动了discover线程
-
- rmw_ret_t
- rmw_connextdds_graph_enable(rmw_context_impl_t * const ctx)
- {
- auto pub = reinterpret_cast<RMW_Connext_Publisher *>(ctx->common.pub->data);
- if (RMW_RET_OK != pub->enable()) {
- return RMW_RET_ERROR;
- }
-
- auto sub = reinterpret_cast<RMW_Connext_Subscriber *>(ctx->common.sub->data);
- if (RMW_RET_OK != sub->enable()) {
- return RMW_RET_ERROR;
- }
-
- if (RMW_RET_OK != rmw_connextdds_enable_builtin_readers(ctx)) {
- return RMW_RET_ERROR;
- }
-
- rmw_ret_t ret = rmw_connextdds_discovery_thread_start(ctx);
- if (RMW_RET_OK != ret) {
- RMW_CONNEXT_LOG_ERROR("failed to start discovery thread")
- return ret;
- }
-
- return RMW_RET_OK;
- }

-
- rmw_ret_t
- rmw_connextdds_discovery_thread_start(rmw_context_impl_t * ctx)
- {
- rmw_dds_common::Context * const common_ctx = &ctx->common;
-
- RMW_CONNEXT_LOG_DEBUG("starting discovery thread...")
-
- common_ctx->listener_thread_gc =
- rmw_connextdds_create_guard_condition(true /* internal */);
- if (nullptr == common_ctx->listener_thread_gc) {
- RMW_CONNEXT_LOG_ERROR(
- "failed to create discovery thread condition")
- return RMW_RET_ERROR;
- }
-
- common_ctx->thread_is_running.store(true);
-
- try {
- common_ctx->listener_thread =
- std::thread(rmw_connextdds_discovery_thread, ctx);
- const char* THREAD_NAME = "ROSdiscovery";
- //pthread_setname_np(common_ctx->listener_thread.native_handle(), THREAD_NAME);
- ros::common::utils::SetThreadName(common_ctx->listener_thread.native_handle(), THREAD_NAME);
-
- RMW_CONNEXT_LOG_DEBUG("discovery thread started")
-
- return RMW_RET_OK;
- } catch (const std::exception & exc) {
- RMW_CONNEXT_LOG_ERROR_A_SET("Failed to create std::thread: %s", exc.what())
- } catch (...) {
- RMW_CONNEXT_LOG_ERROR_SET("Failed to create std::thread")
- }
-
- /* We'll get here only on error, so clean up things accordingly */
-
- common_ctx->thread_is_running.store(false);
- if (RMW_RET_OK !=
- rmw_connextdds_destroy_guard_condition(
- common_ctx->listener_thread_gc))
- {
- RMW_CONNEXT_LOG_ERROR(
- "Failed to destroy discovery thread guard condition")
- }
-
- return RMW_RET_ERROR;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。