apache beam 入门之beam-sql
目錄:apache beam 個(gè)人使用經(jīng)驗(yàn)總結(jié)目錄和入門指導(dǎo)(Java)
就像spark-sql 一樣,apache beam也有beam-sql, 就是能夠輸入1張模擬數(shù)據(jù)表, 然后通過sql語句來實(shí)現(xiàn)計(jì)算。
舉個(gè)例子,我們不希望在數(shù)據(jù)源端執(zhí)行 select * from tableA left join talbeB on tableA.id = tableB.id where tableA.id < 10 這句話, 因?yàn)檫@樣很占用數(shù)據(jù)源端的計(jì)算資源(尤其是hive這類數(shù)倉), 所以會(huì)希望把tableA和tableB的所有數(shù)據(jù)讀入到自己的計(jì)算集群中,然后在beam里去執(zhí)行這一句sql。
如何創(chuàng)建模擬表
首先要?jiǎng)?chuàng)建1個(gè)表的schema(模式),或者說叫做表結(jié)構(gòu)。 beam的schema采用builder模式進(jìn)行建立。
Schema tableASchema = Schema.builder() .addInt32Field("id") .addStringField("name") .build();接著同樣用builder模式去創(chuàng)建1條表的行記錄
Row row1 = Row.withSchema(tableASchema) .addValue(1) .addValue("tony") .build();注意addValue的時(shí)候, 要按照schma里添加字段的順序和類型來添加數(shù)據(jù), 不要添加錯(cuò)了。
我們多造幾條數(shù)據(jù)
造好后,用Create進(jìn)行模擬表的創(chuàng)建,主要不要遺漏setRowSchema,否則會(huì)無法識(shí)別編碼。
PCollection<Row> pTableA = pipeline.apply(Create.of(row1, row2, row3)) .setRowSchema(tableASchema);這時(shí)候在pipeline運(yùn)行時(shí),pTableA數(shù)據(jù)集里就會(huì)塞進(jìn)3行記錄,但是現(xiàn)在還差1個(gè)表名。因此需要把數(shù)據(jù)集pTable變成PCollectionTuple
PCollectionTuple tupleTableA = PCollectionTuple.of(new TupleTag<>("tableA"), pTableA);這時(shí)候"tableA"這個(gè)名字就通過new TupleTag賦予了pTableA,此時(shí)tupleTableA可以理解為1張模擬表了。
執(zhí)行beam-sql
執(zhí)行beam-sql前,要先添加如下依賴:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sql</artifactId> <version>${beam.version}</version> </dependency>接著用SQLTransform這個(gè)sdk即可實(shí)現(xiàn)beam-sql
// 執(zhí)行bema-sql PCollection<Row> afterSelectTableA = tupleTableA.apply(SqlTransform.query("select name from tableA where id <= 2"));// 打印結(jié)果 afterSelectTableA.apply(ParDo.of(new PrintStrFn()));pipeline.run().waitUntilFinish();總結(jié)
以上是生活随笔為你收集整理的apache beam 入门之beam-sql的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。