flink常见算子的一些操作
常見Transformation操作
map和filter
flatMap,keyBy和sum
/**
 * Sliding-window word count.
 * Source: a socket; requirement: every 1 second, count word occurrences over the last 2 seconds.
 *
 * Operators exercised:
 *  - flatMap
 *  - keyBy: dataStream.keyBy("someKey") keys on a POJO field named "someKey";
 *           dataStream.keyBy(0) keys on the first element of a Tuple
 *  - sum
 */
public class WindowWordCountJava {
    public static void main(String[] args) throws Exception {
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            // Fix: message previously read "user default port" (broken English).
            System.err.println("no port set, use default port 9988");
            port = 9988;
        }
        // Step 1: obtain the Flink streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "10.126.88.226";
        String delimiter = "\n";
        // Step 2: attach the socket source.
        DataStreamSource<String> textStream = env.socketTextStream(hostname, port, delimiter);
        // Step 3: transformations.
        SingleOutputStreamOperator<WordCount> wordCountStream = textStream
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    public void flatMap(String line, Collector<WordCount> out) throws Exception {
                        // NOTE(review): splits on TAB — confirm the socket input really is
                        // tab-separated; otherwise " " or "\\s+" would be the right delimiter.
                        String[] fields = line.split("\t");
                        for (String word : fields) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // every 1s, look back 2s
                .sum("count");
        wordCountStream.print().setParallelism(1); // print results with parallelism 1
        // Step 4: run the job.
        env.execute("socket word count");
    }

    /**
     * POJO holding one word and its running count.
     * Public fields and the no-arg constructor are required by Flink's POJO rules
     * so that keyBy("word") / sum("count") can resolve the fields by name.
     */
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {
        }

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
union
/*** 合并多個流,新的流會包含所有流中的數據,但是union是一個限制,就是所有合并的流類型必須是一致的*/ public class unionDemo {public static void main(String[] args) throws Exception {//獲取Flink的運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//獲取數據源DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度只能設置為1DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);//把text1和text2組裝到一起DataStream<Long> text = text1.union(text2);DataStream<Long> num = text.map(new MapFunction<Long, Long>() {@Overridepublic Long map(Long value) throws Exception {System.out.println("原始接收到數據:" + value);return value;}});//每2秒鐘處理一次數據DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);//打印結果sum.print().setParallelism(1);String jobName = unionDemo.class.getSimpleName();env.execute(jobName);} }connect,conMap和conFlatMap
/**
 * Similar to union, but connect joins exactly two streams; the two streams may have
 * different element types and each side gets its own processing function.
 */
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        // Flink runtime environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: this custom source only supports a parallelism of 1.
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // Turn the second stream into strings so the two sides have different types.
        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
        // CoMapFunction: map1 handles the Long side, map2 handles the String side.
        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });
        // Print the result with parallelism 1.
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
Split和Select
/**
 * Splits one stream into several by rule. Use case: a source stream mixes several
 * kinds of records that need different handling; split lets each kind flow through
 * its own processing logic.
 */
public class SplitDemo {
    public static void main(String[] args) throws Exception {
        // Flink runtime environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: this custom source only supports a parallelism of 1.
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // Split the stream by parity of the value.
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even"); // even values
                } else {
                    outPut.add("odd");  // odd values
                }
                return outPut;
            }
        });
        // Select one or more of the split outputs by tag.
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd", "even");
        // Print only the even stream, with parallelism 1.
        evenStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
常見sink操作
print() / printToErr()
打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中
writeAsText()
自定義sink
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
自定義redis sink
/*** 把數據寫入redis*/ public class SinkForRedisDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");//lpsuh l_words word//對數據進行組裝,把string轉化為tuple2<String,String>DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});//創建redis的配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();//創建redissinkRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());l_wordsData.addSink(redisSink);env.execute("StreamingDemoToRedis");}public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {//表示從接收的數據中獲取需要操作的redis key@Overridepublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}//表示從接收的數據中獲取需要操作的redis value@Overridepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}} }DataSet算子操作(Sparkcore)
source
基于文件
readTextFile(path)
基于集合
fromCollection(Collection)
transform
Map:輸入一個元素,然后返回一個元素,中間可以做一些清洗轉換等操作
FlatMap:輸入一個元素,可以返回零個,一個或者多個元素
MapPartition:類似map,一次處理一個分區的數據【如果在進行map處理的時候需要獲取第三方資源鏈接,建議使用MapPartition】
Filter:過濾函數,對傳入的數據進行判斷,符合條件的數據會被留下
Reduce:對數據進行聚合操作,結合當前元素和上一次reduce返回的值進行聚合操作,然后返回一個新的值
Aggregate:sum、max、min等
Distinct:返回一個數據集中去重之后的元素,data.distinct()
Join:內連接
OuterJoin:外連接
Cross:獲取兩個數據集的笛卡爾積
Union:返回兩個數據集合併後的所有元素,數據類型需要一致
First-n:獲取集合中的前N個元素
Sort Partition:在本地對數據集的所有分區進行排序,通過sortPartition()的鏈式調用來完成對多個字段的排序
MapPartition
public class MapPartitionDemo {
    public static void main(String[] args) throws Exception {
        // Batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> text = env.fromCollection(data);
        /*text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // With plain map, a DB connection would be opened once PER RECORD.
                // process the record
                // close the connection
                return value;
            }
        });*/
        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // With mapPartition, a DB connection is opened once PER PARTITION —
                // `values` holds the whole partition's records.
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split("\\W+");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
                // close the connection here
            }
        });
        mapPartitionData.print();
    }
}
distinct
/**
 * Deduplicates a dataset.
 */
public class DistinctDemo {
    public static void main(String[] args) throws Exception {
        // Batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<>();
        data.add("you jump");
        data.add("i jump");
        DataSource<String> text = env.fromCollection(data);
        // Lower-case and tokenize each line into words.
        FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.toLowerCase().split("\\W+");
                for (String word : split) {
                    System.out.println("單詞:" + word);
                    out.collect(word);
                }
            }
        });
        flatMapData.distinct() // global deduplication over the whole dataset
                .print();
    }
}
join
/**
 * Inner-joins two datasets on a key field.
 */
public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // Batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Tuple2<userId, userName>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));
        // Tuple2<userId, userCity>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(3, "guangzhou"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
        text1.join(text2)
                .where(0)   // key index in the first dataset
                .equalTo(0) // key index in the second dataset
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                })
                .print();
        System.out.println("==================================");
        // Note: using map on the joined pairs (below) is equivalent to the `with` above.
        /*text1.join(text2).where(0)//key index in the first dataset
                .equalTo(0)//key index in the second dataset
                .map(new MapFunction<Tuple2<Tuple2<Integer,String>,Tuple2<Integer,String>>, Tuple3<Integer,String,String>>() {
            @Override
            public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1);
            }
        }).print();*/
    }
}
OutJoin
/*** 外連接:* 左外連接* 右外連接* 全外連接*/ public class OuterJoinDemo {public static void main(String[] args) throws Exception{//獲取運行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//tuple2<用戶id,用戶姓名>ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();data1.add(new Tuple2<>(1,"zs"));data1.add(new Tuple2<>(2,"ls"));data1.add(new Tuple2<>(3,"ww"));//tuple2<用戶id,用戶所在城市>ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();data2.add(new Tuple2<>(1,"beijing"));data2.add(new Tuple2<>(2,"shanghai"));data2.add(new Tuple2<>(4,"guangzhou"));DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);/*** 左外連接** 注意:second這個tuple中的元素可能為null**/text1.leftOuterJoin(text2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if(second==null){return new Tuple3<>(first.f0,first.f1,"null");}else{return new Tuple3<>(first.f0,first.f1,second.f1);}}}).print();System.out.println("=============================================================================");/*** 右外連接** 注意:first這個tuple中的數據可能為null**/text1.rightOuterJoin(text2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if(first==null){return new Tuple3<>(second.f0,"null",second.f1);}return new Tuple3<>(first.f0,first.f1,second.f1);}}).print();System.out.println("=============================================================================");/*** 全外連接** 注意:first和second這兩個tuple都有可能為null**/text1.fullOuterJoin(text2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, 
Tuple3<Integer,String,String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if(first==null){return new Tuple3<>(second.f0,"null",second.f1);}else if(second == null){return new Tuple3<>(first.f0,first.f1,"null");}else{return new Tuple3<>(first.f0,first.f1,second.f1);}}}).print();} }Cross
/*** 笛卡爾積*/ public class CrossDemo {public static void main(String[] args) throws Exception{//獲取運行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//tuple2<用戶id,用戶姓名>ArrayList<String> data1 = new ArrayList<>();data1.add("zs");data1.add("ww");//tuple2<用戶id,用戶所在城市>ArrayList<Integer> data2 = new ArrayList<>();data2.add(1);data2.add(2);DataSource<String> text1 = env.fromCollection(data1);DataSource<Integer> text2 = env.fromCollection(data2);CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);cross.print();} }First-n 和 SortPartition
/**
 * TopN: first-n and sortPartition.
 */
import java.util.ArrayList;

public class FirstNDemo {
    public static void main(String[] args) throws Exception {
        // Batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(2, "zs"));
        data.add(new Tuple2<>(4, "ls"));
        data.add(new Tuple2<>(3, "ww"));
        data.add(new Tuple2<>(1, "xw"));
        data.add(new Tuple2<>(1, "aw"));
        data.add(new Tuple2<>(1, "mw"));
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
        // First 3 records in insertion order.
        text.first(3).print();
        System.out.println("==============================");
        // Group by the first field, then take the first 2 of each group.
        text.groupBy(0).first(2).print();
        System.out.println("==============================");
        // Group by the first field, sort within each group by the second field
        // ascending, then take the first 2 of each group.
        text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        System.out.println("==============================");
        // No grouping: globally sort (field 0 ascending, field 1 descending)
        // and take the first 3 records.
        text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
    }
}
partition
/*** HashPartition** RangePartition*/ public class HashRangePartitionDemo {public static void main(String[] args) throws Exception{//獲取運行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();data.add(new Tuple2<>(1,"hello1"));data.add(new Tuple2<>(2,"hello2"));data.add(new Tuple2<>(2,"hello3"));data.add(new Tuple2<>(3,"hello4"));data.add(new Tuple2<>(3,"hello5"));data.add(new Tuple2<>(3,"hello6"));data.add(new Tuple2<>(4,"hello7"));data.add(new Tuple2<>(4,"hello8"));data.add(new Tuple2<>(4,"hello9"));data.add(new Tuple2<>(4,"hello10"));data.add(new Tuple2<>(5,"hello11"));data.add(new Tuple2<>(5,"hello12"));data.add(new Tuple2<>(5,"hello13"));data.add(new Tuple2<>(5,"hello14"));data.add(new Tuple2<>(5,"hello15"));data.add(new Tuple2<>(6,"hello16"));data.add(new Tuple2<>(6,"hello17"));data.add(new Tuple2<>(6,"hello18"));data.add(new Tuple2<>(6,"hello19"));data.add(new Tuple2<>(6,"hello20"));data.add(new Tuple2<>(6,"hello21"));DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);/*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {@Overridepublic void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {Iterator<Tuple2<Integer, String>> it = values.iterator();while (it.hasNext()){Tuple2<Integer, String> next = it.next();System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next);}}}).print();*/text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {@Overridepublic void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {Iterator<Tuple2<Integer, String>> it = values.iterator();while (it.hasNext()){Tuple2<Integer, String> next = it.next();System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next);}}}).print();} } /*** 
broadcast廣播變量* */ public class BroadCastDemo {public static void main(String[] args) throws Exception{//獲取運行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1:準備需要廣播的數據ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();broadData.add(new Tuple2<>("zs",18));broadData.add(new Tuple2<>("ls",20));broadData.add(new Tuple2<>("ww",17));DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);//1.1:處理需要廣播的數據,把數據集轉換成map類型,map中的key就是用戶姓名,value就是用戶年齡DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {@Overridepublic HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {HashMap<String, Integer> res = new HashMap<>();res.put(value.f0, value.f1);return res;}});//源數據DataSource<String> data = env.fromElements("zs", "ls", "ww");//注意:在這里需要使用到RichMapFunction獲取廣播變量DataSet<String> result = data.map(new RichMapFunction<String, String>() {List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();HashMap<String, Integer> allMap = new HashMap<String, Integer>();/*** 這個方法只會執行一次* 可以在這里實現一些初始化的功能* 所以,就可以在open方法中獲取廣播變量數據*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//3:獲取廣播數據this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");for (HashMap map : broadCastMap) {allMap.putAll(map);}}@Overridepublic String map(String value) throws Exception {Integer age = allMap.get(value);return value + "," + age;}}).withBroadcastSet(toBroadcast, "broadCastMapName");//2:執行廣播數據的操作result.print();}} /*** 計數器*/ public class CounterDemo {public static void main(String[] args) throws Exception{//獲取運行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = env.fromElements("a", "b", "c", "d");DataSet<String> result = data.map(new RichMapFunction<String, String>() {//1:創建累加器private IntCounter numLines = new 
IntCounter();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//2:注冊累加器getRuntimeContext().addAccumulator("num-lines",this.numLines);}//int sum = 0;@Overridepublic String map(String value) throws Exception {//如果并行度為1,使用普通的累加求和即可,但是設置多個并行度,則普通的累加求和結果就不準了//sum++;//System.out.println("sum:"+sum);this.numLines.add(1);return value;}}).setParallelism(8);//如果要獲取counter的值,只能是任務//result.print();result.writeAsText("d:\\data\\mycounter");JobExecutionResult jobResult = env.execute("counter");//3:獲取累加器int num = jobResult.getAccumulatorResult("num-lines");System.out.println("num:"+num);} }總結
以上是生活随笔為你收集整理的flink常见算子的一些操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【java机器学习】支持向量机之拉格朗日
- 下一篇: java stream filter m