日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)

發布時間:2024/9/27 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 自定义Flume拦截器,并将收集的日志存储到Kafka中(案例) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.引入POM文件

如果想調用Flume,需要引入flume相關的jar包依賴,jar包依賴如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cn.com.toto.stormlogPro</artifactId><groupId>stormlogPro</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cn.com.toto.flume</artifactId><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.6.0</version><!-- 設置打包的時候,剔除依賴--><scope>provided</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.com.toto.stromlogpro.log4j.LogInfoBuilder</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build> </project>

2.自定義的攔截器的代碼

package cn.com.toto.stromlogpro.flume;import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List;/*** 自定義一個點擊流收集的攔截器* * 1、實現一個Interceptor.Builder接口。* 2、Interceptor.Builder中有個configuref方法,通過configure獲取配置文件中的相應key。* 3、Interceptor.Builder中有個builder方法,通過builder創建一個自定義的AppInterceptor* 4、AppInterceptor中有兩個方法,一個是批處理,一個單條處理,將批處理的邏輯轉換為單條處理* 5、需要在單條數據中添加 appid,由于appid是變量。需要在AppInterceptor的構造器中傳入一些參數。* 6、為自定義的AppInterceptor創建有參構造器,將需要的參數傳入進來。** @author tuzq* @create 2017-06-25 12:48*/ public class AppInterceptor implements Interceptor{//4.定義成員變量appId,用來接收從配置文件中讀取的信息private String appId;public AppInterceptor(String appId) {this.appId = appId;}/*** 單條數據進行處理,通過這個方式為日志添加上系統id* @param event* @return*/@Overridepublic Event intercept(Event event) {String message = null;try {message = new String(event.getBody(), "utf-8");} catch (UnsupportedEncodingException e) {message = new String(event.getBody());}//處理邏輯if (StringUtils.isNotBlank(message)) {message = "aid:"+appId+"||msg:" +message;event.setBody(message.getBytes());//正常邏輯應該執行到這里return event;}return event;}/*** 批量數據進行處理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {List<Event> resultList = new ArrayList<Event>();for (Event event : list) {Event r = intercept(event);if (r != null) {resultList.add(r);}}return resultList;}@Overridepublic void initialize() {}@Overridepublic void close() {}public static class AppInterceptorBuilder implements Interceptor.Builder{//1、獲取配置文件的appIdprivate String appId;@Overridepublic Interceptor build() {//3、構造攔截器return new AppInterceptor(appId);}@Overridepublic void configure(Context context) {//2、當出現default之后,就是點擊流告警系統this.appId = context.getString("appId","default");System.out.println("appId:"+appId);}} }

LogInfoBuilder的代碼如下:

package cn.com.toto.stromlogpro.log4j;import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.logging.Logger;/*** 通過這個工程模擬創建日志內容** @author tuzq* @create 2017-06-25 13:51*/ public class LogInfoBuilder {private final static Logger logger = Logger.getLogger("msg");public static void main(String[] args) {Random random = new Random();List<String> list = logInfoList();while(true) {logger.info(list.get(random.nextInt(list.size())));}}private static List<String> logInfoList() {List list = new ArrayList<String>();list.add("aid:1||msg:error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("java.sql.SQLException: You have an error in your SQL syntax;");list.add("error Unable to connect to any of the specified MySQL hosts.");list.add("error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");list.add("error:Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 2");list.add("error:NoSuchMethodError: com/starit/.");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:Java.lang.IllegalStateException");list.add("error:Java.lang.IllegalMonitorStateException");list.add("error:Java.lang.NegativeArraySizeException");list.add("error:java.sql.SQLException: You have an error in your SQL syntax;");list.add("error:Java.lang.TypeNotPresentException ");list.add("error:Java.lang.UnsupprotedOperationException ");list.add("error Java.lang.IndexOutOfBoundsException");list.add("error Java.lang.ClassNotFoundException");list.add("error java.lang.ExceptionInInitializerError ");list.add("error:java.lang.IncompatibleClassChangeError ");list.add("error:java.lang.LinkageError ");list.add("error:java.lang.OutOfMemoryError ");list.add("error java.lang.StackOverflowError");list.add("error: java.lang.UnsupportedClassVersionError");list.add("error java.lang.ClassCastException");list.add("error: java.lang.CloneNotSupportedException");list.add("error: java.lang.EnumConstantNotPresentException ");list.add("error java.lang.IllegalMonitorStateException ");list.add("error java.lang.IllegalStateException ");list.add("error java.lang.IndexOutOfBoundsException ");list.add("error java.lang.NumberFormatException ");list.add("error java.lang.RuntimeException ");list.add("error java.lang.TypeNotPresentException ");list.add("error MetaSpout.java:9: variable i might not have been initialized");list.add("error MyEvaluator.java:1: class Test1 is public, should be declared in a file named Test1.java ");list.add("error Main.java:5: cannot find symbol ");list.add("error NoClassDefFoundError: asa wrong name: ASA ");list.add("error Test1.java:54: 'void' type not allowed here");list.add("error Test5.java:8: missing return statement");list.add("error:Next.java:66: cannot find symbol ");list.add("error symbol : method createTempFile(java.lang.String,java.lang.String,java.lang.String) ");list.add("error invalid method declaration; return type required");list.add("error array required, but java.lang.String found");list.add("error Exception in thread main java.lang.NumberFormatException: null 20. .");list.add("error non-static method cannot be referenced from a static context");list.add("error Main.java:5: non-static method fun1() cannot be referenced from a static context");list.add("error continue outside of loop");list.add("error MyAbstract.java:6: missing method body, or declare abstract");list.add("error Main.java:6: Myabstract is abstract; cannot be instantiated");list.add("error MyInterface.java:2: interface methods cannot have body ");list.add("error Myabstract is abstract; cannot be instantiated");list.add("error asa.java:3: modifier static not allowed here");list.add("error possible loss of precision found: long required:byte var=varlong");list.add("error java.lang.NegativeArraySizeException ");list.add("error java.lang.ArithmeticException: by zero");list.add("error java.lang.ArithmeticException");list.add("error java.lang.ArrayIndexOutOfBoundsException");list.add("error java.lang.ClassNotFoundException");list.add("error java.lang.IllegalArgumentException");list.add("error fatal error C1010: unexpected end of file while looking for precompiled header directive");list.add("error fatal error C1083: Cannot open include file: R…….h: No such file or directory");list.add("error C2011:C……clas type redefinition");list.add("error C2018: unknown character 0xa3");list.add("error C2057: expected constant expression");list.add("error C2065: IDD_MYDIALOG : undeclared identifier IDD_MYDIALOG");list.add("error C2082: redefinition of formal parameter bReset");list.add("error C2143: syntax error: missing : before ");list.add("error C2146: syntax error : missing ';' before identifier dc");list.add("error C2196: case value '69' already used");list.add("error C2509: 'OnTimer' : member function not declared in 'CHelloView'");list.add("error C2555: 'B::f1': overriding virtual function differs from 'A::f1' only by return type or calling convention");list.add("error C2511: 'reset': overloaded member function 'void (int)' not found in 'B'");list.add("error C2660: 'SetTimer' : function does not take 2 parameters");list.add("error warning C4035: 'f……': no return value");list.add("error warning C4553: '= =' : operator has no effect; did you intend '='");list.add("error C4716: 'CMyApp::InitInstance' : must return a value");list.add("error LINK : fatal error LNK1168: cannot open Debug/P1.exe for writing");list.add("error LNK2001: unresolved external symbol public: virtual _ _thiscall C (void)");list.add("error java.lang.IllegalArgumentException: Path index.jsp does not start with");list.add("error org.apache.struts.action.ActionServlet.process(ActionServlet.java:148");list.add("error org.apache.jasper.JasperException: Exception in JSP");list.add("error The server encountered an internal error () that prevented it from fulfilling this request");list.add("error org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467");list.add("error javax.servlet.http.HttpServlet.service(HttpServlet.java:803)");list.add("error javax.servlet.jsp.JspException: Cannot find message resources under key org.apache.struts.action.MESSAGE");list.add("error Stacktrace: org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467)");list.add("error javax.servlet.ServletException: Cannot find bean org.apache.struts.taglib.html.BEAN in any scope");list.add("error no data found");list.add("error exception in thread main org.hibernate.MappingException: Unknown entity:.");list.add("error using namespace std;");list.add("error C2065: 'cout' : undeclared identifier");list.add("error main already defined in aaa.obj");list.add("error syntax error : missing ';' before '}'");list.add("error cout : undeclared identifier");list.add("error weblogic.servlet.internal.WebAppServletContext$ServletInvocationAction.run(WebAp ");list.add("error Caused by: java.lang.reflect.InvocationTargetException");list.add("error Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("error at com.starit.gejie.Util.Trans.BL_getSysNamesByType(Trans.java:220)");return list;} }

MyDailyRollingFileAppender的代碼如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.DailyRollingFileAppender; import org.apache.log4j.Priority;/*** @author tuzq* @create 2017-06-25 13:58*/ public class MyDailyRollingFileAppender extends DailyRollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);} }

MyRollingFileAppender的代碼如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.Priority; import org.apache.log4j.RollingFileAppender;/*** @author tuzq* @create 2017-06-25 14:01*/ public class MyRollingFileAppender extends RollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);} }

3.在Flume中的conf配置文件,并將收集的日志下沉到kafka中

a1.sources = r1 a1.channels = c1 a1.sinks = k1a1.sources.r1.type = exec a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/info.log a1.sources.r1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.com.toto.stromlogpro.flume.AppInterceptor$AppInterceptorBuilder #通過這個參數向自定義的Flume攔截器中傳遞參數(即系統編號) a1.sources.r1.interceptors.i1.appId = 1a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = log_monitor a1.sinks.k1.brokerList = hadoop1:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1

總結

以上是生活随笔為你收集整理的自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 蜜桃久久av | 亚洲青草视频 | 永久黄网站色视频免费观看w | 免费国产羞羞网站视频 | 最近更新中文字幕 | 亚欧洲精品在线视频免费观看 | 迈开腿让我尝尝你的小草莓 | 色综合狠狠爱 | 69天堂网| 调教一区| 五十路黄色片 | 麻豆69| 麻豆黄色网址 | av无码一区二区三区 | 国产精品极品白嫩 | 高清av免费 | 国语对白一区二区 | 大学生高潮无套内谢视频 | 亚洲AV成人无码精电影在线 | 人妻熟女aⅴ一区二区三区汇编 | 成人深夜在线观看 | 五个女闺蜜把我玩到尿失禁 | 草莓巧克力香氛动漫的观看方法 | 在线免费视频一区二区 | 国产山村乱淫老妇女视频 | 9色视频 | 久久伊人操 | 欧美一级黄色片视频 | 包射屋| 日韩一区二区三区视频在线 | 日韩精品系列 | 久久婷婷五月综合 | 顶级黑人搡bbw搡bbbb搡 | 免费在线a | xxxwww国产 | 在线精品小视频 | 欧美精品成人 | 成人在线观看小视频 | 伊人av在线 | 毛片网络| 亚洲AV无码久久精品色三人行 | 成人99 | 亚洲福利视频一区二区三区 | 97理伦| 宇都宫紫苑在线播放 | 国产亚洲自拍av | 亚洲精品系列 | 日韩专区中文字幕 | 黄色字幕网 | 国产自产一区二区 | 最近免费中文字幕中文高清百度 | 亚洲四虎av | 伊人中文网 | 国产毛片aaa | 91美女在线观看 | 少妇喷白浆 | 在线看片黄 | 国产噜噜噜噜噜久久久久久久久 | 久久99热人妻偷产国产 | 国产免费高清 | 热久久最新 | 黄色喷水网站 | 国产一区二区精品在线 | av免费入口 | 午夜私人影院 | 亚洲第一香蕉网 | 黄色成人免费观看 | 一级黄色大毛片 | 国产13页| 美女被草视频在线观看 | 国产精品一级二级三级 | 小视频国产 | 日韩av片在线 | 牛牛视频在线观看 | 日日爱99 | 国产麻豆精品久久一二三 | 成人激情综合网 | 欧美图片第一页 | www.黄色网| 高清视频免费在线观看 | 日韩精品高清在线 | 欧美视频导航 | 美脚の诱脚舐め脚视频播放 | 欧美亚洲在线 | 影音先锋国产精品 | 国产学生美女无遮拦高潮视频 | 国产成人久久精品麻豆二区 | 一本色道久久88综合日韩精品 | 成人免费看片视频 | 久久免费看少妇高潮v片特黄 | av播播 | 婷婷在线综合 | 午夜久久久久久噜噜噜噜 | 91大神网址 | 天天操操夜夜操操 | 丁香七月激情 | 亚洲一区二区三区在线免费观看 | 久草黄色| 男女做那个的全过程 |