NET中解决KafKa多线程发送多主题的问题
一般在KafKa消費(fèi)程序中消費(fèi)可以設(shè)置多個(gè)主題,那在同一程序中需要向KafKa發(fā)送不同主題的消息,如異常需要發(fā)到異常主題,正常的發(fā)送到正常的主題,這時(shí)候就需要實(shí)例化多個(gè)主題,然后逐個(gè)發(fā)送。
在NET中用RdKafka組件來(lái)做消息處理,在Nuget中引用。
在程序中初始化Producer,并創(chuàng)建多個(gè)Topic
private string comtopic = "topic1";
? ? ? ? private string errtopic = "topic2";
? ? ? ? private string kfkip = "192.168.80.32:9092";
? ? ? ? Topic topic = null;
? ? ? ? Topic errTopic = null;
? ? ? ? public ExcuteFlow()
? ? ? ? {
? ? ? ? ? ? try
? ? ? ? ? ? {
? ? ? ? ? ? ? ? Producer producer = new Producer(kfkip);
? ? ? ? ? ? ? ? topic = producer.Topic(comtopic);
? ? ? ? ? ? ? ? errTopic = producer.Topic(errtopic);
? ? ? ? ? ? }
? ? ? ? ? ? catch (RdKafkaException ex)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? LogHelper.Error("KafKa初始化KafKa異常 ", ex);
? ? ? ? ? ? }
? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? LogHelper.Error("KafKa初始化異常", ex);
? ? ? ? ? ? }
? ? ? ? }
在程序中發(fā)送其中一個(gè)主題:
? ? ? ? ? try
? ? ? ? ? ? {
? ? ? ? ? ? ? ? if (topic != null)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
? ? ? ? ? ? ? ? ? ? Task<DeliveryReport> deliveryReport = topic.Produce(datas);
? ? ? ? ? ? ? ? ? ? var unused = deliveryReport.ContinueWith(task =>
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? LogHelper.Info("內(nèi)容:{flowCommond.ID} 發(fā)送到分區(qū):{task.Result.Partition}, Offset 為: {task.Result.Offset}");
? ? ? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? throw new Exception("發(fā)送消息到KafKa topic 為空");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? catch (RdKafkaException ex)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? LogHelper.Error("發(fā)送消息到KafKa ?KafKa異常", ex);
? ? ? ? ? ? }
? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? LogHelper.Error("發(fā)送消息到KafKa異常", ex);
? ? ? ? ? ? }
flowCommond為要發(fā)送的對(duì)象內(nèi)容,格式化為Json字符串再發(fā)送。
另一個(gè)主題一樣處理。
? 這里實(shí)現(xiàn)一個(gè)線程里面發(fā)送多個(gè)主題,那下面實(shí)現(xiàn)多個(gè)線程中如何發(fā)送多個(gè)主題。
多線程中如果每個(gè)線程都new Producer(kfkip) 一次,那KafKa的連接很快會(huì)被占滿。
那這里就用單例模式來(lái)解決這個(gè)問(wèn)題,每次要用到Producer時(shí)檢查一下是否已經(jīng)存在Producer實(shí)例,若存在則直接用不用再生成。
/// <summary>
? ? /// 單例模式的實(shí)現(xiàn)
? ? /// </summary>
? ? public class SingleProduct : Producer
? ? {
? ? ? ? // 定義一個(gè)靜態(tài)變量來(lái)保存類的實(shí)例
? ? ? ? private static SingleProduct uniqueInstance;
? ? ? ? // 定義一個(gè)標(biāo)識(shí)確保線程同步
? ? ? ? private static readonly object locker = new object();
? ? ? ? // 定義私有構(gòu)造函數(shù),使外界不能創(chuàng)建該類實(shí)例
? ? ? ? private SingleProduct(string brokerList) : base(brokerList)
? ? ? ? {
? ? ? ? }
? ? ? ? /// <summary>
? ? ? ? /// 定義公有方法提供一個(gè)全局訪問(wèn)點(diǎn),同時(shí)你也可以定義公有屬性來(lái)提供全局訪問(wèn)點(diǎn)
? ? ? ? /// </summary>
? ? ? ? /// <returns></returns>
? ? ? ? public static SingleProduct GetInstance()
? ? ? ? {
? ? ? ? ? ? // 當(dāng)?shù)谝粋€(gè)線程運(yùn)行到這里時(shí),此時(shí)會(huì)對(duì)locker對(duì)象 "加鎖",
? ? ? ? ? ? // 當(dāng)?shù)诙€(gè)線程運(yùn)行該方法時(shí),首先檢測(cè)到locker對(duì)象為"加鎖"狀態(tài),該線程就會(huì)掛起等待第一個(gè)線程解鎖
? ? ? ? ? ? // lock語(yǔ)句運(yùn)行完之后(即線程運(yùn)行完之后)會(huì)對(duì)該對(duì)象"解鎖"
? ? ? ? ? ? if (uniqueInstance == null)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? lock (locker)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? // 如果類的實(shí)例不存在則創(chuàng)建,否則直接返回
? ? ? ? ? ? ? ? ? ? if (uniqueInstance == null)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];
? ? ? ? ? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? uniqueInstance = new SingleProduct(kfkip);
? ? ? ? ? ? ? ? ? ? ? ? ? ? LogHelper.Error("單例模式 實(shí)例化 SingleProduct");
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? catch (RdKafkaException ex)
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? LogHelper.Error("單例模式 KafKa初始化KafKa異常 ", ex);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? LogHelper.Error("單例模式 KafKa初始化異常", ex);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? return uniqueInstance;
? ? ? ? }
? ? }
然后在初始化的代碼中替換Producer producer = new Producer(kfkip);為?Producer producer = SingleProduct.GetInstance();
OK!以上就完成了多線程多主題的消息發(fā)送。
相關(guān)文章:
消息隊(duì)列 Kafka 的基本知識(shí)及 .NET Core 客戶端
.net Kafka.Client多個(gè)Consumer Group對(duì)Topic消費(fèi)不能完全覆蓋研究總結(jié)(一)
.net Kafka.Client多個(gè)Consumer Group對(duì)Topic消費(fèi)不能完全覆蓋研究總結(jié)(二)
原文地址:http://www.cnblogs.com/zhangs1986/p/7285525.html
.NET社區(qū)新聞,深度好文,微信中搜索dotNET跨平臺(tái)或掃描二維碼關(guān)注
總結(jié)
以上是生活随笔為你收集整理的NET中解决KafKa多线程发送多主题的问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: IdentityServer4 实现自定
- 下一篇: Visual Studio 2017 1