PySpark summary notes
20220402
Spark error: Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResultSize
https://blog.csdn.net/qq_27600723/article/details/107023574
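Spark raises this error when the serialized task results sent back to the driver (for example through collect() or toPandas()) exceed spark.driver.maxResultSize, which defaults to 1g. Below is a minimal sketch of one workaround, raising the limit when the session is built. The helper names and the 4g value are assumptions for illustration; collecting less data on the driver is often the better fix.

```python
def driver_result_conf(max_result_size="4g"):
    """Return the config entry that raises the driver result-size limit."""
    return {"spark.driver.maxResultSize": max_result_size}


def build_session(app_name="demo", max_result_size="4g"):
    """Create a SparkSession with a larger spark.driver.maxResultSize."""
    from pyspark.sql import SparkSession  # lazy import; requires pyspark

    builder = SparkSession.builder.appName(app_name)
    for key, value in driver_result_conf(max_result_size).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling build_session() before any other session exists applies the setting. An already running session keeps its old value.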
Reading and writing Iceberg with PySpark
# code:utf-8
import findspark
# Specify the pyspark path here; on a server, prefer the pyspark that ships with the Spark installation.
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
import os
java8_location = r'D:\Java\jdk1.8.0_301/'  # set this to your own path
os.environ['JAVA_HOME'] = java8_location
from pyspark.sql import SparkSession

def get_spark():
    # Read Iceberg tables from PySpark
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
    # For a different target address or server cluster, copy that cluster's two
    # Hive config files into the local client's pyspark conf folder.
    return spark

if __name__ == '__main__':
    spark = get_spark()
    pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")
    spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")
    pdf.show()
    print()
1. Create a conf folder under pyspark and put Iceberg's two Hive configuration files in it:
hdfs-site.xml
hive-site.xml
2. Put iceberg-spark3-runtime-0.13.1.jar in pyspark's jars folder.
Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
The two Hive configuration files were not found. Specifying the pyspark path in findspark.init resolves this:
# findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
od_all = spark.createDataFrame(od)
od_all.createOrReplaceTempView('od_all')
od_duplicate = spark.sql("select distinct user_id, goods_id, category_second_id from od_all")
od_duplicate.createOrReplaceTempView('od_duplicate')
od_goods_group = spark.sql("select user_id, count(goods_id) goods_nums_total from od_duplicate group by user_id")
Every table referenced in a SQL statement must first be registered with createOrReplaceTempView.
A trailing semicolon in the string passed to spark.sql raises: extraneous input ';' expecting EOF near '<EOF>'. The statements above therefore omit it.
https://blog.csdn.net/xieganyu3460/article/details/83055935
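When queries are copied from a SQL client, they often still end in a semicolon. A small sketch of a guard that strips it before the string reaches spark.sql; the helper name is hypothetical and it assumes the string holds a single statement.

```python
def strip_trailing_semicolon(sql):
    """Remove surrounding whitespace and any trailing ';' from one SQL statement."""
    return sql.strip().rstrip(";").rstrip()
```

Usage would look like spark.sql(strip_trailing_semicolon(query)).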
https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html?highlight=type
PySpark data types
TypeError: field id: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
https://blog.csdn.net/weixin_40983094/article/details/115630358
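This TypeError appears during schema inference when one field holds values of different Python types across rows, for example both str and int in id. The sketch below spots such fields in a list of dict rows and converts them to str before createDataFrame is called. It assumes the rows are available as plain dicts; the function names are hypothetical, and passing an explicit schema is an alternative.

```python
from collections import defaultdict


def find_mixed_type_fields(rows):
    """Return {field: sorted type names} for fields holding more than one type."""
    seen = defaultdict(set)
    for row in rows:
        for field, value in row.items():
            if value is not None:  # None fits any column type, so skip it
                seen[field].add(type(value).__name__)
    return {f: sorted(t) for f, t in seen.items() if len(t) > 1}


def cast_fields_to_str(rows, fields):
    """Return copies of the rows with the given fields converted to str."""
    return [
        {k: (str(v) if k in fields and v is not None else v) for k, v in row.items()}
        for row in rows
    ]
```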
# code:utf-8
from pathlib import Path
import pandas as pd
from pyspark.ml.fpm import FPGrowth
import datetime
import platform
import os
import warnings
warnings.filterwarnings("ignore")
from utils_ import usetime,log_generate
from param_config import config

logger = log_generate(config.log["name"], config.log["date"])
system_name = platform.system()  # renamed from sys to avoid shadowing the standard sys module
if system_name == "Windows":
    PATH = os.path.abspath(str(Path("").absolute())) + "/"
else:
    PATH = "/home/guanlian_algo_confirm3/"
os.environ["JAVA_HOME"] = r"D:\Java\jdk1.8.0_301"  # set this to your own JDK path
t1 = datetime.datetime.now()

@usetime
def calculate_fpgrowth(spark, data, total_nums):
    data = spark.createDataFrame(data)
    data.createOrReplaceTempView("all_data")
    part_data = spark.sql("select * from all_data ")
    all_record = part_data.select("goods_huizong")  # select columns (several may be listed)
    all_record.show(5)

    def transform_to_list(col):
        per_row = col.split("|")  # receives the column value; applied to every row
        return per_row

    all_record = all_record.rdd.map(lambda row: (row["goods_huizong"], transform_to_list(row["goods_huizong"])))
    all_record = spark.createDataFrame(all_record, ["goods_huizong", "goods_huizong_list"])
    all_record.show(5)
    all_record = all_record.select("goods_huizong_list")
    all_record = all_record.withColumnRenamed("goods_huizong_list", "items")
    logger.debug("total number of records {}".format(total_nums))
    fp = FPGrowth(minSupport=0.0001, minConfidence=0.8)
    fpm = fp.fit(all_record)  # fit the model
    fpm.freqItemsets.show(5)  # print the first five frequent itemsets
    fp_count = fpm.freqItemsets.count()
    if fp_count == 0:
        return pd.DataFrame()
    logger.debug("*" * 100)
    logger.debug("number of frequent itemsets {} ".format(fp_count))
    ass_rule = fpm.associationRules  # strong association rules
    ass_rule.show()
    rule_nums = ass_rule.count()
    if rule_nums == 0:
        return pd.DataFrame()
    logger.debug("number of rules {} ".format(rule_nums))
    ass_rule = ass_rule.select(["antecedent", "consequent", "confidence"])
    ass_rule.show(5)
    ass_rule_df = ass_rule.toPandas()
    ass_rule_df["antecedent_str"] = ass_rule_df["antecedent"].apply(lambda x: str(x))
    ass_rule_df.sort_values(["antecedent_str", "confidence"], ascending=[True, False], inplace=True)
    t2 = datetime.datetime.now()
    logger.debug("spent ts: {}".format(t2 - t1))
    return ass_rule_df
Simple example
20220314
Parameters set in code take precedence over parameters passed on the command line; the values set in code are the ones finally used.
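The Spark configuration documentation gives the order as: properties set directly in code first, then spark-submit flags, then spark-defaults.conf. The toy model below only illustrates that ordering with plain dictionaries. It is not how Spark resolves settings internally, and the function name is hypothetical.

```python
def effective_conf(defaults_file, submit_flags, code_conf):
    """Merge three config sources; later sources override earlier ones."""
    merged = {}
    for source in (defaults_file, submit_flags, code_conf):
        merged.update(source)
    return merged
```

With spark.executor.memory set to 1g in the defaults file, 2g on the command line and 4g in code, the merged result keeps 4g.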
py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'iceberg': org.apache.iceberg.spark.SparkCatalog
Download the iceberg-spark-runtime-3.2_2.12-0.13.1.jar package from the Iceberg website and put it in Spark's jars directory.
https://iceberg.apache.org/docs/latest/getting-started/
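Before starting a session it can help to confirm that the runtime jar really is in the jars directory. A minimal sketch using only the standard library; the function name and the file-name pattern are assumptions based on the two jar names mentioned in these notes.

```python
import glob
import os


def find_iceberg_jars(jars_dir):
    """Return the sorted file names of Iceberg runtime jars found in jars_dir."""
    pattern = os.path.join(jars_dir, "iceberg-spark*runtime*.jar")
    return sorted(os.path.basename(path) for path in glob.glob(pattern))
```

An empty list means the jar is missing from that directory, which would explain the "Cannot find catalog plugin class" error.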
# code:utf-8
import findspark
import pandas as pd
findspark.init()
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
# from out_udf import outer_udf
# /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
# --master local --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF: anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF: named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# Read Iceberg tables from PySpark
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
spark.udf.register('xtrim2', xtrim2, 'string')
# spark.udf.register('outer_udf', outer_udf)

if __name__ == '__main__':
    df.show()
    spark.sql("select * from t1").show()
    spark.sql("select xtrim2('test data hello') ").show()
    spark.sql("use iceberg").show()
    spark.sql("show databases").show()
Reading Iceberg with PySpark
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
from out_udf import outer_udf
# /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
# --master local --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF: anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF: named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# Read Iceberg tables from PySpark
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
spark.udf.register('xtrim2', xtrim2, 'string')
spark.udf.register('outer_udf', outer_udf)

if __name__ == '__main__':
    df.show()
    spark.sql("select * from t1").show()
    spark.sql("select xtrim2('test data hello') ").show()
    spark.sql("select outer_udf('testdatahello') ").show()
    spark.sql("use iceberg").show()
    spark.sql("show databases").show()
Operating on Iceberg (Hive) with PySpark
20220311
AttributeError: 'NoneType' object has no attribute 'sc': how to fix it
Build the Spark object outside the loop, or create a temporary sc object?
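The notes do not record the exact trigger, but this error commonly shows up when code touches a session whose SparkContext was stopped or never created. A sketch of the "build it outside the loop" idea follows. The names create_session and process are hypothetical callables supplied by the caller, not part of PySpark.

```python
def run_batches(create_session, batches, process):
    """Create the session once, reuse it for every batch, stop it at the end."""
    spark = create_session()
    try:
        return [process(spark, batch) for batch in batches]
    finally:
        spark.stop()  # runs even if one batch raises
```

In real use, create_session could be a function that returns SparkSession.builder.getOrCreate().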
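In essence, Spark is the same data-processing code written in another language, just a different way of expressing it.
Parameter tuning
Lowering the number of executors and raising the other parameter values makes errors less likely.
Spark job reports java.lang.StackOverflowError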
https://blog.csdn.net/u010936936/article/details/88363449
Spark: java.io.IOException: No space left on device
https://blog.csdn.net/dupihua/article/details/51133551
ass_rule = ass_rule.filter('antecedent_len == 1')
ass_rule = ass_rule.filter('consequent_len == 1')
DataFrame filtering
https://blog.csdn.net/qq_40006058/article/details/88931884
Various DataFrame operations
20220310
data = spark.createDataFrame(data)  # convert a plain (pandas) DataFrame to a Spark DataFrame
data.createOrReplaceTempView("all_data")  # register it as a SQL table
part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))  # run SQL against it
https://blog.csdn.net/zhurui_idea/article/details/73090951
ass_rule = ass_rule.rdd.map(lambda row: (row["antecedent"], row['consequent'], calculate_len(row['antecedent'])))  # after .rdd.map the result is a PipelinedRDD
ass_rule = spark.createDataFrame(ass_rule)  # createDataFrame turns it back into a DataFrame
Converting between DataFrame and RDD
Automatically process every row while keeping the other original fields
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the sc
Oddly, when a string is split into a list, some other field has to come before it in the row, otherwise this error is raised.
part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))
part_data.show()
all_record = part_data.select("user_type", 'goods_huizong')  # several fields can be selected
all_record = all_record.rdd.map(lambda row: (row['user_type'], transform_to_list(row['goods_huizong'])))
Several fields can also be returned in the tuple.
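One likely explanation, offered as an assumption because the notes do not show the failing call: when the function passed to rdd.map returns the list itself, createDataFrame treats every list element as a separate column, so rows of different lengths no longer match the schema. Wrapping the list in a one-element tuple keeps it as a single array column. The sketch below uses plain dicts in place of Row objects, and the wrapper function names are hypothetical.

```python
def transform_to_list(value):
    """Split a '|'-separated string into a list of items."""
    return value.split("|")


def to_single_column_row(row, column="goods_huizong"):
    """Wrap the list in a one-element tuple so it becomes ONE array column."""
    return (transform_to_list(row[column]),)


def to_two_column_row(row):
    """Keep user_type next to the list, as in the snippet above."""
    return (row["user_type"], transform_to_list(row["goods_huizong"]))
```

With an RDD this would be used as all_record.rdd.map(to_single_column_row), followed by spark.createDataFrame(..., ["items"]).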
File "/usr/local/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport
__import__(name)
ModuleNotFoundError: No module named 'utils_'
Do functions used in a PySpark job have to live in the current module? Are they not recognized when imported from another module?
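This error usually means the executors cannot import the module because the file exists only on the driver. Besides the --py-files flag of spark-submit used elsewhere in these notes, SparkContext.addPyFile ships a file at runtime. A minimal sketch; the helper name ship_modules is hypothetical.

```python
def ship_modules(spark, module_paths):
    """Send local .py or .zip files to the executors so imports resolve there."""
    for path in module_paths:
        spark.sparkContext.addPyFile(path)
    return list(module_paths)
```

For example, ship_modules(spark, ["utils_.py"]) would be called once after the session is created and before any job that uses the module.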
20211231
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
The resources were being used by other people.
20211230
Spark 2.0.x dump a csv file from a dataframe containing one array of type string
https://stackoverflow.com/questions/40426106/spark-2-0-x-dump-a-csv-file-from-a-dataframe-containing-one-array-of-type-string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())
df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))
df.drop("column_as_array").write.csv(...)
The approach above has a problem: the values in the generated column come out as generator expressions.
df.withColumn("ArrayOfString", df["ArrayOfString"].cast("string")).write.csv(path="/home/me/saveDF")
This second approach works.
https://www.jianshu.com/p/3735b5e2c540
https://www.jianshu.com/p/80964332b3c4
Writing an RDD or Spark DataFrame to CSV
Plain pandas cannot write to HDFS.
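The CSV writer cannot store array columns, which is why the array has to become a string first. Besides a cast, the built-in concat_ws function joins an array of strings with a separator. The sketch below assumes the column is an array of strings; both function names are hypothetical, and join_items is only a plain-Python mirror showing the expected value.

```python
def join_items(items, sep=","):
    """Plain-Python mirror of concat_ws: join the items of one array value."""
    return sep.join(str(item) for item in items)


def add_joined_column(df, array_col, new_col, sep=","):
    """Add new_col holding the array in array_col joined into one string."""
    from pyspark.sql import functions as F  # lazy import; requires pyspark

    return df.withColumn(new_col, F.concat_ws(sep, F.col(array_col)))
```

After add_joined_column, dropping the original array column leaves a DataFrame that write.csv can handle.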
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from tqdm import tqdm
import platform
import os
os.environ['JAVA_HOME']=r'/usr/local/jdk1.8.0_212'
t1 = datetime.datetime.now()
appname = "FPgrowth"
# master = "local[6]"
spark = (
    SparkSession.Builder().appName(appname)
    .config('spark.executor.instances', '50')  # was 'spark.num-executors', which is not a Spark property name
    .config('spark.executor.memory', '4g')
    .config('spark.executor.cores', '3')
    .config('spark.driver.memory', '1g')
    .config('spark.default.parallelism', '1000')
    .config('spark.storage.memoryFraction', '0.5')
    .config('spark.shuffle.memoryFraction', '0.3')
    .config("spark.speculation", 'True')
    .config("spark.speculation.interval", '100')
    .config("spark.speculation.quantile", "0.75")
    .config("spark.speculation.multiplier", '1.5')
    .config("spark.scheduler.mode", 'FAIR')
    .getOrCreate()
)
df = spark.read.format("csv").option("header", "true").load("/data/tb_order_user_sec_type_group.csv")
df.createOrReplaceTempView('all_data')
sec_type = spark.sql("select sec_type from all_data")
https://hub.mybinder.turing.ac.uk/user/apache-spark-sjqwupmp/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb
Quickstart: Pandas API on Spark (getting started with pandas on PySpark)
part_data = spark.sql("select * from all_data where sec_type= " + cus_type)
part_data.count()  # count the elements, i.e. the number of rows
lines.first()  # the first element of this RDD, i.e. the first line of README.md
http://spark.apache.org/docs/latest/api/python/getting_started/index.html
Official PySpark documentation; consult it for both Spark SQL and Spark DataFrame.
Quickly convert to pandas to work on the data.
20210831
Windows 10: Spark error. WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
https://blog.csdn.net/weixin_43748432/article/details/107378033
java.lang.OutOfMemoryError: GC overhead limit exceeded
https://blog.csdn.net/gaokao2011/article/details/51707163
Increase the parameters below.
Spark operators: basic RDD transformations (5): mapPartitions
http://lxw1234.com/archives/2015/07/348.htm
mapPartitions applies the function once per partition rather than once per element,
which improves efficiency.
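A function passed to mapPartitions receives an iterator over one whole partition and returns an iterator of results, so any setup runs once per partition. A minimal sketch; the function name and the '|' separator are assumptions chosen to match the other snippets in these notes.

```python
def split_partition(rows):
    """Process one whole partition: do setup once, then yield per-row results."""
    separator = "|"  # stands in for costly setup, e.g. opening a connection
    for row in rows:
        yield row.split(separator)
```

With an RDD of strings this would be called as rdd.mapPartitions(split_partition).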
spark = (
    SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')  # executor memory
    .config('spark.executor.cores', '2')  # CPU cores available to each executor
    .config("spark.executor.instances", '10')  # total number of executors
    .config('spark.driver.memory', '1g')  # driver setting; should it be smaller than the executor's?
    .config('spark.default.parallelism', '1000')  # number of tasks
    .config('spark.sql.shuffle.partitions', '300')  # number of shuffle partitions
    .config("spark.driver.extraJavaOptions", "-Xss2048M")  # JVM setting
    .config("spark.speculation", 'True')  # avoid getting stuck in a stage
    .config("spark.speculation.interval", '100')  # avoid getting stuck in a stage
    .config("spark.speculation.quantile", "0.1")  # avoid getting stuck in a stage
    .config("spark.speculation.multiplier", '1')  # avoid getting stuck in a stage
    .config("spark.scheduler.mode", 'FAIR')  # scheduling mode
    .getOrCreate()
)
Parameter settings
spark = (
    SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')
    .config('spark.executor.cores', '2')
    .config("spark.executor.instances", '10')
    .config('spark.driver.memory', '3g')  # this parameter matters a lot
    .config('spark.default.parallelism', '1000')  # this parameter matters a lot
    .config('spark.sql.shuffle.partitions', '300')
    .config("spark.driver.extraJavaOptions", "-Xss3072M")  # this parameter matters a lot
    .config("spark.speculation", 'True')
    .config("spark.speculation.interval", '100')
    .config("spark.speculation.quantile", "0.1")
    .config("spark.speculation.multiplier", '1')
    .config("spark.scheduler.mode", 'FAIR')
    .getOrCreate()
)
With 32 GB of memory in total, this configuration produces results quickly.
https://blog.csdn.net/lotusws/article/details/52423254
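The local values of the Spark master parameter
Then open this address in a browser: http://192.168.1.116:4040
Spark UI
Spark dashboard address
Viewing the configuration parameters
Stages currently running
pending: stages that have not run yet
completed: stages that have finished
12/69 13: 69 stages in total, 12 finished, 13 running
In the dashboard, mainly look at the stages and the executors.
The timeline runs from left to right.
Look under the job for the specific cause of a failure.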
https://blog.csdn.net/weixin_42340179/article/details/82415085
https://blog.csdn.net/whgyxy/article/details/88779965
Stuck in a particular stage
Spark runs normally but one stage hangs and makes no progress: analysis of the problem
https://blog.csdn.net/yf_bit/article/details/93610829
Key reading
https://www.cnblogs.com/candlia/p/11920289.html
https://www.cnblogs.com/xiao02fang/p/13197877.html
Factors that affect Spark performance
https://www.csdn.net/tags/OtDaUgysMTk3Mi1ibG9n.html
https://www.cnblogs.com/yangsy0915/p/6060532.html
Key reading
PySpark configuration parameters
https://www.javaroad.cn/questions/15705
Looping over rows
http://www.sofasofa.io/forum_main_post.php?postid=1005461
Getting the total number of rows and columns
https://blog.csdn.net/qq_40006058/article/details/88822268
Learning PySpark | 68 commonly used functions | explanations + Python code
https://blog.csdn.net/qq_29153321/article/details/88648948
RDD operations
https://www.jianshu.com/p/55efdcabd163
Some simple, commonly used PySpark functions and methods
http://sofasofa.io/forum_main_post.php?postid=1002482
Renaming DataFrame columns