日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka消息模拟器

發布時間:2024/4/17 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka消息模拟器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
package clickstream import java.util.{Properties, Random, UUID} import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import org.codehaus.jettison.json.JSONObject/** * Created by 郭飛 on 2016/5/31. */ object KafkaMessageGenerator {private val random = new Random()private var pointer = -1private val os_type = Array("Android", "IPhone OS","None", "Windows Phone")def click() : Double = {random.nextInt(10)}def getOsType() : String = {pointer = pointer + 1if(pointer >= os_type.length) {pointer = 0os_type(pointer)} else {os_type(pointer)}}def main(args: Array[String]): Unit = {val topic = "user_events"//本地虛擬機ZK地址val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"val props = new Properties()props.put("metadata.broker.list", brokers)props.put("serializer.class", "kafka.serializer.StringEncoder")val kafkaConfig = new ProducerConfig(props)val producer = new Producer[String, String](kafkaConfig)while(true) {// prepare event dataval event = new JSONObject()event.put("uid", UUID.randomUUID())//隨機生成用戶id.put("event_time", System.currentTimeMillis.toString) //記錄時間發生時間.put("os_type", getOsType) //設備類型.put("click_count", click) //點擊次數// produce event messageproducer.send(new KeyedMessage[String, String](topic, event.toString))println("Message sent: " + event)Thread.sleep(200)}} }作者:MichaelFly 鏈接:http://www.jianshu.com/p/ccba410462ba 來源:簡書 著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

?

轉載于:https://www.cnblogs.com/rocky-AGE-24/p/7404754.html

總結

以上是生活随笔為你收集整理的Kafka消息模拟器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。