日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...

發(fā)布時(shí)間:2025/3/20 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)... 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

首先,我們明確的是訪問Mongos和訪問單機(jī)Mongod并沒有什么區(qū)別。接下來的方法都是既可以訪問mongod又可以訪問Mongos的。

另外,讀作java寫作scala,反正大家都看得懂......大概?

1、不帶認(rèn)證集群的連接方法(JAVAscala):

首先是創(chuàng)建連接的方法,我們先聲明一個(gè)client,然后指定訪問的DB和collection:

private lazy val mongo = new MongoClient("192.168.2.51", 27017)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("origin2")

然后我們讀取數(shù)據(jù):

import com.mongodb.client.model.Filters.{eq =>eqq}

val docs= dbColl.find(eqq("basiclabel.procedure", "second")).iterator()

額。。上面那段代碼是帶filter過濾的讀取數(shù)據(jù)。首先Import?com.mongodb.client.model.Filters.eq并把eq重命名為eqq,然后通過dbColl.find(Bson)方法讀取指定數(shù)據(jù)。剩下的就是正常的迭代器的使用方法了,docs獲取出來的數(shù)據(jù)是Iterator[Document]。

然后我們更新數(shù)據(jù):

dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))

上面這段代碼是說找到_id對(duì)應(yīng)的數(shù)據(jù),并將其中一個(gè)字段set為一個(gè)新的值,這個(gè)值可以為Document,String,Int,List等一系列數(shù)據(jù)結(jié)構(gòu)。我這里fenduan方法返回的是一個(gè)Document,做了一層嵌套。

至于插入數(shù)據(jù)更為簡單:

dbColl.insertOne(doc)

2、不帶認(rèn)證的spark讀取方法(scala,理直氣壯)

兩種方式,其一是在創(chuàng)建sparksession的時(shí)候(SparkContext可以使用第二種方法,醒醒兄弟,2017年了),直接指定"spark.mongodb.input.uri"。然后使用正常的MongoSpark來讀取數(shù)據(jù)。(pipeline里面是過濾條件,愿意嘗試的各位可以自己試試filter下的其他方法)。使用rdd是因?yàn)閞dd更適合進(jìn)行map和flatmap等一系列精細(xì)的轉(zhuǎn)換操作,如果只需要讀數(shù)據(jù),可以使用MongoSpark.read(spark)方法,直接獲取DataFrameReader。

val spark =SparkSession.builder()

.master("spark://192.168.2.51:7077")

.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))

.config("spark.cores.max", 80)

.config("spark.executor.cores", 16)

.config("spark.executor.memory", "32g")

.config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2")//.config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2")

.getOrCreate()

val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

第二種方式也較為簡單,創(chuàng)建一個(gè)ReadConfig,這個(gè)是connector提供的一個(gè)單例類,可以設(shè)置很多參數(shù),例如(此時(shí)不必指定"spark.mongodb.input.uri"),如下所示是通過sparkcontext和通過sparksession兩種方式讀取數(shù)據(jù)的方法:

val readConfig =ReadConfig(Map("uri" -> "mongodb://192.168.2.48:27017/","database" -> "test","collection" -> "test"))

val r2=MongoSpark.load(spark, readConfig).rdd//val r2 = MongoSpark.load(spark.sparkContext, readConfig)

3、帶認(rèn)證的Java讀取方法:

帶認(rèn)證的需要先創(chuàng)建一個(gè)MongoURI,在URI里把用戶名,密碼和認(rèn)證庫都指定清楚。這種方法通用性比較強(qiáng),因?yàn)閟park也這么用,如果使用其他方式認(rèn)證要么是必須使用庫等于認(rèn)證庫,要么是沒有通用性。這種方法可以在admin認(rèn)證然后去讀test的數(shù)據(jù),就很好。

//帶認(rèn)證的需要先創(chuàng)建一個(gè)MongoURI,在URI里把用戶名,密碼和認(rèn)證庫都指定清楚,至于為什么需要指定庫建議看上一篇博客

val mongoURI = new MongoClientURI("mongodb://gaoze:gaolaoban@192.168.2.48:27017/?authSource=admin")//val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/");

lazy val mongo = newMongoClient(mongoURI)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("test")

//然后和1一樣

4、帶認(rèn)證的Spark讀取方法:

同3一樣,在URI里加入用戶名密碼和庫就行了:

val spark =SparkSession.builder()

.master("spark://192.168.2.51:7077")

.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))

.config("spark.cores.max", 80)

.config("spark.executor.cores", 16)

.config("spark.executor.memory", "32g")

//這里這個(gè)配置項(xiàng)指定了用戶名gaoze,密碼gaolaoban,認(rèn)證庫admin

.config("spark.mongodb.input.uri", "mongodb://gaoze:gaolaoban@192.168.2.51:27017/test.origin2?authSource=admin").getOrCreate()

val rdd= MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

或者:

//這里指定了用戶名rw,密碼1,認(rèn)證庫test

val readConfig =ReadConfig(Map("uri" -> "mongodb://rw:1@192.168.2.48:27017/?authSource=test","database" -> "test","collection" -> "test"))

val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()

//val r2 = MongoSpark.load(spark.sparkContext, readConfig)

總結(jié)

以上是生活随笔為你收集整理的spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。