日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等

發布時間:2024/9/27 编程问答 41 豆豆

1.16.Flink Window和Time詳解
1.16.1.Window(窗口)
1.16.2.Window的類型
1.16.3.Window類型匯總
1.16.4.TimeWindow的應用
1.16.5.CountWindow的應用
1.16.6.Window聚合分類
1.16.7.Window聚合分類之增量聚合
1.16.7.1.增量聚合狀態變化過程-累加求和
1.16.7.2.reduce(reduceFunction)
1.16.7.3.aggregate(aggregateFunction)
1.16.8.Window聚合分類之全量聚合
1.16.8.1.全量聚合狀態變化過程-求最大值
1.16.8.2.apply(windowFunction)
1.16.8.3.process(processWindowFunction)
1.16.9.Time介紹
1.16.9.1.設置Time類型
1.16.9.2.EventTime和Watermarks
1.16.9.3.有序的流的watermarks
1.16.9.4.無序的流的watermarks
1.16.9.5.多并行度流的watermarks
1.16.9.6.watermarks的生成方式
1.16.9.7.Flink應該如何設置最大亂序時間?
1.16.9.8.Flink應該如何設置最大亂序時間?

1.16.Flink Window和Time詳解

1.16.1.Window(窗口)

?聚合事件(比如計數、求和)在流上的工作方式與批處理不同。

  • ?比如,對流中的所有元素進行計數是不可能的,因為通常流是無限的(無界的)。所以,流上的聚合需要由 window 來劃定范圍,比如 “計算過去的5分鐘” ,或者 “最后100個元素的和”。
  • ?window是一種可以把無限數據切割為有限數據塊的手段。
    ?窗口可以是 時間驅動的 【Time Window】(比如:每30秒)或者 數據驅動的【Count Window】 (比如:每100個元素)。

1.16.2.Window的類型

?窗口通常被區分為不同的類型:
一:tumbling windows:滾動窗口 【沒有重疊】

二:sliding windows:滑動窗口 【有重疊】

三:session windows:會話窗口

1.16.3.Window類型匯總

TimeWindow和CountWindow都可以有tumbling windows和sliding wndows

1.16.4.TimeWindow的應用

1.16.5.CountWindow的應用

1.16.6.Window聚合分類

?增量聚合
?全量聚合

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;/*** window** Created by xxxx on 2020/10/09 .*/ public class SocketDemoFullCount {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {@Overridepublic void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)throws Exception {System.out.println("執行process。。。");long count = 0;for(Tuple2<Integer,Integer> element: elements){count++;}out.collect("window:"+context.window()+",count:"+count);}}).print();//這一行代碼一定要實現,否則程序不執行env.execute("Socket window count");}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time;/*** window** Created by xxxx on 2020/10/09 .*/ public class SocketDemoIncrAgg {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {System.out.println("執行reduce操作:"+value1+","+value2);return new Tuple2<>(value1.f0,value1.f1+value2.f1);}}).print();//這一行代碼一定要實現,否則程序不執行env.execute("Socket window count");}} import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;/*** 滑動窗口計算** 通過socket模擬產生單詞數據* flink對數據進行統計計算** 需要實現每隔1秒對最近2秒內的數據進行匯總計算*** Created by xxxx on 2020/10/09 .*/ public class SocketWindowWordCountJava {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}} import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;/*** checkpoint** Created by xxxx on 2020/10/09 .*/ public class SocketWindowWordCountJavaCheckPoint {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】env.enableCheckpointing(1000);// 高級選項:// 設置模式為exactly-once (這是默認值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一時間只允許進行一個檢查點env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend//env.setStateBackend(new MemoryStateBackend());//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** 把collection集合作為數據源** Created by xxxx on 2020/10/09 on 2018/10/23.*/ public class StreamingFromCollection {public static void main(String[] args) throws Exception {//獲取Flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ArrayList<Integer> data = new ArrayList<>();data.add(10);data.add(15);data.add(20);//指定數據源DataStreamSource<Integer> collectionData = env.fromCollection(data);//通map對數據進行處理DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return value + 1;}});//直接打印num.print().setParallelism(1);env.execute("StreamingFromCollection");} }

另外的Scala案例:

import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time/*** 滑動窗口計算** 每隔1秒統計最近2秒內的數據,打印到控制臺** Created by xxxx on 2020/10/09 .*/ object SocketWindowWordCountScala {def main(args: Array[String]): Unit = {//獲取socket端口號val port: Int = try {ParameterTool.fromArgs(args).getInt("port")}catch {case e: Exception => {System.err.println("No port set. use default port 9000--scala")}9000}//獲取運行環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//鏈接socket獲取輸入數據val text = env.socketTextStream("hadoop100",port,'\n')//解析數據(把數據打平),分組,窗口計算,并且聚合求sum//注意:必須要添加這一行隱式轉行,否則下面的flatmap方法執行會報錯import org.apache.flink.api.scala._val windowCounts = text.flatMap(line => line.split("\\s"))//打平,把每一行單詞都切開.map(w => WordWithCount(w,1))//把單詞轉成word , 1這種形式.keyBy("word")//分組.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定間隔時間.sum("count");// sum或者reduce都可以//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))//打印到控制臺windowCounts.print().setParallelism(1);//執行任務env.execute("Socket window count");}case class WordWithCount(word: String,count: Long)} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/ object StreamingFromCollectionScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._val data = List(10,15,20)val text = env.fromCollection(data)//針對map接收到的數據執行加1的操作val num = text.map(_+1)num.print().setParallelism(1)env.execute("StreamingFromCollectionScala")}}

1.16.7.Window聚合分類之增量聚合

窗口中每進入一條數據,就進行一次計算

reduce(reduceFunction) aggregate(aggregateFunction) sum(),min(),max()

1.16.7.1.增量聚合狀態變化過程-累加求和

1.16.7.2.reduce(reduceFunction)

1.16.7.3.aggregate(aggregateFunction)

1.16.8.Window聚合分類之全量聚合

?全量聚合

  • ?等屬于窗口的數據到齊,才開始進行聚合計算【可以實現對窗口內的數據進行排序等需求】
  • ?apply(windowFunction)
  • ?process(processWindowFunction)
    ?processWindowFunction比windowFunction提供了更多的上下文信息。

1.16.8.1.全量聚合狀態變化過程-求最大值

1.16.8.2.apply(windowFunction)

1.16.8.3.process(processWindowFunction)


1.16.9.Time介紹

?針對stream數據中的時間,可以分為以下三種

  • ?Event Time:事件產生的時間,它通常由事件中的時間戳描述。
  • ?Ingestion time:事件進入Flink的時間
  • ?Processing Time:事件被處理時當前系統的時間。

    ?處理時間(processing time):處理時間是指執行相應操作的機器的系統時間。
    當流處理程序基于處理時間運行時,所有基于時間的操作(如時間窗口)將使用運行相應運算符的機器的系統時鐘。每小時處理時間窗口將包括在系統時鐘指示整個小時之間到達特定運算符的所有記錄。 例如,如果應用程序在上午9:15開始運行,則第一個每小時處理時間窗口將包括在上午9:15到10:00之間處理的事件,下一個窗口將包括在上午10:00到11:00之間處理的事件,以此類推。

處理時間是最簡單的時間概念,不需要流和機器之間的協調。 它提供最佳性能和最低延遲。 但是,在分布式和異步環境中,處理時間不提供確定性,因為它容易受到記錄到達系統的速度(例如從消息隊列),記錄在系統內的運算符之間流動的速度的影響,以及停電(計劃或其他)。
?事件時間(event time):事件時間是每個事件在其生產設備上發生的時間。此時間通常在進入Flink之前嵌入記錄中,并且可以從每個記錄中提取該事件時間戳。 在事件時間,時間的進展取決于數據,而不是任何時鐘。 事件時間程序必須指定如何生成事件時間水印,這是表示事件時間進度的機制。 該水印機制在下面的后面部分中描述。

在一個完美的世界中,事件時間處理將產生完全一致和確定的結果,無論事件何時到達或其它們的順序。 但是,除非事件已知按順序到達(按時間戳),否則事件時間處理會在等待無序事件時產生一些延遲。 由于只能等待一段有限的時間,因此限制了確定性事件時間應用程序的運行方式。

假設所有數據都已到達,事件時間操作將按預期運行,即使在處理無序或延遲事件或重新處理歷史數據時也會產生正確且一致的結果。 例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的所有記錄,無論它們到達的順序如何,或者何時處理它們。 (有關更多信息,請參閱有關遲到事件的部分。)

請注意,有時基于事件時間的程序處理實時數據時,它們將使用一些處理時間(processing time)操作,以保證它們及時進行。
?進入時間(Ingestion time): 進入時間是事件進入Flink的時間。 在源運算符處,每個記錄將源的當前時間作為時間戳,并且基于時間的操作(如時間窗口)引用該時間戳。
進入時間在概念上位于事件時間和處理時間之間。與處理時間相比,它代價稍高,但可以提供更可預測的結果。 因為進入時間使用穩定的時間戳(在源處分配一次),所以對記錄的不同窗口操作將引用相同的時間戳,而在處理時間中,每個窗口操作符可以將記錄分配給不同的窗口(基于本地系統時鐘和 任何傳輸延誤)。

與事件時間相比,進入時間程序無法處理任何無序事件或延遲數據,但程序不必指定如何生成水印。

在內部,攝取時間與事件時間非常相似,但具有自動分配時間戳和自動生成水印功能。

1.16.9.1.設置Time類型

?Flink中,默認Time類似是ProcessingTime
?可以在代碼中設置

1.16.9.2.EventTime和Watermarks

?在使用eventTime的時候如何處理亂序數據?
?我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡延遲等原因,導致亂序的產生,特別是使用kafka的話,多個分區的數據無法保證有序。所以在進行window計算的時候,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個特別的機制,就是watermark,watermark是用于處理亂序事件的。
?watermark可以翻譯為水位線

1.16.9.3.有序的流的watermarks

1.16.9.4.無序的流的watermarks

1.16.9.5.多并行度流的watermarks

注意:多并行度的情況下,watermark對齊會取所有channel最小的watermark

1.16.9.6.watermarks的生成方式

?通常,在接收到source的數據后,應該立刻生成watermark;但是,也可以在source后,應用簡單的map或者filter操作后,再生成watermark。
?注意:如果指定多次watermark,后面指定的會覆蓋前面的值。
?生成方式

  • ?With Periodic Watermarks
    1、周期性的觸發watermark的生成和發送,默認是100ms
    2、每隔N秒自動向流里注入一個WATERMARK 時間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調用getCurrentWatermark 方法, 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
    3、可以定義一個最大允許亂序的時間,這種比較常用
    4、實現AssignerWithPeriodicWatermarks接口

  • ?With Punctuated Watermarks
    1、基于某些事件觸發watermark的生成和發送
    2、基于事件向流里注入一個WATERMARK,每一個元素都有機會判斷是否生成一個WATERMARK. 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
    3、實現AssignerWithPunctuatedWatermarks接口

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import javax.annotation.Nullable; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List;/**** Watermark 案例** Created by xxxx on 2020/10/09.*/ public class StreamingWindowWatermark {public static void main(String[] args) throws Exception {//定義socket的端口號int port = 9000;//獲取運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置使用eventtime,默認是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//設置并行度為1,默認并行度是當前機器的cpu數量env.setParallelism(1);//連接socket獲取輸入的數據DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");//解析輸入的數據DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定義生成watermark的邏輯* 默認100ms被調用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定義如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);long id = Thread.currentThread().getId();System.out.println("currentThreadId:"+id+",key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");return timestamp;}});DataStream<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果一樣.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 對window內的數據進行排序,保證數據的順序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());out.collect(result);}});//測試-把結果打印到控制臺即可window.print();//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執行env.execute("eventtime-watermark");}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;import javax.annotation.Nullable; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List;/**** Watermark 案例** sideOutputLateData 收集遲到的數據** Created by xxxx on 2020/10/09.*/ public class StreamingWindowWatermark2 {public static void main(String[] args) throws Exception {//定義socket的端口號int port = 9000;//獲取運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置使用eventtime,默認是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//設置并行度為1,默認并行度是當前機器的cpu數量env.setParallelism(1);//連接socket獲取輸入的數據DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");//解析輸入的數據DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定義生成watermark的邏輯* 默認100ms被調用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定義如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");return timestamp;}});//保存被丟棄的數據OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};//注意,由于getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,所以這里的類型,不能使用它的父類dataStream。SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果一樣//.allowedLateness(Time.seconds(2))//允許數據遲到2秒.sideOutputLateData(outputTag).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 對window內的數據進行排序,保證數據的順序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());out.collect(result);}});//把遲到的數據暫時打印到控制臺,實際中可以保存到其他存儲介質中DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);sideOutput.print();//測試-把結果打印到控制臺即可window.print();//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執行env.execute("eventtime-watermark");}}

scala案例:

import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例* Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調用TimeWindow效果一樣.apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})window.print()env.execute("StreamingWindowWatermarkScala")} } import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例** sideOutputLateData 收集遲到的數據** Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala2 {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val outputTag = new OutputTag[Tuple2[String,Long]]("late-data"){}val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調用TimeWindow效果一樣//.allowedLateness(Time.seconds(2))//允許數據遲到2秒.sideOutputLateData(outputTag).apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})val sideOutput: DataStream[Tuple2[String, Long]] = window.getSideOutput(outputTag)sideOutput.print()window.print()env.execute("StreamingWindowWatermarkScala")}}

1.16.9.7.Flink應該如何設置最大亂序時間?

這個要結合自己的業務以及數據情況去設置。如果maxOutOfOrderness設置的太小,而自身數據發送時由于網絡等原因導致亂序或者late太多,那么最終的結果就是會有很多單條的數據在window中被觸發,數據的正確性影響太大。

對于嚴重亂序的數據,需要嚴格統計數據最大延遲時間,才能保證計算的數據準確,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性,更加會加重Flink作業的負擔,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理,會有丟數據的風險。

1.16.9.8.Flink應該如何設置最大亂序時間?

這個要結合自己的業務以及數據情況去設置。如果maxOutOfOrderness設置的太小,而自身數據發送時由于網絡等原因導致亂序或者late太多,那么最終的結果就是會有很多單條的數據在window中被觸發,數據的正確性影響太大。

對于嚴重亂序的數據,需要嚴格統計數據最大延遲時間,才能保證計算的數據準確,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性,更加會加重Flink作業的負擔,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理,會有丟數據的風險。

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

婷婷成人在线 | 欧美片网站yy | 国产婷婷在线观看 | 啪啪小视频网站 | 国产91大片 | 亚洲aaa毛片| 国产真实精品久久二三区 | 九九热国产视频 | 久久成人毛片 | 999久久久精品视频 日韩高清www | 午夜12点| 成人三级视频 | 国产 在线观看 | 日韩天堂在线观看 | 久久综合欧美精品亚洲一区 | 成年人免费观看在线视频 | 国产成人精品免高潮在线观看 | 手机看片1042 | 久草www| av在线之家电影网站 | 久久久久夜色 | 正在播放亚洲精品 | 日韩二区三区在线观看 | 少妇高潮流白浆在线观看 | 在线播放精品一区二区三区 | 日日干,天天干 | 天天射天天搞 | 99r精品视频在线观看 | 超碰成人av| 激情网站网址 | 欧美激情视频一二三区 | 国产99一区视频免费 | 久久久91精品国产一区二区精品 | 欧美成人va| 欧美一区日韩一区 | 黄色大全免费观看 | 久久精品亚洲一区二区三区观看模式 | 日韩网站中文字幕 | av资源中文字幕 | 精品久久一区二区三区 | 日韩久久电影 | a天堂中文在线 | av高清一区二区三区 | 国产馆在线播放 | 日韩高清无线码2023 | 天天干com | 午夜体验区| 国产一区欧美一区 | 91免费在线视频 | 免费又黄又爽的视频 | 亚在线播放中文视频 | 亚洲欧美色婷婷 | 久久伦理 | 色美女在线 | 麻豆成人精品视频 | 成人精品亚洲 | 小草av在线播放 | 91九色porn在线资源 | 国产黄色大片免费看 | 国产在线不卡 | 97电影院在线观看 | 在线观看mv的中文字幕网站 | 成人免费看黄 | 国语黄色片 | 国产手机在线观看视频 | 五月婷婷综合激情 | 国产精品久久久久永久免费观看 | 国产网站在线免费观看 | 国产又粗又猛又爽又黄的视频免费 | 日韩亚洲在线视频 | www.黄色片网站 | 伊人狠狠色 | 久视频在线 | 精品一区二三区 | 最近中文字幕高清字幕在线视频 | 人人玩人人爽 | 国内精品久久久久影院一蜜桃 | 黄色一及电影 | 日韩二区三区在线 | 亚洲日本在线视频观看 | 成人91视频| 91热精品| 国产精品成人免费精品自在线观看 | 国产又黄又爽又猛视频日本 | 久久久久免费网站 | 日韩大陆欧美高清视频区 | 日韩精品一区二区三区中文字幕 | 亚洲成年人在线播放 | 亚洲精品欧美精品 | 欧美黑人巨大xxxxx | 日本狠狠干 | 亚洲理论影院 | 国产另类av| 亚洲电影自拍 | 久久在线免费视频 | 久久综合狠狠综合 | 久久久久国产精品免费网站 | 波多野结衣在线视频免费观看 | 亚洲综合网| 亚洲天堂网在线视频观看 | www.久久com | 在线 国产 亚洲 欧美 | 韩日色视频 | 亚洲精品中文字幕在线观看 | 久久网址| 精品麻豆 | 在线播放一区 | 成 人 黄 色 视频播放1 | 日韩在线免费视频 | 天堂av在线免费 | 成人v| 手机成人在线 | 国产91影院 | 婷婷久久综合网 | 欧美久久久久久久久久久久久 | 麻豆国产露脸在线观看 | 美女网站在线观看 | av中文字幕在线看 | 九九免费在线观看视频 | 夜夜视频资源 | 狠狠操操网 | 91一区一区三区 | 97中文字幕 | 在线观影网站 | 国产精品69久久久久 | 99热高清| 一区二区三区福利 | 99久久综合狠狠综合久久 | 高清免费av在线 | 久久综合精品国产一区二区三区 | 日韩精品免费一区二区三区 | 99精品黄色 | 人人干人人搞 | 免费亚洲婷婷 | 免费观看一级成人毛片 | 国产精品久久久免费看 | 在线观看成人国产 | 免费国产在线观看 | 亚洲精品中文字幕视频 | 亚洲国产欧洲综合997久久, | 亚洲va韩国va欧美va精四季 | 8x成人免费视频 | 青青射| 色网站在线观看 | 99精品欧美一区二区三区 | 久久久精品亚洲 | 91chinese在线 | 亚洲欧美经典 | 天天亚洲 | 亚洲情影院| 亚洲最大av在线播放 | av丝袜在线 | a久久免费视频 | 中文字幕综合在线 | 日韩美一区二区三区 | 中文字幕av全部资源www中文字幕在线观看 | 色婷婷国产精品一区在线观看 | 国产香蕉97碰碰久久人人 | 欧美性色黄 | 亚洲动漫在线观看 | 国产精品理论片在线观看 | 日韩性久久 | 日日夜av| 91av在线电影 | 久久国内免费视频 | 久久精品一二区 | 亚洲精品午夜国产va久久成人 | 97成人免费视频 | 欧美精品九九99久久 | 六月丁香婷婷网 | 国产探花在线看 | 91精品国产一区二区在线观看 | 美女久久网站 | 97国产大学生情侣白嫩酒店 | 国际精品久久 | 狠狠躁日日躁 | 激情电影影院 | 日韩三级中文字幕 | 91香蕉国产 | 日韩免费高清 | 日韩欧美在线视频一区二区三区 | 久久狠狠亚洲综合 | 久久人人爽人人爽人人片av免费 | 免费合欢视频成人app | 免费视频久久久 | 在线电影 你懂得 | 亚洲国产中文字幕 | 黄网在线免费观看 | 一区二区三区视频 | 中文不卡视频 | 国产精品久久久久久久久软件 | 亚洲涩涩一区 | 极品久久久久 | 国产91成人在在线播放 | 亚洲国产精久久久久久久 | 午夜久久网站 | 亚洲精品成人av在线 | 狠狠狠狠狠狠狠狠干 | 美女av电影 | 色婷婷激情| 中文字幕亚洲欧美日韩2019 | 91完整版观看 | 九草视频在线 | 成人免费毛片aaaaaa片 | 亚洲成人精品在线观看 | 欧美性大战久久久久 | 99久久99久久精品国产片果冰 | 国产视频精选在线 | 99精品国产99久久久久久福利 | 久久精品高清视频 | 精品一区二区在线播放 | 国产aaa免费视频 | 一级黄色av | 亚洲国产精品成人精品 | 在线观看国产91 | 毛片区 | 免费的国产精品 | 最近字幕在线观看第一季 | www视频在线观看 | 久久精品99久久久久久 | 1024手机基地在线观看 | 中文字幕av在线电影 | 日韩av一卡二卡三卡 | av黄色一级片 | 美女禁18| 热久久精品在线 | av综合 日韩| 最近中文字幕大全中文字幕免费 | 中文字幕电影在线 | 性色在线视频 | 中文字幕2021 | 久久久久久久福利 | 亚洲日韩中文字幕在线播放 | av在线播放快速免费阴 | 久久久国产一区二区三区四区小说 | 夜夜澡人模人人添人人看 | 免费视频 三区 | 91在线在线观看 | 国产美女网站在线观看 | 激情欧美国产 | 日韩欧美在线播放 | 久久国产视屏 | 草莓视频在线观看免费观看 | 中文字幕在线看视频国产中文版 | 精品视频| 久久国色夜色精品国产 | 激情伊人五月天久久综合 | 午夜av免费看 | 园产精品久久久久久久7电影 | 成片免费观看视频999 | 亚洲精品自拍 | 黄a网| 一区二区精品在线 | 九草视频在线观看 | 五月天久久狠狠 | 亚洲麻豆精品 | 福利av影院 | 91成年视频| 狠狠躁夜夜躁人人爽超碰97香蕉 | 亚洲精品国产精品乱码不99热 | 青青啪| 少妇啪啪av入口 | 91视频在线网址 | 人人澡超碰碰97碰碰碰软件 | 国产视频精品网 | 五月天国产精品 | 丁香综合 | 久久久久久久久久福利 | 久久久色| 久久久久免费精品视频 | 国产成人久久av免费高清密臂 | 激情网站免费观看 | 美女精品久久久 | 国产97免费| 欧美成人影音 | 91高清视频免费 | 黄色福利网 | 天天爱天天干天天爽 | 视频一区二区在线观看 | 亚洲一级黄色片 | 精品国产亚洲一区二区麻豆 | 丝袜美女在线 | 干天天| 久久精品导航 | 久久久久久久久网站 | av综合网址| 日韩精品中文字幕在线播放 | 夜色成人av | 国产精品九九九九九 | 天堂av网址 | 国产精品黄 | 欧美日韩在线观看一区二区三区 | 国产99精品 | 亚洲乱码国产乱码精品天美传媒 | 久久精品一二三区 | www.色婷婷 | 国产成人一区三区 | 欧美a级在线 | 韩国一区在线 | 免费在线观看毛片网站 | 亚洲精品国精品久久99热 | 欧美日韩中文在线观看 | 伊人亚洲综合 | 精品综合久久久 | 91视频下载 | 麻豆视频在线播放 | 一区二区三区在线观看免费 | 精品欧美乱码久久久久久 | 在线免费观看成人 | 91精品人成在线观看 | 97超碰资源站 | 亚洲黄色免费网站 | 毛片a级片 | 亚洲精品高清视频在线观看 | 国产中文字幕在线播放 | 国产高清av在线播放 | 久久久免费| 狠狠网 | 国产高清视频免费最新在线 | 大胆欧美gogo免费视频一二区 | 99久久日韩精品免费热麻豆美女 | 手机av在线网站 | 99久热在线精品 | 正在播放国产一区 | 91精品国自产在线观看 | 91精品视频一区 | 激情五月综合网 | 国产在线污 | 丁香婷婷网 | 国产精品一区二区三区观看 | 五月天国产| 91精品国产91久久久久 | 久久久精品亚洲 | 日韩综合色 | 久久久久国产精品www | 91久久精品一区二区二区 | 免费高清av在线看 | 黄色av影视| 99在线观看视频 | 国产精品99精品 | 国产成人精品午夜在线播放 | 在线 高清 中文字幕 | 天天操夜夜操 | 国产在线永久 | 色五月激情五月 | 国产精品午夜久久 | 久久久久成人精品免费播放动漫 | 国产精品美女免费看 | 日韩欧美精品在线视频 | 久久久免费毛片 | 国产精品1024 | 欧美aaa一级 | 怡红院av久久久久久久 | 欧美另类老妇 | 日韩黄色免费电影 | 极品中文字幕 | 亚洲欧美在线观看视频 | 99精品一级欧美片免费播放 | 在线观看视频97 | 国产日韩欧美在线播放 | av成人亚洲 | www.五月天色| 国产精品久久久久久久久久了 | www久久久| 成人在线播放网站 | 最新的av网站 | 国产区av在线 | 国产日韩视频在线观看 | 国产96在线观看 | 久久国产精品99久久久久久进口 | 国产一卡二卡在线 | 一区二区三区四区久久 | 亚洲精区二区三区四区麻豆 | 天天干天天做 | 激情在线五月天 | 天海冀一区二区三区 | 在线国产精品视频 | 久久久久久欧美二区电影网 | 亚洲精品男人天堂 | 亚洲欧洲av在线 | 97在线看片| 日韩在线免费 | 911国产 | 91黄色在线视频 | 亚洲精品在线网站 | 一区二区精品在线 | 久久婷婷久久 | 毛片永久新网址首页 | 婷婷激情站| 国产伦精品一区二区三区在线 | 超碰97.com | 亚洲乱码精品 | 免费看av片网站 | www.色五月.com| 中文区中文字幕免费看 | 黄色大片中国 | 久久经典视频 | 国产真实精品久久二三区 | 极品久久久久久久 | 九九综合九九综合 | 欧美日韩a视频 | 在线观看视频国产 | 456成人精品影院 | 国产精品99久久99久久久二8 | 久草免费在线 | 久久久久久久久影院 | 99精品视频在线观看免费 | 成人一级片在线观看 | av夜夜操| 丁香六月色 | 欧美日韩在线看 | 国产成人精品亚洲精品 | 国产精品99久久久久久武松影视 | 久久高清免费观看 | www.在线看片.com| 福利视频导航网址 | 成人免费视频网站 | 国产视频在线播放 | 成年人免费电影 | 一区二区三区电影大全 | 国产 日韩 欧美 自拍 | 亚洲精品字幕在线观看 | 97人人模人人爽人人喊中文字 | 2024国产在线| 亚洲精品一区二区网址 | 免费观看久久久 | 国产精成人品免费观看 | 69视频国产 | 九色福利视频 | 国产精品一级在线 | 99婷婷| 天天天插 | 亚洲欧美少妇 | 九九精品视频在线 | 一区三区视频 | 97在线免费视频观看 | 91丨九色丨蝌蚪丨对白 | 国产高清 不卡 | 99超碰在线播放 | 日本性高潮视频 | 美女视频免费一区二区 | 最近中文字幕在线 | 在线国产一区二区 | 国产在线久久久 | 精品国产aⅴ麻豆 | 免费观看国产精品视频 | 国内成人综合 | 欧美成人h版 | 久久久久久草 | 91精彩在线视频 | 一区二区不卡视频在线观看 | 麻豆系列在线观看 | 伊人手机在线 | 91视视频在线直接观看在线看网页在线看 | 麻豆高清免费国产一区 | 2019免费中文字幕 | 国产视频一区二区在线观看 | 天天综合视频在线观看 | 欧美国产日韩一区二区三区 | 日韩精品久久久免费观看夜色 | 久久久久久精 | 免费看片网址 | 制服丝袜天堂 | 麻豆影视在线播放 | 91女子私密保健养生少妇 | 日韩久久激情 | 欧美一级片免费播放 | 久久99精品国产一区二区三区 | 麻豆av一区二区三区在线观看 | 色91在线| 91网免费观看| 亚洲.www| 亚洲国产剧情av | 超碰97中文| 99精品久久久 | 久久精品这里都是精品 | 久久视频一区二区 | 日韩高清久久 | 国产一区精品在线 | 午夜精品一区二区三区在线观看 | 一区二区三区 中文字幕 | 精品久久久久久久久久国产 | aaawww| 99精品免费视频 | 精品国产不卡 | 国产一区二区视频在线播放 | 成人一区二区三区中文字幕 | 黄色成人影视 | 青青河边草观看完整版高清 | 亚洲精品视频网站在线观看 | 青青久草在线视频 | 91桃色免费观看 | 成 人 a v天堂 | 深爱综合网| 国产亚洲精品久久久久久无几年桃 | 国产精品久久久区三区天天噜 | 色久av| 国产一级特黄电影 | 成人av网站在线播放 | 狠狠操操操 | 欧美日韩视频免费 | 国产黄色看片 | 丁香综合 | 美女视频黄免费 | 国产色啪 | 国产视频一区在线免费观看 | a电影在线观看 | 在线精品视频免费播放 | 高潮久久久久久 | 91视频观看免费 | 亚洲精品1区2区3区 超碰成人网 | 亚洲国产视频在线 | 91av视频在线观看免费 | 亚洲综合小说电影qvod | 成人免费共享视频 | 久久久久久久久久久久久久av | 美女精品久久久 | 国产黄色片久久久 | 国产精品一区在线观看 | 天天综合五月天 | 日本美女xx| 24小时日本在线www免费的 | 亚洲精品日韩av | 免费观看成人av | 色妞久久福利网 | 免费看污污视频的网站 | 亚洲黄色三级 | 欧美日韩观看 | 久久看免费视频 | 精品国产乱码久久久久久三级人 | 日韩av女优视频 | 丁香五香天综合情 | 天干啦夜天干天干在线线 | 久久综合久色欧美综合狠狠 | 一区二区欧美日韩 | av在线免费观看不卡 | 日韩电影中文 | 日韩中文免费视频 | 九色视频网 | 国产精品一区二区在线观看免费 | 伊人六月 | 久久理论电影 | 91视频在线网址 | 国产成人在线免费观看 | 人人干狠狠干 | 福利视频午夜 | 欧美日韩另类在线 | 九九一级片 | 久久你懂的 | 成人国产精品久久久春色 | 免费高清男女打扑克视频 | 日韩系列在线观看 | 国产aa精品 | 婷婷去俺也去六月色 | 美女免费黄网站 | 日本三级在线观看中文字 | 91av视频免费观看 | 欧美黄色特级片 | 成人xxxx | 99精品国产福利在线观看免费 | 久久精品一 | 国产精品99久久久久人中文网介绍 | 亚洲精品视频播放 | 成人网看片| 国产在线精品区 | 伊人黄色网 | 成 人 黄 色 视频播放1 | 91成人精品国产刺激国语对白 | 色999五月色| 午夜在线免费观看 | 国产五月天婷婷 | 国产成人高清在线 | 91视频 - v11av| 天天射天天操天天 | 国产成人精品在线播放 | 国产精品久久久久av免费 | 久久久国产精品视频 | 日韩黄色免费 | 免费精品久久久 | 国产一区二区视频在线播放 | 国产尤物一区二区三区 | 人人射人人爱 | 韩国av免费在线 | 亚洲一级影院 | 免费看污在线观看 | 天天精品视频 | 高清不卡一区二区在线 | 久久一级片 | www操操| 日韩高清在线观看 | 91av视频在线观看免费 | 国产日产欧美在线观看 | 久久人人爽人人爽人人片av软件 | 欧美国产视频在线 | 99精品国自产在线 | 免费网址在线播放 | 久久久久欠精品国产毛片国产毛生 | 99热99| 成全在线视频免费观看 | 免费色视频在线 | 免费观看高清 | 天天夜操 | 懂色av一区二区三区蜜臀 | 国产激情小视频在线观看 | 亚洲乱码中文字幕综合 | 香蕉视频国产在线观看 | 久久不卡免费视频 | 在线综合 亚洲 欧美在线视频 | 久久综合狠狠综合 | 91在线免费观看网站 | 特级毛片在线免费观看 | av女优中文字幕在线观看 | www.夜夜操| 日本视频久久久 | 免费看片成人 | 少妇bbbb揉bbbb日本 | 日韩视频a | 免费看片在线观看 | 五月天高清欧美mv | 五月婷网站 | 久久亚洲区 | 国产精品国产三级国产 | 91精品爽啪蜜夜国产在线播放 | 十八岁以下禁止观看的1000个网站 | 91在线免费播放 | 久久精品综合一区 | www.黄色片网站 | 久久伦理影院 | 毛片99 | 日韩在线视频观看免费 | 国产亚洲视频在线免费观看 | 久久精品播放 | av在线日韩 | 碰超在线观看 | 亚洲一区不卡视频 | 亚洲免费不卡 | 999ZYZ玖玖资源站永久 | 精品在线99 | 精品久久久久久亚洲综合网站 | 在线免费观看国产视频 | 三级黄色大片在线观看 | 日韩在线观看三区 | 九九免费在线看完整版 | 91丨九色丨蝌蚪丨对白 | 亚洲午夜不卡 | 中国黄色一级大片 | 日日爱网址 | 五月婷婷在线视频观看 | 国产精品免费久久久久 | 欧美另类亚洲 | 视频在线观看入口黄最新永久免费国产 | 91av在线看 | 丁香色综合| 亚洲视频久久久 | 天天干天天操av | 日韩高清av | 国产成人精品一区二区在线 | 亚洲精品高清视频 | 美女国内精品自产拍在线播放 | 久草精品视频在线播放 | 毛片网在线观看 | av线上看 | 日批视频在线播放 | 久久艹人人 | 丁香婷婷激情国产高清秒播 | 天天射天天射 | 一区 在线观看 | 久久99精品国产一区二区三区 | 亚洲欧洲成人精品av97 | 国产91粉嫩白浆在线观看 | 欧美性生爱 | 国产中文字幕网 | 91亚洲狠狠婷婷综合久久久 | 免费精品在线观看 | 国产成人精品av在线 | 国产最新网站 | 久久久久久国产精品亚洲78 | 天天干天天怕 | 精品视频专区 | 婷婷午夜| 欧美一二三在线 | 激情综合网婷婷 | 91视频免费国产 | 日韩毛片久久久 | 国产黄色a | 日韩在线观看高清 | 免费久久99精品国产 | 日韩精品一卡 | 午夜黄色影院 | 免费看色的网站 | 亚洲理论在线观看 | 亚洲精品美女在线观看 | 久久久久国产精品一区 | 超碰免费在线公开 | 国产一区视频在线 | 久草精品视频在线看网站免费 | 在线成人高清电影 | av夜夜操 | 最近中文字幕国语免费高清6 | 一区二区精品 | 在线观看中文字幕 | 久久免费99精品久久久久久 | 亚洲国产三级在线 | 色综合在| 亚洲午夜av久久乱码 | 四虎影视精品永久在线观看 | 久久久久久久久久久网 | 久久久久久久久久久久久国产精品 | 久久精品视频中文字幕 | 国产黄色精品网站 | 91传媒在线播放 | 国产精品刺激对白麻豆99 | 亚洲精品国产精品国自产在线 | 在线激情电影 | 一区二区视频免费在线观看 | 成人一级在线 | 国产一区二区在线播放 | 日韩av区| 一区二区三区精品久久久 | 91视频久久久久久 | 欧美日韩中文国产一区发布 | 亚洲精选99| 国产美女精品人人做人人爽 | 97看片网 | 日韩精品一区二区三区电影 | 色婷婷在线观看视频 | 99精品免费在线观看 | 久久福利剧场 | 天天干人人 | 欧美精品一二三 | 亚洲综合爱 | 日本狠狠色 | 成人免费看视频 | 国产97色 | 国产精品一区专区欧美日韩 | 天天看天天干 | 成人h视频在线播放 | 97免费| 久章草在线观看 | 波多野结衣网址 | 婷婷六月久久 | 精品欧美一区二区在线观看 | 国产99区 | 日韩系列在线 | 免费成人短视频 | 五月婷婷视频 | 久久免费大片 | 国产中文伊人 | 久久成人高清视频 | 国产中文a| www.久草视频 | 色偷偷88888欧美精品久久久 | 日韩免费专区 | 波多野结衣精品在线 | 亚洲精品福利在线 | 玖玖玖国产精品 | 99久久精品久久久久久动态片 | 国内精品久久久久影院一蜜桃 | 色com| 国产精品18久久久久白浆 | 日韩三级.com | 欧美日韩午夜在线 | 91亚洲欧美| 日韩亚洲在线 | 亚洲专区视频在线观看 | 午夜 久久 tv | 欧美欧美 | 日韩一区二区三区视频在线 | 毛片无卡免费无播放器 | 亚洲 欧美 成人 | 国产精品免费久久久久久久久久中文 | 欧美日韩国产三级 | avwww在线观看 | 国产精品一区二区麻豆 | 色国产精品一区在线观看 | 亚洲电影一级黄 | 天天天天天天干 | 精品在线播放 | 在线免费av播放 | av黄色免费在线观看 | 午夜久久久久久久 | 在线播放一区二区三区 | 91精品国产欧美一区二区成人 | 深爱激情五月婷婷 | 午夜久久福利 | 国产三级国产精品国产专区50 | 日色在线视频 | 狠狠躁夜夜躁人人爽超碰91 | 久久综合五月天婷婷伊人 | 午夜精品999| 国产人成免费视频 | 色的网站在线观看 | 欧美福利视频一区 | 欧美在线视频免费 | 久久不卡国产精品一区二区 | 日韩精品无| 日韩欧美视频在线免费观看 | 天天操天天操 | 在线观看成人av | 成人黄色大片在线观看 | 一级片黄色片网站 | 欧美亚洲成人免费 | 夜夜骑日日 | 99草视频 | 91重口视频 | 在线日韩| 国产电影一区二区三区四区 | 久久69av | 国产一区二区三区黄 | 91丝袜美腿 | 欧美午夜a| 日韩在线国产精品 | 天天干天天干天天干 | 国产久草在线观看 | www.午夜色.com| 久久久男人的天堂 | 涩涩成人在线 | 日韩一级精品 | www日日 | 欧美精品二 | www日 | 欧美精品免费视频 | 欧美色精品天天在线观看视频 | 91精品久久久久久久久 | 九九九九精品九九九九 | 五月视频| 日日摸日日碰 | 国产专区精品视频 | 在线播放 亚洲 | 激情av在线播放 | www久| 天天在线免费视频 | 久草久草久草久草 | 天天干天天操天天操 | 字幕网资源站中文字幕 | 亚洲精品人人 | 黄网站色成年免费观看 | 国产男男gay做爰 | 久久久久久久久久久网 | 国产免费黄视频在线观看 | 亚洲精品乱码久久久久久蜜桃91 | 久久久久免费精品视频 | 91看片淫黄大片一级在线观看 | 色婷五月天| 国产高清在线免费 | 亚洲综合干| 操操操综合 | 黄色免费视频在线观看 | 国产经典三级 | wwwwww黄 | 在线岛国av| 国产精品自产拍在线观看桃花 | 欧美 激情 国产 91 在线 | 国产成人精品一区二区三区福利 | www.com.日本一级 | 国产精品久久99综合免费观看尤物 | 怡春院av| 美女视频黄免费网站 | 毛片一级免费一级 | av高清影院 | 丝袜少妇在线 | 91在线视频观看免费 | 91成版人在线观看入口 | 日韩综合一区二区 | 人人爽人人香蕉 | 国产又粗又长的视频 | av高清一区 | 国产在线观看,日本 | 婷五月激情 | 欧美成人精品欧美一级乱黄 | 狠狠狠狠狠色综合 | 日韩综合第一页 | 亚洲 中文 在线 精品 | 99久久综合狠狠综合久久 | 久久久久久久久久久精 | 久久综合偷偷噜噜噜色 | 波多野结衣在线中文字幕 | 中文在线最新版天堂 | 日日操日日 | 欧美在线视频一区二区三区 | 国产伦理精品一区二区 | 久久精品视频在线看 | 亚洲成 人精品 | 日韩高清在线一区二区 | 天天色天天骑天天射 | 亚洲精品国产日韩 | 色婷婷久久一区二区 | 亚洲毛片一区二区三区 | 毛片精品免费在线观看 | 天天操天天干天天操天天干 | 日韩精品不卡 | 超级av在线 | 国产精品久久久 | 日韩极品视频在线观看 | 91精彩视频在线观看 | 日韩色av色资源 | 夜夜躁日日躁狠狠久久88av | 青草视频在线播放 | 狠狠狠狠狠狠 | 伊人五月天av | 欧美另类高清 | 蜜桃av久久久亚洲精品 | 99午夜| 91九色综合 | 六月丁香激情综合色啪小说 | 91精品视频在线免费观看 | 美女在线免费视频 | 成人免费视频播放 | 草免费视频 | 国内精品久久久久影院优 | 91视频网址入口 | 99精品视频在线播放免费 | 在线91播放 | 欧美精品在线视频 | 一区三区视频在线观看 | 日韩亚洲精品电影 | 99精品国产免费久久久久久下载 | 久久免费看视频 | 久久久久久久久免费 | 日韩激情中文字幕 | 国产色视频网站2 | 欧美va天堂在线电影 | 天天插日日插 | 天天综合狠狠精品 | 日韩精品久久一区二区 | 丁香综合五月 | 久草视频视频在线播放 | 丰满少妇高潮在线观看 | 手机av永久免费 | 99国产精品一区 | 日韩美女高潮 | 四虎永久免费在线观看 | 欧美精品你懂的 | 99久久99视频 | 中文在线a天堂 | 你操综合 | 成人av免费在线观看 | 国产国产人免费人成免费视频 | 国产综合片 | 成人国产精品久久久久久亚洲 | 在线一二区 | 欧美人牲 | 久草在线久草在线2 | 久草在线中文视频 | 国产无套一区二区三区久久 | 久久精品国产精品亚洲 | 天天爽人人爽夜夜爽 | 中文字幕在线日亚洲9 | 日本一区二区免费在线观看 | 久久人视频| 欧美日韩xxx | 国产精品 中文在线 | 伊人手机在线 | 亚洲好视频 | 免费看片成人 | 久久久久久国产精品久久 | 亚洲精品国产视频 | 麻豆91小视频 | 国产成人a亚洲精品v | 精品一区二区免费视频 | 国产高清小视频 | 91成人看片 | 亚洲毛片视频 | 久久精品视频国产 | 免费亚洲黄色 | 国产黄色一级大片 | 少妇bbb | 中文网丁香综合网 | 精品国产乱码一区二区三区在线 | 中文字幕a∨在线乱码免费看 | 黄污网站在线 | 国产精品理论片 | 国产精品国产亚洲精品看不卡 | 狠狠干网址| 国产精品成人一区二区三区吃奶 | 国产亚洲高清视频 | 亚欧洲精品视频在线观看 | 波多野结衣视频一区二区 | 99高清视频有精品视频 | 69av在线播放 | 久草在线99 | 亚洲成人xxx | 国产美女精彩久久 | 国产啊v在线观看 | 国产黄在线看 | 日韩在线一级 | 久久精品日本啪啪涩涩 | 国产日产精品一区二区三区四区的观看方式 | 黄色网址中文字幕 | 97av视频| 91插插插网站 | 在线观看黄色的网站 | 中文字幕资源在线 | 日本久久中文 | 午夜色站 | 不卡的一区二区三区 | 一区中文字幕在线观看 | 丝袜美腿在线视频 |