日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

引入ReactiveInflux:用于Scala和Java的无阻塞InfluxDB驱动程序,支持Apache Spark

發(fā)布時(shí)間:2023/12/3 java 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 引入ReactiveInflux:用于Scala和Java的无阻塞InfluxDB驱动程序,支持Apache Spark 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

我很高興宣布Pygmalios開(kāi)發(fā)的ReactiveInflux的第一個(gè)發(fā)行版。 InfluxDB錯(cuò)過(guò)了Scala和Java的非阻塞驅(qū)動(dòng)程序。 不變性,可測(cè)試性和可擴(kuò)展性是ReactiveInflux的關(guān)鍵功能。 加上對(duì)Apache Spark的支持,它是首選武器。

  • https://github.com/pygmalios/reactiveinflux

它在內(nèi)部使用Play Framework WS API ,它是基于Async Http Client構(gòu)建的豐富的異步HTTP客戶端 。

特征

  • Scala的異步(非阻塞)接口
  • Scala和Java的同步(阻塞)接口
  • 同時(shí)支持Spark和Spark流
  • 不變性
  • 可測(cè)性
  • 可擴(kuò)展性

兼容性

  • InfluxDB 0.11、0.10和0.9(甚至可能更舊)
  • Scala 2.11和2.10
  • Java 7及以上
  • Apache Spark 1.4及更高版本

Scala異步(非阻塞)示例

val result = withInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>db.create().flatMap { _ =>val point = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("t1" -> "A", "t2" -> "B"),fields = Map("f1" -> 10.3,"f2" -> "x","f3" -> -1,"f4" -> true))db.write(point).flatMap { _ =>db.query("SELECT * FROM measurement1").flatMap { queryResult =>println(queryResult.row.mkString)db.drop()}}} }

Scala同步(阻塞)示例

implicit val awaitAtMost = 10.seconds syncInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>db.create()val point = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("t1" -> "A", "t2" -> "B"),fields = Map("f1" -> 10.3,"f2" -> "x","f3" -> -1,"f4" -> true))db.write(point)val queryResult = db.query("SELECT * FROM measurement1")println(queryResult.row.mkString)db.drop() }

Java同步(阻塞)示例

// Use Influx at the provided URL ReactiveInfluxConfig config = new JavaReactiveInfluxConfig(new URI("http://localhost:8086/")); long awaitAtMostMillis = 30000; try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux(config, awaitAtMostMillis)) {SyncReactiveInfluxDb db = reactiveInflux.database("example1");db.create();Map tags = new HashMap<>();tags.put("t1", "A");tags.put("t2", "B");Map fields = new HashMap<>();fields.put("f1", 10.3);fields.put("f2", "x");fields.put("f3", -1);fields.put("f4", true);Point point = new JavaPoint(DateTime.now(),"measurement1",tags,fields);db.write(point);QueryResult queryResult = db.query("SELECT * FROM measurement1");System.out.println(queryResult.getRow().mkString());db.drop(); }

Apache Spark Scala示例

val point1 = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("tagKey1" -> "tagValue1","tagKey2" -> "tagValue2"),fields = Map("fieldKey1" -> "fieldValue1","fieldKey2" -> 10.7) ) sc.parallelize(Seq(point1)).saveToInflux()

Apache Spark流Scala示例

val point1 = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("tagKey1" -> "tagValue1","tagKey2" -> "tagValue2"),fields = Map("fieldKey1" -> "fieldValue1","fieldKey2" -> 10.7) ) val queue = new mutable.Queue[RDD[Point]] queue.enqueue(ssc.sparkContext.parallelize(Seq(point1))) ssc.queueStream(queue).saveToInflux()

Apache Spark Java示例

... SparkInflux sparkInflux = new SparkInflux("example", 1000); sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point)));

Apache Spark流Java示例

... SparkInflux sparkInflux = new SparkInflux("example", 1000); Queue> queue = new LinkedList<>(); queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point))); sparkInflux.saveToInflux(ssc.queueStream(queue));

斯洛伐克布拉迪斯拉發(fā)的高科技初創(chuàng)公司投資于尖端技術(shù),以確保實(shí)時(shí)預(yù)測(cè)零售分析領(lǐng)域的快速增長(zhǎng)。

翻譯自: https://www.javacodegeeks.com/2016/04/introducing-reactiveinflux-non-blocking-influxdb-driver-scala-java-supporting-apache-spark.html

總結(jié)

以上是生活随笔為你收集整理的引入ReactiveInflux:用于Scala和Java的无阻塞InfluxDB驱动程序,支持Apache Spark的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。