生活随笔
收集整理的這篇文章主要介紹了
基于OpenDDS应用程序开发(3)订阅端实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
連續(xù)的三篇博文演示如何基于OpenDDS開發(fā)應用程序,將數據從發(fā)布端節(jié)點發(fā)送到訂閱端節(jié)點,該示例程序由一個發(fā)布者發(fā)布數據,一個訂閱者訂閱數據,使用默認的QoS策略和TCP/IP傳輸方式。
本文是第三篇,主要介紹開發(fā)一個簡單的OpenDDS訂閱端應用程序所涉及的步驟。省略一些不重要部分(如:#include部分和異常處理等)代碼,只寫出關鍵代碼。
1、新建訂閱端工程:
參考前一博文中MPC的用法,在Demo.mpc文件中增加如下內容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
project(*Subscriber) : dcpsexe_with_tcp {
exename = subscriber after += *idl
TypeSupport_Files { Demo.idl }
Source_Files { Subscriber.cpp DataReaderListenerImpl.cpp } }
|
Subscriber工程從父工程dcpsexe_with_tcp繼承,這里直接使用idl工程中定義好的Demo.idl文件。
之后在Demo目錄下新建三個文件:Subscriber.cpp、DataReaderListenerImpl.h、DataReaderListenerImpl.cpp,分別用來編寫訂閱端邏輯部分代碼,并再次使用如下命令來生成Vs2008工程:
生成完成之后,使用Vs2008打開Demo.sln,就可以修改訂閱端代碼了:
2、初始化參與者:
初始化訂閱端參與者代碼同發(fā)布端是完全一樣的,在Subscriber.cpp文件中增加如下內容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
int main(int argc, char *argv[]) { try { DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant = dpf->create_participant(42, // Domain ID PARTICIPANT_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); if (!participant) { std::cerr << "create_participant failed." << std::endl; return 1 ; }
|
3、注冊數據類型并創(chuàng)建主題:
接下來,初始化數據類型和主題:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
Demo::PosTypeSupport_var mts = new Demo::PosTypeSupportImpl(); if (DDS::RETCODE_OK != mts->register_type(participant, "")) { std::cerr << "Failed to register the PosTypeSupport." << std::endl; return 1; }
CORBA::String_var type_name = mts->get_type_name(); DDS::Topic_var topic = participant->create_topic("Pos Demo", type_name, TOPIC_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); if (!topic) { std::cerr << "Failed to create_topic." << std::endl; return 1; }
|
4、創(chuàng)建訂閱者:
調用create_subscriber()操作創(chuàng)建一個帶有默認QoS策略的訂閱者:
1 2 3 4 5 6 7 8
|
DDS::Subscriber_var sub = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); if (!sub) { std::cerr << "Failed to create_subscriber." << std::endl; return 1; }
|
5、創(chuàng)建數據讀者及監(jiān)聽者:
訂閱端需要給數據讀者關聯一個監(jiān)聽者,用來接收數據的到達,下面的代碼定義了一個監(jiān)聽者對象,類DataReaderListenerImpl的實現將在下一部分介紹。
1
|
DDS::DataReaderListener_var listener(new DataReaderListenerImpl);
|
現在采用默認的QoS策略創(chuàng)建數據讀者,并將它與主題、剛剛創(chuàng)建的監(jiān)聽者對象相關聯起來:
1 2 3 4 5 6 7 8 9
|
DDS::DataReader_var dr = sub->create_datareader(topic, DATAREADER_QOS_DEFAULT, listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); if (!dr) { std::cerr << "create_datareader failed." << std::endl; 大專欄%20基于OpenDDS應用程序開發(fā)(3)訂閱端實現ne"> return 1; }
|
之后,主線程就可以自由的去處理其它工作了,當有數據到達時,OpenDDS會調用監(jiān)聽者對象的回調接口通知,只需要在DataReaderListenerImpl類的回調函數中接收需要的數據就可以了。
6、數據讀者監(jiān)聽者實現:
監(jiān)聽者類繼承自DDS規(guī)范的DDS::DataReaderListener接口,該接口定義了一些回調函數,每個回調函數被調用時,就是一個事件的通知,如:斷開、重連等,以下是DataReaderListener接口的定義:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
module DDS {
local interface DataReaderListener : Listener {
void on_requested_deadline_missed(in DataReader reader, in RequestedDeadlineMissedStatus status);
void on_requested_incompatible_qos(in DataReader reader, in RequestedIncompatibleQosStatus status);
void on_sample_rejected(in DataReader reader, in SampleRejectedStatus status);
void on_liveliness_changed(in DataReader reader, in LivelinessChangedStatus status);
void on_data_available(in DataReader reader);
void on_subscription_matched(in DataReader reader, in SubscriptionMatchedStatus status);
void on_sample_lost(in DataReader reader, in SampleLostStatus status);
}; };
|
在本例的DataReaderListenerImpl類中真正需要的實現的回調接口是on_data_available(),它也是我們需要重新派生該類的唯一成員函數:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) {
num_reads_ ++;
try{
Demo::PosDataReader_var reader_i = Demo::PosDataReader::_narrow(reader); if (!reader_i) { std::cerr << "read: _narrow failed." << std::endl; return; }
Demo::Pos pos; DDS::SampleInfo si ; DDS::ReturnCode_t status = reader_i->take_next_sample(pos, si) ; if (status == DDS::RETCODE_OK) { if (si.valid_data == 1) { std::cout << " Pos:pos_id = " << pos. pos_id << std::endl << " pos_x = " << pos. pos_x << std::endl << " pos_y = " << pos. pos_y << std::endl; } else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) { std::cout << "instance is disposed" << std::endl; } else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) { std::cout << "instance is unregistered" << std::endl; } else { std::cerr << "ERROR: received unknown instance state " << si.instance_state << std::endl; } } else if (status == DDS::RETCODE_NO_DATA) { cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!" << std::endl; } else { cerr << "ERROR: read Pos: " << status << std::endl; }
|
上面的代碼將樣本從數據讀者中取出,如果成功并能返回有效數據,就打印出接收到數據的每一個字段。
每當有樣本數據到達時,該函數就會被調用。
7、實體清理:
在訂閱完數據以后,需要清理與OpenDDS相關聯的資源:
1 2 3
|
participant->delete_contained_entities(); dpf->delete_participant(participant); TheServiceParticipant->shutdown();
|
調用域參與者的delete_contained_entities()操作刪除所有該參與者創(chuàng)建的主題、訂閱者。一旦執(zhí)行完該操作,就可以使用域參與者工廠刪除域參與者了。
8、示例程序運行:
修改完以上代碼并編譯完成,就可以運行訂閱端應用程序了,需要先運行DDS的信息倉庫,開始中打開一個CMD窗口,執(zhí)行如下命令:
1
|
%DDS_ROOT%/bin/DCPSInfoRepo -ORBListenEndpoints iiop://localhost:12345
|
再次打開一個CMD窗口,cd到Demo目錄下,執(zhí)行如下命令:
1
|
subscriber -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo
|
至此,訂閱端應用程序就開發(fā)完成并運行起來了。
有關OpenDDS的相關問題歡迎發(fā)送郵件至lyingbo@aliyun.com一起討論
總結
以上是生活随笔為你收集整理的基于OpenDDS应用程序开发(3)订阅端实现的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。