日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

pyspark的rdd直接写入mysql

發(fā)布時間:2023/12/31 数据库 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pyspark的rdd直接写入mysql 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Google搜索"RDD write into ?mysql"前面5頁得到:[5][6][7][8][9][10]

我們一個個來分析

[1][2][3]讀出的是RDD,寫入的是foreachpartition的方式

[4]寫入的不是spark RDD,而是一個Spark的DataFrame類型的變量

[5]寫入的不是spark RDD,而是Spark RDD轉(zhuǎn)化為DataFrame類型然后寫入

[6][7]寫入的是spark DataFrame

[8]僅僅是代碼bug而已

[9]寫入的是dataframe不是rdd

[10]提到了檢查partition的數(shù)量可以加速rdd的寫入,里面有兩種方式,第一種是RDD寫入,第二種是Data Frame寫入。

結(jié)論:

[1][2][3][10]有用(都是scala語言),其他都沒有用。

[11]提到了針對partition來提高速度。

嘗試Pyspark版本:

import pandas as pd from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import SQLContext from pymysql import *def map_extract(element):file_path, content = elementyear = file_path[-8:-4]return [(year, i) for i in content.split("\n") if i]spark = SparkSession\.builder\.appName("PythonTest")\.getOrCreate()res = spark.sparkContext.wholeTextFiles('hdfs://Desktop:9000/user/mercury/names',minPartitions=40) \.map(map_extract) \.flatMap(lambda x: x) \.map(lambda x: (x[0], int(x[1].split(',')[2]))) \.reduceByKey(lambda x,y:x+y)def write2mysql(x):conn = connect(host='Desktop', port=3306, database='leaf', user='appleyuchi',password='appleyuchi', charset='utf8')cs1 = conn.cursor() # 執(zhí)行sql語句 #---------------------------------------for item in x:key = str(item[0])num = item[1]# values = (key,num)# print("values=",values)sql = "insert into `spark`(`key`,`num`) values (%s,%s)"cs1.execute(sql,(key,num))# 提交之前的操作,如果之前已經(jīng)執(zhí)行多次的execute,那么就都進行提交conn.commit() #---------------------------------------# 關(guān)閉cursor對象cs1.close()# 關(guān)閉connection對象conn.close()res.foreachPartition(lambda x:write2mysql(x))

數(shù)據(jù)集用的是 :

https://github.com/wesm/pydata-book/tree/2nd-edition/datasets/babynames

如果報錯:

pymysql.err.InternalError: (1049, "Unknown database 'leaf'")

查過存在該數(shù)據(jù)庫還是解決不了的話,參考:

https://gitee.com/appleyuchi/cluster_configuration/blob/master/物理環(huán)境配置流程-必須先看.txt

?

Reference:

[1]RDD從mysql中讀取數(shù)據(jù)和RDD往數(shù)據(jù)庫中存數(shù)據(jù)(代碼完整)

[2]RDD 直接存入MySQL,以及直接讀取MySQL中數(shù)據(jù)(代碼不完整)

[3]spark以rdd方式讀寫mysql

[4]Spark RDD寫入RMDB(Mysql)方法二

[5]How to put data from Spark RDD to Mysql Table

[6]WRITING TO A DATABASE FROM SPARK

[7]save spark rdd into Mysql

[8]Writing from PySpark to MySQL Database

[9]spark 2.1寫入mysql spark 2.1 write to mysql

[10]how to properly save spark rdd result to a mysql database(代碼不完整)

[11]計算質(zhì)數(shù)通過分區(qū)(Partition)提高Spark的運行性能

?

總結(jié)

以上是生活随笔為你收集整理的pyspark的rdd直接写入mysql的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。