drools动态配置规则_基于 Flink 和 Drools 的实时日志处理
背景
日志系統接入的日志種類多、格式復雜多樣,主流的有以下幾種日志:
- filebeat采集到的文本日志,格式多樣
- winbeat采集到的操作系統日志
- 設備上報到logstash的syslog日志
- 接入到kafka的業務日志
以上通過各種渠道接入的日志,存在2個主要的問題:
- 格式不統一、不規范、標準化不夠
- 如何從各類日志中提取出用戶關心的指標,挖掘更多的業務價值
為了解決上面2個問題,我們基于flink和drools規則引擎做了實時的日志處理服務。
系統架構
架構比較簡單,架構圖如下:
各類日志都是通過kafka匯總,做日志中轉。
flink消費kafka的數據,同時通過API調用拉取drools規則引擎,對日志做解析處理后,將解析后的數據存儲到Elasticsearch中,用于日志的搜索和分析等業務。
為了監控日志解析的實時狀態,flink會將日志處理的統計數據,如每分鐘處理的日志量,每種日志從各個機器IP來的日志量寫到Redis中,用于監控統計。
模塊介紹
系統項目命名為eagle。
eagle-api:基于springboot,作為drools規則引擎的寫入和讀取API服務。
eagle-common:通用類模塊。
eagle-log:基于flink的日志處理服務。
重點講一下eagle-log:
對接kafka、ES和Redis
對接kafka和ES都比較簡單,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),詳見代碼。
對接Redis,最開始用的是org.apache.bahir提供的redis connector,后來發現靈活度不夠,就使用了Jedis。
在將統計數據寫入redis的時候,最開始用的keyby分組后緩存了分組數據,在sink中做統計處理后寫入,參考代碼如下:
String?name?=?"redis-agg-log";????????DataStream>>?keyedStream?=?dataSource.keyBy((KeySelector)?log?->?log.getIndex())
????????????????.timeWindow(Time.seconds(windowTime)).trigger(new?CountTriggerWithTimeout<>(windowCount,?TimeCharacteristic.ProcessingTime))
????????????????.process(new?ProcessWindowFunction>,?String,?TimeWindow>()?{@Overridepublic?void?process(String?s,?Context?context,?Iterable?iterable,?Collector>>?collector)?{
????????????????????????ArrayList?logs?=?Lists.newArrayList(iterable);if?(logs.size()?>?0)?{
????????????????????????????collector.collect(new?Tuple2(s,?logs));
????????????????????????}
????????????????????}
????????????????}).setParallelism(redisSinkParallelism).name(name).uid(name);
后來發現這樣做對內存消耗比較大,其實不需要緩存整個分組的原始數據,只需要一個統計數據就OK了,優化后:
String?name?=?"redis-agg-log";????????DataStream?keyedStream?=?dataSource.keyBy((KeySelector)?log?->?log.getIndex())
????????????????.timeWindow(Time.seconds(windowTime))
????????????????.trigger(new?CountTriggerWithTimeout<>(windowCount,?TimeCharacteristic.ProcessingTime))
????????????????.aggregate(new?LogStatAggregateFunction(),?new?LogStatWindowFunction())
????????????????.setParallelism(redisSinkParallelism).name(name).uid(name);
這里使用了flink的聚合函數和Accumulator,通過flink的agg操作做統計,減輕了內存消耗的壓力。
使用broadcast廣播drools規則引擎
1、drools規則流通過broadcast map state廣播出去。
2、kafka的數據流connect規則流處理日志。
//廣播規則流env.addSource(new?RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
????????????????.broadcast(ruleStateDescriptor);
//kafka數據流
FlinkKafkaConsumer010?source?=?new?FlinkKafkaConsumer010<>(kafkaTopic,?new?LogSchema(),?properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);//數據流connect規則流處理日志
BroadcastConnectedStream?connectedStreams?=?dataSource.connect(ruleSource);
connectedStreams.process(new?LogProcessFunction(ruleStateDescriptor,?ruleBase)).setParallelism(processParallelism).name(name).uid(name);
具體細節參考開源代碼。
小結
本系統提供了一個基于flink的實時數據處理參考,對接了kafka、redis和elasticsearch,通過可配置的drools規則引擎,將數據處理邏輯配置化和動態化。
對于處理后的數據,也可以對接到其他sink,為其他各類業務平臺提供數據的解析、清洗和標準化服務。
項目地址:
https://github.com/luxiaoxun/eagle
作者:阿凡盧?
出處:http://www.cnblogs.com/luxiaoxun/
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的drools动态配置规则_基于 Flink 和 Drools 的实时日志处理的全部內容,希望文章能夠幫你解決所遇到的問題。