《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch
前言
前面 FLink 的文章中我們已經介紹了說 Flink 已經有很多自帶的 Connector。
1、《從0到1學習Flink》—— Data Source 介紹
2、《從0到1學習Flink》—— Data Sink 介紹
其中包括了 Source 和 Sink 的,后面我也講了下如何自定義自己的 Source 和 Sink。
那么今天要做的事情是啥呢?就是介紹一下 Flink 自帶的 ElasticSearch Connector,我們今天就用他來做 Sink,將 Kafka 中的數據經過 Flink 處理后然后存儲到 ElasticSearch。
準備
安裝 ElasticSearch,這里就忽略,自己找我以前的文章,建議安裝 ElasticSearch 6.0 版本以上的,畢竟要跟上時代的節奏。
下面就講解一下生產環境中如何使用 Elasticsearch Sink 以及一些注意點,及其內部實現機制。
Elasticsearch Sink
添加依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId><version>${flink.version}</version> </dependency>上面這依賴版本號請自己根據使用的版本對應改變下。
下面所有的代碼都沒有把 import 引入到這里來,如果需要查看更詳細的代碼,請查看我的 GitHub 倉庫地址:
https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-es6
這個 module 含有本文的所有代碼實現,當然越寫到后面自己可能會做一些抽象,所以如果有代碼改變很正常,請直接查看全部項目代碼。
ElasticSearchSinkUtil 工具類
這個工具類是自己封裝的,getEsAddresses 方法將傳入的配置文件 es 地址解析出來,可以是域名方式,也可以是 ip + port 形式。addSink 方法是利用了 Flink 自帶的 ElasticsearchSink 來封裝了一層,傳入了一些必要的調優參數和 es 配置參數,下面文章還會再講些其他的配置。
ElasticSearchSinkUtil.java
public class ElasticSearchSinkUtil {/*** es sink** @param hosts es hosts* @param bulkFlushMaxActions bulk flush size* @param parallelism 并行數* @param data 數據* @param func* @param <T>*/public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);data.addSink(esSinkBuilder.build()).setParallelism(parallelism);}/*** 解析配置文件的 es hosts** @param hosts* @return* @throws MalformedURLException*/public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {String[] hostList = hosts.split(",");List<HttpHost> addresses = new ArrayList<>();for (String host : hostList) {if (host.startsWith("http")) {URL url = new URL(host);addresses.add(new HttpHost(url.getHost(), url.getPort()));} else {String[] parts = host.split(":", 2);if (parts.length > 1) {addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));} else {throw new MalformedURLException("invalid elasticsearch hosts format");}}}return addresses;} }Main 啟動類
Main.java
public class Main {public static void main(String[] args) throws Exception {//獲取所有參數final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);//準備好環境StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);//從kafka讀取數據DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);//從配置文件中讀取 es 的地址List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS));//從配置文件中讀取 bulk flush size,代表一次批處理的數量,這個可是性能調優參數,特別提醒int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);//從配置文件中讀取并行 sink 數,這個也是性能調優參數,特別提醒,這樣才能夠更快的消費,防止 kafka 數據堆積int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5);//自己再自帶的 es sink 上一層封裝了下ElasticSearchSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,(Metrics metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {requestIndexer.add(Requests.indexRequest().index(ZHISHENG + "_" + metric.getName()) //es 索引名.type(ZHISHENG) //es type.source(GsonUtil.toJSONBytes(metric), XContentType.JSON)); });env.execute("flink learning connectors es6");} }配置文件
配置都支持集群模式填寫,注意用 , 分隔!
kafka.brokers=localhost:9092 kafka.group.id=zhisheng-metrics-group-test kafka.zookeeper.connect=localhost:2181 metrics.topic=zhisheng-metrics stream.parallelism=5 stream.checkpoint.interval=1000 stream.checkpoint.enable=false elasticsearch.hosts=localhost:9200 elasticsearch.bulk.flush.max.actions=40 stream.sink.parallelism=5運行結果
執行 Main 類的 main 方法,我們的程序是只打印 flink 的日志,沒有打印存入的日志(因為我們這里沒有打日志):
所以看起來不知道我們的 sink 是否有用,數據是否從 kafka 讀取出來后存入到 es 了。
你可以查看下本地起的 es 終端或者服務器的 es 日志就可以看到效果了。
es 日志如下:
上圖是我本地 Mac 電腦終端的 es 日志,可以看到我們的索引了。
如果還不放心,你也可以在你的電腦裝個 kibana,然后更加的直觀查看下 es 的索引情況(或者直接敲 es 的命令)
我們用 kibana 查看存入 es 的索引如下:
程序執行了一會,存入 es 的數據量就很大了。
擴展配置
上面代碼已經可以實現你的大部分場景了,但是如果你的業務場景需要保證數據的完整性(不能出現丟數據的情況),那么就需要添加一些重試策略,因為在我們的生產環境中,很有可能會因為某些組件不穩定性導致各種問題,所以這里我們就要在數據存入失敗的時候做重試操作,這里 flink 自帶的 es sink 就支持了,常用的失敗重試配置有:
1、bulk.flush.backoff.enable 用來表示是否開啟重試機制2、bulk.flush.backoff.type 重試策略,有兩種:EXPONENTIAL 指數型(表示多次重試之間的時間間隔按照指數方式進行增長)、CONSTANT 常數型(表示多次重試之間的時間間隔為固定常數)3、bulk.flush.backoff.delay 進行重試的時間間隔4、bulk.flush.backoff.retries 失敗重試的次數5、bulk.flush.max.actions: 批量寫入時的最大寫入條數6、bulk.flush.max.size.mb: 批量寫入時的最大數據量7、bulk.flush.interval.ms: 批量寫入的時間間隔,配置后則會按照該時間間隔嚴格執行,無視上面的兩個批量寫入配置看下啦,就是如下這些配置了,如果你需要的話,可以在這個地方配置擴充了。
FailureHandler 失敗處理器
寫入 ES 的時候會有這些情況會導致寫入 ES 失敗:
1、ES 集群隊列滿了,報如下錯誤
12:08:07.326 [I/O dispatcher 13] ERROR o.a.f.s.c.e.ElasticsearchSinkBase - Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of org.elasticsearch.transport.TransportService$7@566c9379 on EsThreadPoolExecutor[name = node-1/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@f00b373[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 6277]]]]是這樣的,我電腦安裝的 es 隊列容量默認應該是 200,我沒有修改過。我這里如果配置的 bulk flush size * 并發 sink 數量 這個值如果大于這個 queue capacity ,那么就很容易導致出現這種因為 es 隊列滿了而寫入失敗。
當然這里你也可以通過調大點 es 的隊列。參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
2、ES 集群某個節點掛了
這個就不用說了,肯定寫入失敗的。跟過源碼可以發現 RestClient 類里的 performRequestAsync 方法一開始會隨機的從集群中的某個節點進行寫入數據,如果這臺機器掉線,會進行重試在其他的機器上寫入,那么當時寫入的這臺機器的請求就需要進行失敗重試,否則就會把數據丟失!
3、ES 集群某個節點的磁盤滿了
這里說的磁盤滿了,并不是磁盤真的就沒有一點剩余空間的,是 es 會在寫入的時候檢查磁盤的使用情況,在 85% 的時候會打印日志警告。
這里我看了下源碼如下圖:
如果你想繼續讓 es 寫入的話就需要去重新配一下 es 讓它繼續寫入,或者你也可以清空些不必要的數據騰出磁盤空間來。
解決方法
DataStream<String> input = ...;input.addSink(new ElasticsearchSink<>(config, transportAddresses,new ElasticsearchSinkFunction<String>() {...},new ActionRequestFailureHandler() {@Overridevoid onFailure(ActionRequest action,Throwable failure,int restStatusCode,RequestIndexer indexer) throw Throwable {if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {// full queue; re-add document for indexingindexer.add(action);} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {// malformed document; simply drop request without failing sink} else {// for all other failures, fail the sink// here the failure is simply rethrown, but users can also choose to throw custom exceptionsthrow failure;}} }));如果僅僅只是想做失敗重試,也可以直接使用官方提供的默認的 RetryRejectedExecutionFailureHandler ,該處理器會對 EsRejectedExecutionException 導致到失敗寫入做重試處理。如果你沒有設置失敗處理器(failure handler),那么就會使用默認的 NoOpFailureHandler 來簡單處理所有的異常。
總結
本文寫了 Flink connector es,將 Kafka 中的數據讀取并存儲到 ElasticSearch 中,文中講了如何封裝自帶的 sink,然后一些擴展配置以及 FailureHandler 情況下要怎么處理。(這個問題可是線上很容易遇到的)
關注我
轉載請務必注明原創地址為:http://www.54tianzhisheng.cn/2018/12/30/Flink-ElasticSearch-Sink/
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復關鍵字:Flink 即可無條件獲取到。
Github 代碼倉庫
https://github.com/zhisheng17/flink-learning/
以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學習 flink 的一些 demo 和博客
相關文章
1、《從0到1學習Flink》—— Apache Flink 介紹
2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門
3、《從0到1學習Flink》—— Flink 配置文件詳解
4、《從0到1學習Flink》—— Data Source 介紹
5、《從0到1學習Flink》—— 如何自定義 Data Source ?
6、《從0到1學習Flink》—— Data Sink 介紹
7、《從0到1學習Flink》—— 如何自定義 Data Sink ?
8、《從0到1學習Flink》—— Flink Data transformation(轉換)
9、《從0到1學習Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch
12、《從0到1學習Flink》—— Flink 項目如何運行?
13、《從0到1學習Flink》—— Flink 寫入數據到 Kafka
轉載于:https://www.cnblogs.com/zhisheng/p/10326692.html
總結
以上是生活随笔為你收集整理的《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【BZOJ 1001】[BJOI2006
- 下一篇: Mac 上 Sublime Text3-