日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka REST Proxy for MapR Streams入门

發布時間:2023/12/3 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka REST Proxy for MapR Streams入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

MapR生態系統軟件包2.0(MEP)隨附了一些與MapR流有關的新功能:

  • MapR Streams的Kafka REST代理為MapR Streams和Kafka集群提供RESTful接口,以使用和生成消息并執行管理操作。
  • Kafka Connect for MapR Streams是一個實用程序,用于在MapR Streams與Apache Kafka和其他存儲系統之間流式傳輸數據。

MapR生態系統軟件包(MEP)是一種提供與核心升級脫鉤的生態系統升級的方法-允許您獨立于聚合數據平臺升級工具。 您可以在本文中進一步了解MEP 2.0。

在此博客中,我們描述了如何使用REST代理向MapR Streams發布消息和從MapR Streams使用消息。 REST代理是對MapR融合數據平臺的重要補充,允許任何編程語言使用MapR流。

MapR Streams工具隨附的Kafka REST Proxy可以與MapR Streams一起使用(默認),也可以與Apache Kafka混合使用。 在本文中,我們將重點介紹MapR流。 <!–更多–>

先決條件

  • 具有MEP 2.0的MapR融合數據平臺5.2
    • 使用MapR Streams工具
  • curl,wget或任何HTTP / REST客戶端工具

創建MapR流和主題

流是主題的集合,您可以通過以下方式將其作為一個組進行管理:

  • 設置適用于該流中所有主題的安全策略
  • 為流中創建的每個新主題設置默認的分區數
  • 為流中每個主題中的消息設置生存時間
  • 您可以在文檔中找到有關MapR Streams概念的更多信息。

    在您的Mapr群集或沙盒上,運行以下命令:

    $ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

    啟動Kafka控制臺的生產者和消費者

    打開兩個終端窗口,并使用以下命令運行使用者的Kafka實用程序:

    消費者

    • 主題傳感器-json
    $ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
    • 主題傳感器二進制
    $ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

    這兩個終端窗口可讓您查看有關不同主題的消息

    使用Kafka REST代理

    檢查主題元數據

    端點/topics/[topic_name]允許您獲取有關該主題的一些信息。 在MapR Streams中,主題是路徑標識的流的一部分; 要使用REST API使用主題,您必須使用完整路徑,并將其編碼在URL中; 例如:

    • /apps/iot-stream:sensor-json將使用%2Fapps%2Fiot-stream%3Asensor-json進行編碼

    運行以下命令,以獲取有關sensor-json主題的信息

    $ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

    注意:為簡單起見,我從運行Kafka REST代理的節點上運行命令,因此可以使用localhost 。

    您可以通過添加以下Python命令,以一種漂亮的方式打印JSON:

    $ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

    默認流

    如上所述,流路徑是您必須在命令中使用的主題名稱的一部分。 但是可以將MapR Kafka REST代理配置為使用默認流。 為此,您應該在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下屬性:

    • streams.default.stream=/apps/iot-stream更改Kafka REST代理配置時,必須使用maprcli或MCS重新啟動服務。使用streams.default.stream屬性的streams.default.stream是簡化URL使用的URL。以應用為例
      • 通過streams.default.stream ,可以使用curl -X GET http://localhost:8082/topics/

      在本文中,所有URL都包含編碼的流名稱,就像您可以開始使用Kafka REST代理而無需更改配置,也可以將其用于其他流。

    發布消息

    用于MapR流的Kafka REST代理允許應用程序將消息發布到MapR流。 消息可以作為JSON或二進制內容(base64編碼)發送。

    要發送JSON消息:

    • 查詢應該是HTTP POST
    • 內容類型應為: application/vnd.kafka.json.v1+json
    • 身體:
    {"records":[{"value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"} }] }

    完整的請求是:

    curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

    您應該在運行/apps/iot-stream:sensor-json使用者的終端窗口中看到打印的消息。

    要發送二進制消息:

    • 查詢應該是HTTP POST
    • 內容類型應為: application/vnd.kafka.binary.v1+json
    • 身體:
    {"records":[{"value":"SGVsbG8gV29ybGQ="}] }

    請注意, SGVsbG8gV29ybGQ=是在Base64中編碼的字符串“ Hello World”。

    完整的請求是:

    curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

    您應該在運行/apps/iot-stream:sensor-binary使用者的終端窗口中看到打印的消息。

    發送多條消息

    HTTP正文的records字段允許您發送多個消息,例如,您可以發送:

    curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

    該命令將發送2條消息,并將偏移量增加2。您可以對二進制內容執行相同的操作,只需在JSON數組中添加新元素即可; 例如:

    curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

    您可能知道,可以為消息設置密鑰,以確保所有具有相同密鑰的消息都將到達同一分區。 為此,將key屬性添加到消息中,如下所示:

    {"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"} }] }

    既然您知道如何使用REST代理將消息發布到MapR Stream主題,那么讓我們看看如何使用消息。

    消費信息

    REST代理還可以用于消費主題消息。 為此,您需要:

  • 創建使用者實例。
  • 使用第一次調用返回的URL來閱讀消息。
  • 如果需要,請刪除所引用的使用者。
  • 創建使用者實例

    以下請求創建使用者實例:

    curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

    服務器的響應如下所示:

    {"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }

    請注意,我們已使用/consumers/[topic_name]創建使用者。

    后續請求將使用base_uri從主題獲取消息。 與任何MapR Streams / Kafka使用者一樣, auto.offset.reset定義其行為。 在此示例中,該值設置為earliest ,這意味著使用者將從頭開始閱讀消息。 您可以在MapR Streams文檔中找到有關使用者配置的更多信息。

    消費信息

    要使用這些消息,只需將Mapr Streams主題添加到使用者實體的URL。

    以下請求使用了該主題的消息:

    curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

    此調用返回JSON文檔中的消息:

    [{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]

    每次對API的調用都會根據上一次調用的偏移量返回發布的新消息。

    請注意,消費者將被銷毀:

    • 由consumer.instance.timeout.ms實例。超時。毫秒設置的空閑時間后(默認值設置為300000毫秒/ 5分鐘)
    • 使用REST API調用銷毀它(見下文)。

    消費二進制格式的消息

    如果需要使用二進制消息,則需要更改格式并接受標頭,該方法是相同的。

    調用此URL為二進制主題創建使用者實例:

    curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

    然后使用消息,accept標頭設置為application/vnd.kafka.binary.v1+json :

    curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

    該調用返回JSON文檔中的消息,并且該值在Base64中編碼

    [{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]

    刪除使用者實例

    如前所述,使用者將根據REST Proxy的consumer.instance.timeout.ms配置自動銷毀。 也可以使用使用者實例URI和HTTP DELETE調用銷毀實例,如下所示:

    curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

    結論

    在本文中,您學習了如何對MapR流使用Kafka REST代理,該代理允許任何應用程序使用在MapR融合數據平臺中發布的消息。

    您可以在MapR文檔和以下資源中找到有關Kafka REST代理的更多信息:

    • MapR Streams入門
    • Ted Dunning和Ellen Friedman撰寫的“流式傳輸體系結構:使用Apache Kafka和MapR流的新設計”電子書

    翻譯自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams.html

    總結

    以上是生活随笔為你收集整理的Kafka REST Proxy for MapR Streams入门的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。