日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

如何编写一个 Pulsar Broker Interceptor 插件

發布時間:2023/12/24 windows 32 coder
生活随笔 收集整理的這篇文章主要介紹了 如何编写一个 Pulsar Broker Interceptor 插件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景

之前寫過一篇文章 VictoriaLogs:一款超低占用的 ElasticSearch 替代方案講到了我們使用 Victorialogs 來存儲 Pulsar 消息隊列的消息 trace 信息。

而其中的關鍵的埋點信息是通過 Pulsar 的 BrokerInterceptor 實現的,后面就有朋友咨詢這塊代碼是否開源,目前是沒有開源的,不過借此機會可以聊聊如何實現一個 BrokerInterceptor 插件,當前還沒有相關的介紹文檔。

其實當時我在找 BrokerInterceptor 的相關資料時就發現官方并沒有提供對應的開發文檔。

只有一個 additional servlet的開發文檔,而 BrokerInterceptor 只在 YouTube 上找到了一個社區分享的視頻。

雖說看視頻可以跟著實現,但總歸是沒有文檔方便。


在這之前還是先講講 BrokerInterceptor 有什么用?

其實從它所提供的接口就能看出,在消息到達 Broker 后的一些關鍵節點都提供了相關的接口,實現這些接口就能做很多事情了,比如我這里所需要的消息追蹤。

創建項目

下面開始如何使用 BrokerInterceptor
首先是創建一個 Maven 項目,然后引入相關的依賴:

<dependency>  
<groupId>org.apache.pulsar</groupId>  
<artifactId>pulsar-broker</artifactId>  
<version>${pulsar.version}</version>  
<scope>provided</scope>  
</dependency>

實現接口

然后我們便可以實現 org.apache.pulsar.broker.intercept.BrokerInterceptor 來完成具體的業務了。

在我們做消息追蹤的場景下,我們實現了以下幾個接口:

  • messageProduced
  • messageDispatched
  • messageAcked

messageProduced 為例,需要解析出消息ID,然后拼接成一個字符串寫入 Victorialogs 存儲中,其余的兩個埋點也是類似的。

@Override  
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId,  
                            Topic.PublishContext publishContext) {  
    String ns = getNs(producer.getTopic().getName());  
    if (!LogSender.checkNamespace(ns)) {  
        return;  
    }    String topic = producer.getTopic().getName();  
    String partition = getPartition(topic);  
    String msgId = String.format("%s:%s:%s", ledgerId, entryId, partition);  
    String s = new Event.Publish(msgId, producer.getClientAddress(), System.currentTimeMillis(),  
            producer.getProducerName(), topic).toString();  
    LogSender.send(s);  
}

編寫項目描述文件

我們需要創建一個項目描述文件,路徑如下:
src/main/resources/META-INF/services/broker_interceptor.yml
名字也是固定的,broker 會在啟動的時候讀取這個文件,其內容如下:

name: interceptor-name
description: description
interceptorClass: com.xx.CustomInterceptor

重點是填寫自定義實現類的全限定名。

配置打包插件

<build>  
  <finalName>${project.artifactId}</finalName>  
  <plugins>  
    <plugin>  
      <groupId>org.apache.nifi</groupId>  
      <artifactId>nifi-nar-maven-plugin</artifactId>  
      <version>1.2.0</version>  
      <extensions>true</extensions>  
      <configuration>  
        <finalName>${project.artifactId}-${project.version}</finalName>  
      </configuration>  
      <executions>  
        <execution>  
          <id>default-nar</id>  
          <phase>package</phase>  
          <goals>  
            <goal>nar</goal>  
          </goals>  
        </execution>  
      </executions>  
    </plugin>  
  </plugins>  
</build>

由于 Broker 識別的是 nar 包,所以我們需要配置 nar 包插件,之后使用 mvn package 就會生成出 nar 包。

配置 broker.conf

我們還需要在 broker.conf 中配置:

brokerInterceptors: "interceptor-name"

也就是剛才配置的插件名稱。

不過需要注意的是,如果你是使用 helm 安裝的 pulsar,在 3.1 版本之前需要手動將brokerInterceptors 寫入到 broker.conf 中。

FROM apachepulsar/pulsar-all:3.0.1  
COPY target/interceptor-1.0.1.nar /pulsar/interceptors/  
RUN echo "\n" >> /pulsar/conf/broker.conf  
RUN echo "brokerInterceptors=" >> /pulsar/conf/broker.conf

不然在最終容器中的 broker.conf 中是讀取不到這個配置的,導致插件沒有生效。

我們是重新基于官方鏡像打的一個包含自定義插件的鏡像,最終使用這個鏡像進行部署。

https://github.com/apache/pulsar/pull/20719
我在這個 PR 中已經將配置加入進去了,但得在 3.1 之后才能生效;也就是在 3.1 之前都得加上加上這行:

RUN echo "\n" >> /pulsar/conf/broker.conf  
RUN echo "brokerInterceptors=" >> /pulsar/conf/broker.conf

目前來看 Pulsar 的 BrokerInterceptor 應該使用不多,不然使用 helm 安裝時是不可能生效的;而且官方文檔也沒用相關的描述。

總結

以上是生活随笔為你收集整理的如何编写一个 Pulsar Broker Interceptor 插件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。