日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

ZeroMQ之Publish/Subscribe (Java)

發(fā)布時間:2025/3/13 java 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ZeroMQ之Publish/Subscribe (Java) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前面的文章介紹了比較簡單的Request/Subscribe模式, 這篇文章介紹更為經(jīng)典的Publish/Subscribe通信模式用來ZeroMQ的實現(xiàn),其通信方式如下圖:



客戶端(subscriber)向服務器(publisher)訂閱消息,然后服務器可以將消息推送到所有訂閱了消息的客戶端,這里也可以理解為廣播吧。。。。


好了,閑話不多說了,直接上用ZeroMQ實現(xiàn)這種通信模式的代碼吧:

(1)服務端(publisher):

[java]?view plaincopy
  • package?pubsub;??
  • ??
  • ??
  • import?org.zeromq.ZMQ;??
  • ??
  • public?class?Publisher?{??
  • ????public?static?void?main(String?args[])?{??
  • ??????
  • ????????ZMQ.Context?context?=?ZMQ.context(1);??//創(chuàng)創(chuàng)建包含一個I/O線程的context??
  • ????????ZMQ.Socket?publisher?=?context.socket(ZMQ.PUB);???//創(chuàng)建一個publisher類型的socket,他可以向所有訂閱的subscriber廣播數(shù)據(jù)??
  • ??????????
  • ????????publisher.bind("tcp://*:5555");??//將當前publisher綁定到5555端口上,可以接受subscriber的訂閱??
  • ??????????
  • ????????while?(!Thread.currentThread?().isInterrupted?())?{??
  • ????????????String?message?=?"fjs?hello";??//最開始可以理解為pub的channel,subscribe需要訂閱fjs這個channel才能接收到消息??
  • ????????????publisher.send(message.getBytes());??
  • ????????}??
  • ??
  • ????????publisher.close();??
  • ????????context.term();??
  • ????}??
  • }??

  • 代碼很簡單吧,這里publisher來充當服務端,所有的subscriber都需要建立于publisher的連接。,,。,


    (2)客戶端(subscriber)代碼:

    [java]?view plaincopy
  • package?pubsub;??
  • ??
  • import?org.zeromq.ZMQ;??
  • ??
  • public?class?Subscriber?{??
  • ????public?static?void?main(String?args[])?{??
  • ????????for?(int?j?=?0;?j?<?100;?j++)?{??
  • ????????????new?Thread(new?Runnable(){??
  • ??
  • ????????????????public?void?run()?{??
  • ????????????????????//?TODO?Auto-generated?method?stub??
  • ????????????????????ZMQ.Context?context?=?ZMQ.context(1);??//創(chuàng)建1個I/O線程的上下文??
  • ????????????????????ZMQ.Socket?subscriber?=?context.socket(ZMQ.SUB);?????//創(chuàng)建一個sub類型,也就是subscriber類型的socket??
  • ????????????????????subscriber.connect("tcp://127.0.0.1:5555");????//與在5555端口監(jiān)聽的publisher建立連接??
  • ????????????????????subscriber.subscribe("fjs".getBytes());?????//訂閱fjs這個channel??
  • ??????????????????????
  • ????????????????????for?(int?i?=?0;?i?<?100;?i++)?{??
  • ????????????????????????byte[]?message?=?subscriber.recv();??//接收publisher發(fā)送過來的消息??
  • ????????????????????????System.out.println("receive?:?"?+?new?String(message));??
  • ????????????????????}??
  • ????????????????????subscriber.close();??
  • ????????????????????context.term();??
  • ????????????????}??
  • ??????????????????
  • ????????????}).start();??
  • ????????}??
  • ??????????
  • ??????????
  • ????}??
  • }??

  • 這里需要注意訂閱的channel問題,如果這里錯了的話,subscriber是不會受到publisher發(fā)送過來的數(shù)據(jù)的


    好了,到這里publish/subscribe的實現(xiàn)就算ok了

    轉(zhuǎn)載于:https://www.cnblogs.com/jym-sunshine/p/5441470.html

    總結(jié)

    以上是生活随笔為你收集整理的ZeroMQ之Publish/Subscribe (Java)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。