日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...

發布時間:2025/3/20 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

首先,我們明確的是訪問Mongos和訪問單機Mongod并沒有什么區別。接下來的方法都是既可以訪問mongod又可以訪問Mongos的。

另外,讀作java寫作scala,反正大家都看得懂......大概?

1、不帶認證集群的連接方法(JAVAscala):

首先是創建連接的方法,我們先聲明一個client,然后指定訪問的DB和collection:

private lazy val mongo = new MongoClient("192.168.2.51", 27017)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("origin2")

然后我們讀取數據:

import com.mongodb.client.model.Filters.{eq =>eqq}

val docs= dbColl.find(eqq("basiclabel.procedure", "second")).iterator()

額。。上面那段代碼是帶filter過濾的讀取數據。首先Import?com.mongodb.client.model.Filters.eq并把eq重命名為eqq,然后通過dbColl.find(Bson)方法讀取指定數據。剩下的就是正常的迭代器的使用方法了,docs獲取出來的數據是Iterator[Document]。

然后我們更新數據:

dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))

上面這段代碼是說找到_id對應的數據,并將其中一個字段set為一個新的值,這個值可以為Document,String,Int,List等一系列數據結構。我這里fenduan方法返回的是一個Document,做了一層嵌套。

至于插入數據更為簡單:

dbColl.insertOne(doc)

2、不帶認證的spark讀取方法(scala,理直氣壯)

兩種方式,其一是在創建sparksession的時候(SparkContext可以使用第二種方法,醒醒兄弟,2017年了),直接指定"spark.mongodb.input.uri"。然后使用正常的MongoSpark來讀取數據。(pipeline里面是過濾條件,愿意嘗試的各位可以自己試試filter下的其他方法)。使用rdd是因為rdd更適合進行map和flatmap等一系列精細的轉換操作,如果只需要讀數據,可以使用MongoSpark.read(spark)方法,直接獲取DataFrameReader。

val spark =SparkSession.builder()

.master("spark://192.168.2.51:7077")

.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))

.config("spark.cores.max", 80)

.config("spark.executor.cores", 16)

.config("spark.executor.memory", "32g")

.config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2")//.config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2")

.getOrCreate()

val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

第二種方式也較為簡單,創建一個ReadConfig,這個是connector提供的一個單例類,可以設置很多參數,例如(此時不必指定"spark.mongodb.input.uri"),如下所示是通過sparkcontext和通過sparksession兩種方式讀取數據的方法:

val readConfig =ReadConfig(Map("uri" -> "mongodb://192.168.2.48:27017/","database" -> "test","collection" -> "test"))

val r2=MongoSpark.load(spark, readConfig).rdd//val r2 = MongoSpark.load(spark.sparkContext, readConfig)

3、帶認證的Java讀取方法:

帶認證的需要先創建一個MongoURI,在URI里把用戶名,密碼和認證庫都指定清楚。這種方法通用性比較強,因為spark也這么用,如果使用其他方式認證要么是必須使用庫等于認證庫,要么是沒有通用性。這種方法可以在admin認證然后去讀test的數據,就很好。

//帶認證的需要先創建一個MongoURI,在URI里把用戶名,密碼和認證庫都指定清楚,至于為什么需要指定庫建議看上一篇博客

val mongoURI = new MongoClientURI("mongodb://gaoze:gaolaoban@192.168.2.48:27017/?authSource=admin")//val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/");

lazy val mongo = newMongoClient(mongoURI)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("test")

//然后和1一樣

4、帶認證的Spark讀取方法:

同3一樣,在URI里加入用戶名密碼和庫就行了:

val spark =SparkSession.builder()

.master("spark://192.168.2.51:7077")

.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))

.config("spark.cores.max", 80)

.config("spark.executor.cores", 16)

.config("spark.executor.memory", "32g")

//這里這個配置項指定了用戶名gaoze,密碼gaolaoban,認證庫admin

.config("spark.mongodb.input.uri", "mongodb://gaoze:gaolaoban@192.168.2.51:27017/test.origin2?authSource=admin").getOrCreate()

val rdd= MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

或者:

//這里指定了用戶名rw,密碼1,認證庫test

val readConfig =ReadConfig(Map("uri" -> "mongodb://rw:1@192.168.2.48:27017/?authSource=test","database" -> "test","collection" -> "test"))

val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()

//val r2 = MongoSpark.load(spark.sparkContext, readConfig)

總結

以上是生活随笔為你收集整理的spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。