logstash 获取多个kafka_logstash 配置详解
簡介
ELK技術棧中的logstash起到了管道的作用,其主要的功能可以看做input-filter-output這幾樣,因此可以看住logstash在數據傳輸中的作用就是對數據進行一定程度的修改。一般我們是用此來使數據格式更加規范,符合我們期望的格式,從而在kibana或者ES檢索更加方便。
主要組成
logstash下載即用,對環境的要求是jre,11以上就能完全使用了。其中bin目錄是主文件,config目錄是配置目錄,有關屬性配置,性能調優都在這一塊。一般主要用到的是logstash-sample.conf,input-filter-output都寫在這一塊。
顧名思義,input就是獲取數據,filter就是過濾器,也是我們處理數據的地方,output則是輸出到指定地區。本筆記以從kafka獲取數據,輸出到ES為示例。
input
input區域較為簡潔,我們從kafka獲取數據可以這樣按下示例子寫。具體配置參見這里
input{ kafka{ #一個input中可以寫多個kafka bootstrap_servers => "localhost:9092" #這里是kafka的地址,如果要從多個kafka獲取數據的話可以以逗號分隔 topic => "nginx" #指定從kafka的哪個topic獲取數據 codec => json{ charset => "UTF-8" } #如果數據是json格式可直接在這里進行轉換 consumer_threads => 2 #使用多少個消費者線程,盡量與分區數一致 enable_auto_commit => true #是否啟用自動告訴kafka成功消費的偏移量(或索引),下次會自動以該偏移量為消耗的初始偏移量 auto_offset_reset => "latest" #初始時將偏移量移到最新偏移量 auto_commit_interval_ms => "1000" #定時提交偏移量的時間間隔 group_id => "logstash" #指定該消費者的group_id。可用于負載均衡 add_field => { "[@metadata][type]" => "nginx" #以@metadata來存放數據的是不會被output的,除某些被配置了metadata => true的配置項 }#增加數據的字段 }}filter
filter區是我們處理數據的主力區域,數據的處理流程都在這里寫就。filter是通過一個個插件組合實現的,具體的插件以及參數可以從這里看。目前插件數量有46種,常用的有date drop grok json mutate。filter區域的寫法如下,其格式與input的大致相同的。
filter{ plugin{ ...some param }}需要重視的是各個插件的組合使用。下文涉及的所有tag都會放在最外層的tags的列表里。插件通用繼承了的且常用方法有add_field add_tag id remove_field remove_tag這些方法在任意一個filter中都可以使用。
date
filter{ date{ match=>["client_time","yyyy-MM-dd HH:mm:ss"] #若client_time字段與后者匹配則替換target,會變成UTC時間,標準時區 target=>"real time" #target默認為@timestamp }}drop
filter{ if [Level] == "DEBUG"{#利用[filed]的語法可訪問字段,可嵌套 drop {} #drop后日志就會被丟棄 }}grok
grok是十分重要的插件,用于按正則提取信息后自動賦值到字段。更進一步的使用規則可以點擊這里。
filter{ match =>{ prtterns_dir => ["./patterns"] #如果你有需要可以自己定義一個規則 "message" => "%{IP:client} %{Number:times:int}" #match會按字段=>正則的形式提取信息,其%{IP:client}中,IP是內置規則,client是欲賦值的字段名,默認值均為字符串,如果已知類型可以加以轉換,如在后面加上:int來轉化為整數型 }}內置的規則你可以從這里查看。
json
filter{ json { source => "message" #指定需要解碼的字段 target => "jsoncontent" #指定存放解碼后內容的字段,如果不寫將會把所有字段釋放在最外層 remove_field => "test" #刪除名為test的字段 }#如果解碼失敗會產生一個"_jsonparsefailure"的tag,當然你也可以用tag_on_failure來指定}mutate
mutate是用于數據修改、字符串處理、類型轉換等重要功能的插件。
filter{ mutate { split => ["hostname","."] #會被原地替換 add_field => { "shortHostname" => "%{hostname[0]}" } } mutate { rename => ["shortHostname","hostname"] #替換 } mutate { convert => { "fieldname" => "integer" "booleanfield" => "boolean" } }}由于功能繁多,建議一定要去看官方文檔。
output
output的格式與input的格式類似。
output{ elasticsearch{ hosts => ["localhost:9200"] }}更多的設置,如修改index等,可參考官方文檔使用。
性能調優
logstash的性能與內存占用向來為人所詬病,因此我們要關注的是單獨抽離logstash,使其獨立成為集群。輕量級的beat的為此提供了解決方案,不需要依賴,內存與cpu消耗極低,而kafka則是緩解了生產者與消費者時間-速率不一致的矛盾。從而我們可以通過拓展logstash集群從而達到提高處理速率的目的。其中,我們也可以通過調整logstash的參數來提高其性能。
- worker 調整worker數量,可以提高filter的效率,一般可設置為略高于CPU數
- batch_size 調整worker一次批量處理的條數,默認為150,其與ES的bulk寫入次數相關。
- Output.flush_size&Output.idle_flush_time 控制lush的大小與頻率
- Filter.grok 盡量用合適的類型分割字段,盡量避免使用DATA,增加一定的錨點可提高grok性能
總結
以上是生活随笔為你收集整理的logstash 获取多个kafka_logstash 配置详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 名媛蜜语3事业爱情攻略有哪些
- 下一篇: f分布表完整图_分布式计算引擎之星——S