赞
踩
侦听器是线程安全的,可以不受限制地使用。但是要注意,所有提供的回调都是在侦听器的后台线程中同时执行的。如果你访问这个回调中的结构,你必须确保你是唯一一个访问它的人,或者用“std::mutex”之类的保护来访问它。
有关术语介绍,请阅读
[WaitSet C++example](…/WaitSet)。
Listener是一个完全线程安全的构造,它通过在后台线程中执行已注册的回调来对事件作出反应。事件可以像订阅者或用户触发器一样由_EventOrigins_发出。某些_EventOrigins_(如订阅者)因此可以发出多个事件类型。
侦听器的接口由两个方法组成:用回调附加新事件的“attachEvent”和“detachEvent”。这两个方法可以同时调用,甚至可以从事件触发的回调内部调用!
!!!注意
请注意_Listener_的线程安全限制,并仔细阅读[线程安全](#线程安全)一章。
比方说,我们有一个应用程序,它为我们提供了两种不同的服务:“Radar.FrontLeft.Counter”和“Rader.FrontRight.Ccounter”。每次我们收到来自左和右的样本时,我们都想用最新的值计算总和并打印出来。如果我们只收到其中一个样品,我们会将其保存到收到另一面为止。
此示例的发布者不包含任何新功能,但如果您有一些问题,请查看[icedelivery示例](…/icedelivery)。
subscriber主函数照常启动,在注册运行时之后,我们创建了启动后台线程的侦听器。
iox::popo::Listener listener;
因为它很有趣,我们还创建了一个每4秒触发一次的心跳触发器,这样“收到的心跳”就可以打印到控制台上。此外,我们必须创建两个订阅者来接收这两项服务的样本。
iox::popo::UserTrigger heartbeat;
iox::popo::Subscriber<CounterTopic> subscriberLeft({"Radar", "FrontLeft", "Counter"});
iox::popo::Subscriber<CounterTopic> subscriberRight({"Radar", "FrontRight", "Counter"});
接下来是一个“heartbeatThread”,它将每4秒触发我们的心跳触发。
std::thread heartbeatThread([&] {
while (!iox::posix::hasTerminationRequested())
{
heartbeat.trigger();
std::this_thread::sleep_for(std::chrono::seconds(4));
}
});
现在一切都设置好了,我们可以将订阅者附加到侦听器,这样每当收到新的示例(iox::popo::SubscriberEvent::DATA_RECEIVED
)时,我们的回调(onSampleReceivedCallback
)就会被调用。我们还附加了“heartbeat”用户触发器,通过另一个回调(“heartheartCallback”)将心跳消息打印到控制台。
listener.attachEvent(heartbeat, iox::popo::createNotificationCallback(heartbeatCallback)).or_else([](auto) { std::cerr << "unable to attach heartbeat event" << std::endl; std::exit(EXIT_FAILURE); }); // It is possible to attach any c function here with a signature of void(iox::popo::Subscriber<CounterTopic> *). // But please be aware that the listener does not take ownership of the callback, therefore it has to exist as // long as the event is attached. Furthermore, it excludes lambdas which are capturing data since they are not // convertable to a c function pointer. // to simplify the example we attach the same callback onSampleReceivedCallback again listener .attachEvent(subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED, iox::popo::createNotificationCallback(onSampleReceivedCallback)) .or_else([](auto) { std::cerr << "unable to attach subscriberLeft" << std::endl; std::exit(EXIT_FAILURE); }); listener .attachEvent(subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED, iox::popo::createNotificationCallback(onSampleReceivedCallback)) .or_else([](auto) { std::cerr << "unable to attach subscriberRight" << std::endl; std::exit(EXIT_FAILURE); });
由于用户触发器只有一个事件,所以当我们将其附加到侦听器时,不必指定事件attachEvent’返回一个‘expected’,以通知我们附件是否成功。如果不是这种情况,则在每次
attachEvent调用后的
.or_else([](auto){`部分中执行错误处理。
在本例中,我们选择两次附加同一个回调以使事情变得更容易,但您可以自由附加任何具有签名“void(iox::popo::Subscriber<CounterTopic>*)”的回调。
设置已经完成,但它会立即终止,因为我们没有等待“SIGINT”或“SIGTERM”发送的阻塞程序。在其他示例中,我们没有这个问题,因为我们在while true循环中提取了所有事件,但仅使用回调需要类似于“SignalWatcher”的东西,它会等待“SIGINT”或“SIGTERM”发出信号。
iox::posix::waitForTerminationRequest();
当“waitForTerminationRequest”取消阻止时,我们会清理所有资源并优雅地终止进程。
listener.detachEvent(heartbeat);
listener.detachEvent(subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED);
listener.detachEvent(subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED);
heartbeatThread.join();
提示:在_EventOrigin_超出范围之前,您不必像订阅者或用户触发器一样分离它。这也适用于_Listener_,实现的基于RAII的设计负责资源清理。
回调必须具有类似“void(PointerToEventOrigin*)”的签名。例如,我们的“heartbeatCallback”只打印消息“heartheat received”。
void heartbeatCallback(iox::popo::UserTrigger*)
{
std::cout << "heartbeat received " << std::endl;
}
“onSampleReceivedCallback”更为复杂。我们首先获取接收到的样本,并通过获取订阅者的服务描述来检查哪个订阅者发出了事件信号。如果实例等于“FrontLeft”,则将样本存储在“leftCache”中,否则存储在“rightCache”中。
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber) { subscriber->take().and_then([subscriber](auto& sample) { auto instanceString = subscriber->getServiceDescription().getInstanceIDString(); // store the sample in the corresponding cache if (instanceString == iox::capro::IdString_t("FrontLeft")) { leftCache.emplace(*sample); } else if (instanceString == iox::capro::IdString_t("FrontRight")) { rightCache.emplace(*sample); } std::cout << "received: " << sample->counter << std::endl; }); // ... }
在下一步中,我们检查两个缓存是否都已填充。如果是这种情况,我们将打印一条额外的消息,其中说明两个接收值之和的结果。之后,我们重置两个缓存以重新开始。
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
// ...
// if both caches are filled we can process them
if (leftCache && rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << leftCache->counter << " + "
<< rightCache->counter << " = " << leftCache->counter + rightCache->counter << std::endl;
leftCache.reset();
rightCache.reset();
}
}
在这里,我们演示了如何将几乎所有内容都作为回调的附加参数提供。您只需要在“attachEvent”方法中提供一个对值的引用作为附加参数,然后在回调中作为参数提供。其中一个用例是访问静态方法中的对象的成员和方法,我们在这里演示了这一点。
这个例子与[ice_callbacks_subscriber.cpp](#ice_callbacks_subscriber.ppp)的例子相同,只是我们省略了循环心跳触发器。关键的区别在于,监听器现在是类成员,在每次回调中,我们都希望更改一些成员变量。为此,我们需要一个指向对象的额外指针,因为侦听器需要c函数引用,而c函数引用不允许在捕获时使用lambda表达式。在这里,我们可以使用userType功能,它允许我们提供this指针作为回调的附加参数。
主函数现在很短,我们实例化类型为“CounterService”的对象,并像前面的例子一样调用“waitForTerminationRequest”来等待来自用户的控件c事件。
iox::runtime::PoshRuntime::initRuntime(APP_NAME);
CounterService counterService;
iox::posix::waitForTerminationRequest();
Our CounterService
has the following members:
iox::popo::Subscriber<CounterTopic> m_subscriberLeft;
iox::popo::Subscriber<CounterTopic> m_subscriberRight;
iox::optional<CounterTopic> m_leftCache;
iox::optional<CounterTopic> m_rightCache;
iox::popo::Listener m_listener;
它们的目的与前面的例子相同。在构造函数中,我们初始化两个订阅者,并将它们附加到侦听器。但现在我们在iox::popo::createNotificationCallback
中添加了一个附加参数,即取消引用的this
指针。它必须取消引用,因为我们需要引用作为参数。
!!!注意
用户必须确保“attachEvent”中的contextData(“*this”)在附件及其回调被附加期间有效,否则回调上下文数据指针将悬空。
CounterService() : m_subscriberLeft({"Radar", "FrontLeft", "Counter"}) , m_subscriberRight({"Radar", "FrontRight", "Counter"}) { /// Attach the static method onSampleReceivedCallback and provide this as additional argument /// to the callback to gain access to the object whenever the callback is called. /// It is not possible to use a lambda with capturing here since they are not convertable to /// a C function pointer. /// important: the user has to ensure that the contextData (*this) lives as long as /// the subscriber with its callback is attached to the listener m_listener .attachEvent(m_subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED, iox::popo::createNotificationCallback(onSampleReceivedCallback, *this)) .or_else([](auto) { std::cerr << "unable to attach subscriberLeft" << std::endl; std::exit(EXIT_FAILURE); }); m_listener .attachEvent(m_subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED, iox::popo::createNotificationCallback(onSampleReceivedCallback, *this)) .or_else([](auto) { std::cerr << "unable to attach subscriberRight" << std::endl; std::exit(EXIT_FAILURE); }); }
“onSampleReceivedCallback”现在是一个静态方法,而不是一个自由函数。它必须是静态的,因为我们需要一个C函数引用作为回调参数,并且静态方法可以转换为这样的类型。但在静态方法中,我们无法访问对象的成员,因此我们必须添加一个额外的参数,即指向对象本身的指针,称为“self”。
static void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber, CounterService* self) { subscriber->take().and_then([subscriber, self](auto& sample) { auto instanceString = subscriber->getServiceDescription().getInstanceIDString(); // store the sample in the corresponding cache if (instanceString == iox::capro::IdString_t("FrontLeft")) { self->m_leftCache.emplace(*sample); } else if (instanceString == iox::capro::IdString_t("FrontRight")) { self->m_rightCache.emplace(*sample); } std::cout << "received: " << sample->counter << std::endl; }); // if both caches are filled we can process them if (self->m_leftCache && self->m_rightCache) { std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << self->m_leftCache->counter << " + " << self->m_rightCache->counter << " = " << self->m_leftCache->counter + self->m_rightCache->counter << std::endl; self->m_leftCache.reset(); self->m_rightCache.reset(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。