日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!

發布時間:2025/3/16 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你! 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka是當下非常流行的消息中間件,據官網透露,已有成千上萬的公司在使用它。最近實踐了一波Kafka,確實很好很強大。今天我們來從三個方面學習下Kafka:Kafaka在Linux下的安裝,Kafka的可視化工具,Kafka和SpringBoot結合使用。希望大家看完后能快速入門Kafka,掌握這個流行的消息中間件!

?

Kafka簡介

Kafka是由LinkedIn公司開發的一款開源分布式消息流平臺,由Scala和Java編寫。主要作用是為處理實時數據提供一個統一、高吞吐、低延遲的平臺,其本質是基于發布訂閱模式的消息引擎系統。

Kafka具有以下特性:

  • 高吞吐、低延遲:Kafka收發消息非常快,使用集群處理消息延遲可低至2ms。

  • 高擴展性:Kafka可以彈性地擴展和收縮,可以擴展到上千個broker,數十萬個partition,每天處理數萬億條消息。

  • 永久存儲:Kafka可以將數據安全地存儲在分布式的,持久的,容錯的群集中。

  • 高可用性:Kafka在可用區上可以有效地擴展群集,某個節點宕機,集群照樣能夠正常工作。

?

Kafka安裝

我們將采用Linux下的安裝方式,安裝環境為CentOS 7.6。此處沒有采用Docker來安裝部署,個人感覺直接安裝更簡單(主要是官方沒提供Docker鏡像)!

  • 首先我們需要下載Kafka的安裝包,下載地址:https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz

  • 下載完成后將Kafka解壓到指定目錄:

cd?/mydata/kafka/ tar?-xzf?kafka_2.13-2.8.0.tgz
  • 解壓完成后進入到解壓目錄:

cd?kafka_2.13-2.8.0
  • 雖然有消息稱Kafka即將移除Zookeeper,但是在Kafka最新版本中尚未移除,所以啟動Kafka前還是需要先啟動Zookeeper;

  • 啟動Zookeeper服務,服務將運行在2181端口;

#?后臺運行服務,并把日志輸出到當前文件夾下的zookeeper-out.file文件中 nohup?bin/zookeeper-server-start.sh?config/zookeeper.properties?>?zookeeper-out.file?2>&1?&
  • 由于目前Kafka是部署在Linux服務器上的,外網如果想要訪問,需要修改Kafka的配置文件config/server.properties,修改下Kafka的監聽地址,否則會無法連接;

############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.5.78:9092
  • 最后啟動Kafka服務,服務將運行在9092端口。

#?后臺運行服務,并把日志輸出到當前文件夾下的kafka-out.file文件中 nohup?bin/kafka-server-start.sh?config/server.properties?>?kafka-out.file?2>&1?&


Kafka命令行操作

接下來我們使用命令行來操作下Kafka,熟悉下Kafka的使用。

  • 首先創建一個叫consoleTopic的Topic;

bin/kafka-topics.sh?--create?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
  • 接下來查看Topic;

bin/kafka-topics.sh?--describe?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
  • 會顯示如下Topic信息;

Topic:?consoleTopic?TopicId:?tJmxUQ8QRJGlhCSf2ojuGw?PartitionCount:?1?ReplicationFactor:?1?Configs:?segment.bytes=1073741824Topic:?consoleTopic?Partition:?0?Leader:?0?Replicas:?0?Isr:?0
  • 向Topic中發送消息:

bin/kafka-console-producer.sh?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
  • 直接在命令行中輸入信息即可發送;

  • 重新打開一個窗口,通過如下命令可以從Topic中獲取消息:

bin/kafka-console-consumer.sh?--topic?consoleTopic?--from-beginning?--bootstrap-server?192.168.5.78:9092


?

Kafka可視化

使用命令行操作Kafka確實有點麻煩,接下來我們試試可視化工具kafka-eagle。

安裝JDK

如果你使用的是CentOS的話,默認沒有安裝完整版的JDK,需要自行安裝!

  • 下載JDK 8,下載地址:https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/

  • 下載完成后將JDK解壓到指定目錄;

cd?/mydata/java tar?-zxvf?OpenJDK8U-jdk_x64_linux_xxx.tar.gz mv?OpenJDK8U-jdk_x64_linux_xxx.tar.gz?jdk1.8
  • 在/etc/profile文件中添加環境變量JAVA_HOME。

vi?/etc/profile #?在profile文件中添加 export?JAVA_HOME=/mydata/java/jdk1.8 export?PATH=$PATH:$JAVA_HOME/bin #?使修改后的profile文件生效 .?/etc/profile

安裝kafka-eagle

  • 下載kafka-eagle的安裝包,下載地址:https://github.com/smartloli/kafka-eagle-bin/releases

  • 下載完成后將kafka-eagle解壓到指定目錄;

cd?/mydata/kafka/ tar?-zxvf?kafka-eagle-web-2.0.5-bin.tar.gz
  • 在/etc/profile文件中添加環境變量KE_HOME;

vi?/etc/profile #?在profile文件中添加 export?KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5 export?PATH=$PATH:$KE_HOME/bin #?使修改后的profile文件生效 .?/etc/profile
  • 安裝MySQL并添加數據庫ke,kafka-eagle之后會用到它;

  • 修改配置文件$KE_HOME/conf/system-config.properties,主要是修改Zookeeper的配置和數據庫配置,注釋掉sqlite配置,改為使用MySQL;

###################################### # multi zookeeper & kafka cluster list ###################################### kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=localhost:2181###################################### # kafka eagle webui port ###################################### kafka.eagle.webui.port=8048###################################### # kafka sqlite jdbc driver address ###################################### # kafka.eagle.driver=org.sqlite.JDBC # kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # kafka.eagle.username=root # kafka.eagle.password=www.kafka-eagle.org###################################### # kafka mysql jdbc driver address ###################################### kafka.eagle.driver=com.mysql.cj.jdbc.Driver kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=root
  • 使用如下命令啟動kafka-eagle;

$KE_HOME/bin/ke.sh?start
  • 命令執行完成后會顯示如下信息,但并不代表服務已經啟動成功,還需要等待一會;

  • 再介紹幾個有用的kafka-eagle命令:

#?停止服務 $KE_HOME/bin/ke.sh?stop #?重啟服務 $KE_HOME/bin/ke.sh?restart #?查看服務運行狀態 $KE_HOME/bin/ke.sh?status #?查看服務狀態 $KE_HOME/bin/ke.sh?stats #?動態查看服務輸出日志 tail?-f?$KE_HOME/logs/ke_console.out
  • 啟動成功可以直接訪問,輸入賬號密碼admin:123456,訪問地址:http://192.168.5.78:8048/

  • 登錄成功后可以訪問到Dashboard,界面還是很棒的!

可視化工具使用

  • 之前我們使用命令行創建了Topic,這里可以直接通過界面來創建;

  • 我們還可以直接通過kafka-eagle來發送消息;

  • 我們可以通過命令行來消費Topic中的消息;

bin/kafka-console-consumer.sh?--topic?testTopic?--from-beginning?--bootstrap-server?192.168.5.78:9092
  • 控制臺獲取到信息顯示如下;

  • 還有一個很有意思的功能叫KSQL,可以通過SQL語句來查詢Topic中的消息;

  • 可視化工具自然少不了監控,如果你想開啟kafka-eagle對Kafka的監控功能的話,需要修改Kafka的啟動腳本,暴露JMX的端口;

vi?kafka-server-start.sh #?暴露JMX端口 if?[?"x$KAFKA_HEAP_OPTS"?=?"x"?];?thenexport?KAFKA_HEAP_OPTS="-server?-Xms2G?-Xmx2G?-XX:PermSize=128m?-XX:+UseG1GC?-XX:MaxGCPauseMillis=200?-XX:ParallelGCThreads=8?-XX:ConcGCThreads=5?-XX:InitiatingHeapOccupancyPercent=70"export?JMX_PORT="9999" fi
  • 來看下監控圖表界面;

  • 還有一個很騷氣的監控大屏功能;

  • 還有Zookeeper的命令行功能,總之功能很全,很強大!

?

?

SpringBoot整合Kafka

在SpringBoot中操作Kafka也是非常簡單的,比如Kafka的消息模式很簡單,沒有隊列,只有Topic。

  • 首先在應用的pom.xml中添加Spring Kafka依賴;

<!--Spring整合Kafka--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version> </dependency>
  • 修改應用配置文件application.yml,配置Kafka服務地址及consumer的group-id;

server:port:?8088 spring:kafka:bootstrap-servers:?'192.168.5.78:9092'consumer:group-id:?"bootGroup"
  • 創建一個生產者,用于向Kafka的Topic中發送消息;

/***?Kafka消息生產者*?Created?by?macro?on?2021/5/19.*/ @Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;public?void?send(String?message){kafkaTemplate.send("bootTopic",message);} }
  • 創建一個消費者,用于從Kafka中獲取消息并消費;

/***?Kafka消息消費者*?Created?by?macro?on?2021/5/19.*/ @Slf4j @Component public?class?KafkaConsumer?{@KafkaListener(topics?=?"bootTopic")public?void?processMessage(String?content)?{log.info("consumer?processMessage?:?{}",content);}}
  • 創建一個發送消息的接口,調用生產者去發送消息;

/***?Kafka功能測試*?Created?by?macro?on?2021/5/19.*/ @Api(tags?=?"KafkaController",?description?=?"Kafka功能測試") @Controller @RequestMapping("/kafka") public?class?KafkaController?{@Autowiredprivate?KafkaProducer?kafkaProducer;@ApiOperation("發送消息")@RequestMapping(value?=?"/sendMessage",?method?=?RequestMethod.GET)@ResponseBodypublic?CommonResult?sendMessage(@RequestParam?String?message)?{kafkaProducer.send(message);return?CommonResult.success(null);} }
  • 直接在Swagger中調用接口進行測試;

  • 項目控制臺會輸出如下信息,表明消息已經被接收并消費掉了。

2021-05-19?16:59:21.016??INFO?2344?---?[ntainer#0-0-C-1]?c.m.mall.tiny.component.KafkaConsumer????:?consumer?processMessage?:?Spring?Boot?message!

?

?

總結

通過本文的一波實踐,大家基本就能入門Kafka了。安裝、可視化工具、結合SpringBoot,這些基本都是和開發者相關的操作,也是學習Kafka的必經之路。

參考資料

  • Kafka官方文檔:https://kafka.apache.org/quickstart

  • kafka-eagle官方文檔:http://www.kafka-eagle.org/articles/docs/introduce/getting-started.html

  • Kafka相關概念:https://juejin.cn/post/6844903495670169607

有道無術,術可成;有術無道,止于術

歡迎大家關注Java之道公眾號

好文章,我在看??

總結

以上是生活随笔為你收集整理的吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。