日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Logstash同步mysql一对多数据到ES(踩坑日记系列)

發布時間:2025/3/19 数据库 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Logstash同步mysql一对多数据到ES(踩坑日记系列) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

場景:

Logstash 、Kibana、ES版本:6.3.1。

使用Logstash從mysql同步用戶和用戶所有的寵物到ES中。

希望的格式:

"register_name": "孟林潔","id": 80469531,"pets": [{"breed_name": "萬能梗","birthday": null,"pet_id": 999044,"name": "一只狗","images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}","breed_id": 130},{"breed_name": "萬能梗","birthday": null,"pet_id": 999097,"name": "一只狗2","images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}","breed_id": 130}],"mobile": "*******","avatar": null,"pet_list": [999044,999097]

問題

  • logstash同步nested嵌套類型到ES中。
  • logstash同步嵌套數組對象時,聚合過程中數據丟失(用戶寵物會隨機丟失,偶而數據不丟失)。
  • logstash同步時少同步一條數據,在停止logstash服務時才進行同步
  • (更新) mysql的多條數據同步到es只有一條
  • 解決:

    1、解決logstash同步nested嵌套類型到ES中
    先創建索引,并且修改索引類型為nested

    創建索引:PUT /user 修改索引映射: PUT /user/_mapping/doc {"doc": {"properties": {"avatar": {"type": "text"},"id": {"type": "long"},"mobile": {"type": "text"},"pets": {"type": "nested","properties": {"birthday": {"type": "date"},"breed_id": {"type": "long"},"breed_name": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"images": {"type": "text"},"name": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"pet_id": {"type": "long"}}},"register_name": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"}}} }

    使用logstash的過濾器中aggregate插件進行數據聚合。

    配置文件jdbc3.conf

    input {stdin {}jdbc {jdbc_driver_library => "../mysql-connector-java-6.0.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev"jdbc_user => "****"jdbc_password => "****"#jdbc_paging_enabled => "true"#jdbc_page_size => "50"clean_run => trueuse_column_value => truerecord_last_run => "true"tracking_column => "id"schedule => "*/1 * * * *"#last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text"statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc"}}filter { #這里做聚合aggregate {task_id => "%{id}"code => "map['id'] = event.get('id')map['register_name'] = event.get('register_name')map['mobile'] = event.get('mobile')map['avatar'] = event.get('avatar')map['pet_list'] ||=[]map['pets'] ||=[]if (event.get('pet_id') != nil)if !(map['pet_list'].include? event.get('pet_id')) map['pet_list'] << event.get('pet_id') map['pets'] << {'pet_id' => event.get('pet_id'),'name' => event.get('name'),'images' => event.get('images'),'breed_id' => event.get('breed_id'),'breed_name' => event.get('breed_name'),'birthday' => event.get('birthday')}endendevent.cancel()"push_previous_map_as_event => truetimeout => 5}json {source => "message"remove_field => ["message"]#remove_field => ["message", "type", "@timestamp", "@version"]}mutate {#將不需要的JSON字段過濾,且不會被存入 ES 中remove_field => ["tags", "@timestamp", "@version"]} }output {stdout {#codec => json_lines}elasticsearch {hosts => ["127.0.0.1:9200"]index => "user"document_id => "%{id}"} }

    2、解決聚合過程中子數組對象丟失

    剛開始考慮到是否是sql查詢分頁問題,導致多寵物沒有一起聚合,而是分開聚合。但是通過看日志和數據發現每次丟失的數據不同,沒有任何規律性。

    隨機性的問題讓我想到多線程,然后查找logstash配置。在config/logstash.yml中有以下配置:

    # ------------ Pipeline Settings -------------- # # The ID of the pipeline. # 管道id # pipeline.id: test1 # # Set the number of workers that will, in parallel, execute the filters+outputs # stage of the pipeline. # # This defaults to the number of the host's CPU cores. #output 和 filter的線程數,默認是cpu核數 # pipeline.workers: 1 # # How many events to retrieve from inputs before sending to filters+workers # # pipeline.batch.size: 1000 # # How long to wait in milliseconds while polling for the next event # before dispatching an undersized batch to filters+outputs # # pipeline.batch.delay: 50

    問題定位:多線程跑聚合過程中,同一個用戶的多個寵物可能被分配到不通過的線程,分別做不同的聚合,導致一個用戶存在多條數據,分別擁有不同的寵物,然后多線程的進行輸出到ES,ES保存過程中會把存在的數據給更新掉,這就是我的寵物丟失的原因,多線程分配的隨機性導致數據也隨機丟失。

    嘗試修改線程數,驗證猜想是否正確。

    指定配置文件運行會使logstash忽略logstash.yml配置。所以在logstash.yml指定配置文件,運行logstash時不用指定配置,logstash會自動尋找logstash.yml配置

    # path.config: /Users/menglinjie/ES-node/logstash-6.3.1/conf.d

    驗證后確認猜想正確。

    回想剛才把線程數設置為1,這樣肯定會影響性能的吧,萬一以后我有不需要聚合的的數據時完全可以多線程跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任務綁定不同管道配置。

    這里pipeline.workers: 4,pipeline.output.workers: 3,那么執行聚合的filter就是1,這樣可以單線程聚合,多線程輸出。

    多個任務可以配置多個管道,pipeline.id標示管道唯一性。

    - pipeline.id: user_pipelinepipeline.workers: 4pipeline.batch.size: 1000# 輸出pipeline.output.workers: 3# 配置文件位置path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf"# 對基于磁盤的排隊進行“持久化”。默認值是內存queue.type: persisted

    更新:
    影響聚合結果的還有sql語句!!
    sql語句必須根據聚合task_id排序,也就是需要聚合的數據必須排在一起。否則map[‘pets’]會被覆蓋掉,導致數據丟失。

    3、logstash同步時少同步一條數據,在停止logstash服務時才進行同步

    在filter 聚合配置中添加:

    timeout => 3

    filter aggregate 創建中 event map 并不知道我、這次事件是不是應該結束,也就是它也不知道到那一條才是最后一條, 因此設置一個 timeout 告訴它這個時間執行多少秒就結束繼續執行第二個。但這樣并不是很嚴謹,因為你也不確定你的 event map 到底要執行多久 。最好的方式是 我們應該給定一個 task end 的條件 ES官網關于 aggregate 的說明

    4、es 配置id的問題,必須有唯一性,否則被覆蓋

    參考鏈接:

    https://segmentfault.com/a/1190000016592277

    https://segmentfault.com/q/1010000016861266

    https://blog.csdn.net/weixin_33910460/article/details/88719101

    https://elasticsearch.cn/question/6648

    總結

    以上是生活随笔為你收集整理的Logstash同步mysql一对多数据到ES(踩坑日记系列)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。