日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊flink Table的OrderBy及Limit

發(fā)布時(shí)間:2023/11/29 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊flink Table的OrderBy及Limit 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本文主要研究一下flink Table的OrderBy及Limit

實(shí)例

Table in = tableEnv.fromDataSet(ds, "a, b, c"); Table result = in.orderBy("a.asc");Table in = tableEnv.fromDataSet(ds, "a, b, c");// returns the first 5 records from the sorted result Table result1 = in.orderBy("a.asc").fetch(5); // skips the first 3 records and returns all following records from the sorted result Table result2 = in.orderBy("a.asc").offset(3);// skips the first 10 records and returns the next 5 records from the sorted result Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderBy方法類似sql的order by;limit則由offset及fetch兩個(gè)方法構(gòu)成,類似sql的offset及fetch

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(private[flink] val tableEnv: TableEnvironment,private[flink] val logicalPlan: LogicalNode) {//......def orderBy(fields: String): Table = {val parsedFields = ExpressionParser.parseExpressionList(fields)orderBy(parsedFields: _*)}def orderBy(fields: Expression*): Table = {val order: Seq[Ordering] = fields.map {case o: Ordering => ocase e => Asc(e)}new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))}def offset(offset: Int): Table = {new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))}def fetch(fetch: Int): Table = {if (fetch < 0) {throw new ValidationException("FETCH count must be equal or larger than 0.")}this.logicalPlan match {case Limit(o, -1, c) =>// replace LIMIT without FETCH by LIMIT with FETCHnew Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))case Limit(_, _, _) =>throw new ValidationException("FETCH is already defined.")case _ =>new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))}}//...... }
  • Table的orderBy方法,支持String或Expression類型的參數(shù),其中String類型最終是轉(zhuǎn)為Expression類型;orderBy方法最后使用Sort重新創(chuàng)建了Table;offset及fetch方法,使用Limit重新創(chuàng)建了Table(offset方法創(chuàng)建的Limit其fetch為-1;fetch方法如果之前沒有指定offset則創(chuàng)建的Limit的offset為0)

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Sort on stream tables is currently not supported.")}super.validate(tableEnv)} }
  • Sort繼承了UnaryNode,它的構(gòu)造器接收Set類型的Ordering,其construct方法使用relBuilder.sort來構(gòu)建sort條件

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {override private[flink] def validateInput(): ValidationResult = {if (!child.isInstanceOf[NamedExpression]) {ValidationFailure(s"Sort should only based on field reference")} else {ValidationSuccess}} }case class Asc(child: Expression) extends Ordering {override def toString: String = s"($child).asc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {child.toRexNode}override private[flink] def resultType: TypeInformation[_] = child.resultType }case class Desc(child: Expression) extends Ordering {override def toString: String = s"($child).desc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {relBuilder.desc(child.toRexNode)}override private[flink] def resultType: TypeInformation[_] = child.resultType }
  • Ordering是一個(gè)抽象類,它有Asc及Desc兩個(gè)子類

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.limit(offset, fetch)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Limit on stream tables is currently not supported.")}if (!child.isInstanceOf[Sort]) {failValidation(s"Limit operator must be preceded by an OrderBy operator.")}if (offset < 0) {failValidation(s"Offset should be greater than or equal to zero.")}super.validate(tableEnv)} }
  • Limit繼承了UnaryNode,它的構(gòu)造器接收offset及fetch參數(shù),它的construct方法通過relBuilder.limit來設(shè)置offset及fetch

小結(jié)

  • Table的orderBy方法類似sql的order by;limit則由offset及fetch兩個(gè)方法構(gòu)成,類似sql的offset及fetch
  • Table的orderBy方法,支持String或Expression類型的參數(shù),其中String類型最終是轉(zhuǎn)為Expression類型;orderBy方法最后使用Sort重新創(chuàng)建了Table;offset及fetch方法,使用Limit重新創(chuàng)建了Table(offset方法創(chuàng)建的Limit其fetch為-1;fetch方法如果之前沒有指定offset則創(chuàng)建的Limit的offset為0)
  • Sort繼承了UnaryNode,它的構(gòu)造器接收Set類型的Ordering,其construct方法使用relBuilder.sort來構(gòu)建sort條件;Ordering是一個(gè)抽象類,它有Asc及Desc兩個(gè)子類;Limit繼承了UnaryNode,它的構(gòu)造器接收offset及fetch參數(shù),它的construct方法通過relBuilder.limit來設(shè)置offset及fetch

doc

  • OrderBy, Offset & Fetch

總結(jié)

以上是生活随笔為你收集整理的聊聊flink Table的OrderBy及Limit的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。