日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

pyspark汇总小结

發(fā)布時(shí)間:2023/11/28 生活经验 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pyspark汇总小结 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

20220402

Spark報(bào)Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResultSize
https://blog.csdn.net/qq_27600723/article/details/107023574
pyspark讀寫iceberg# code:utf-8
import findspark
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
這里要指定pyspark的路徑,如果是服務(wù)器的話最好用spark所在的pyspark路徑
import os
java8_location = r'D:\Java\jdk1.8.0_301/'  # 設(shè)置你自己的路徑
os.environ['JAVA_HOME'] = java8_location
from pyspark.sql import SparkSessiondef get_spark():# pyspark 讀iceberg表spark = SparkSession.builder.getOrCreate()spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")spark.conf.set("spark.sql.catalog.iceberg.type", "hive")spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")不同的目標(biāo)地址,不同的服務(wù)器集群,要拷貝對(duì)應(yīng)的兩個(gè)hive文件到當(dāng)?shù)乜蛻舳说膒yspar conf文件夾下return sparkif __name__ == '__main__':spark = get_spark()pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")spark.sql("insert into iceberg.test.end_spec  values ('aa','bb')")pdf.show()print()
1. 在pyspark下新建conf文件夾,把iceberg下的兩個(gè)hive配置文件
放在下面
hdfs-site.xml
hive-site.xm
2. iceberg-spark3-runtime-0.13.1.jar 把這個(gè)文件放在pyspark的jars文件夾
Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json沒找到hive的兩個(gè)配置文件,需要在init里面指定pyspark的路徑即可解決
# findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
      od_all = spark.createDataFrame(od)od_all.createOrReplaceTempView('od_all')od_duplicate = spark.sql("select distinct user_id,goods_id,category_second_id from od_all;")od_duplicate.createOrReplaceTempView('od_duplicate')od_goods_group = spark.sql(" select user_id,count(goods_id) goods_nums_total from od_duplicate group by user_id ;")
sql語句中所牽扯的表,需要createOrReplaceTempView創(chuàng)建
執(zhí)行sql時(shí)出現(xiàn)錯(cuò)誤 extraneous input ';' expecting EOF near '<EOF>'
https://blog.csdn.net/xieganyu3460/article/details/83055935

https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html?highlight=type
pyspark數(shù)據(jù)類型

TypeError: field id: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>https://blog.csdn.net/weixin_40983094/article/details/115630358
# code:utf-8
from pathlib import Pathimport pandas as pd
from pyspark.ml.fpm import FPGrowth
import datetime
import platform
import os
import warnings
warnings.filterwarnings("ignore")
from utils_ import usetime,log_generate
from param_config import configlogger = log_generate(config.log["name"], config.log["date"])sys = platform.system()
if sys == "Windows":PATH = os.path.abspath(str(Path("").absolute())) + "/"
else:PATH = "/home/guanlian_algo_confirm3/"os.environ["JAVA_HOME"] = r"D:\Java\jdk1.8.0_301"t1 = datetime.datetime.now()@usetime
def calculate_fpgrowth(spark, data, total_nums):data = spark.createDataFrame(data)data.createOrReplaceTempView("all_data")part_data = spark.sql("select * from all_data ")all_record = part_data.select("goods_huizong")  # 篩選多列all_record.show(5)def transform_to_list(col):per_row = col.split("|")  # 傳入的列數(shù)據(jù),自動(dòng)對(duì)每行數(shù)據(jù)進(jìn)行處理return per_rowall_record = all_record.rdd.map(lambda row: (row["goods_huizong"], transform_to_list(row["goods_huizong"])))all_record = spark.createDataFrame(all_record, ["goods_huizong", "goods_huizong_list"])all_record.show(5)all_record = all_record.select("goods_huizong_list")all_record = all_record.withColumnRenamed("goods_huizong_list", "items")logger.debug()("總數(shù)據(jù)條數(shù) {}".format(total_nums))fp = FPGrowth(minSupport=0.0001, minConfidence=0.8)fpm = fp.fit(all_record)  # 模型擬合fpm.freqItemsets.show(5)  # 在控制臺(tái)顯示前五條頻繁項(xiàng)集fp_count = fpm.freqItemsets.count()if fp_count == 0:return pd.DataFrame()logger.debug()("*" * 100)logger.debug()("頻繁項(xiàng)條數(shù) {} ".format(fp_count))ass_rule = fpm.associationRules  # 強(qiáng)關(guān)聯(lián)規(guī)則ass_rule.show()rule_nums = ass_rule.count()if rule_nums == 0:return pd.DataFrame()logger.debug()("規(guī)則條數(shù) {} ".format(rule_nums))ass_rule = ass_rule.select(["antecedent", "consequent", "confidence"])ass_rule.show(5)ass_rule_df = ass_rule.toPandas()ass_rule_df["antecedent_str"] = ass_rule_df["antecedent"].apply(lambda x: str(x))ass_rule_df.sort_values(["antecedent_str", "confidence"], ascending=[True, False], inplace=True)t2 = datetime.datetime.now()logger.debug()("spent ts:", t2 - t1)return ass_rule_df簡(jiǎn)單實(shí)例

20220314

代碼設(shè)置參數(shù)比命令行傳參數(shù)的級(jí)別高,最終用的還是代碼里面設(shè)置的參數(shù)

py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'iceberg': org.apache.iceberg.spark.SparkCatalog
需要去iceberg官網(wǎng)下載一個(gè) iceberg-spark-runtime-3.2_2.12-0.13.1.jar包
放在spark的jars下面https://iceberg.apache.org/docs/latest/getting-started/

# code:utf-8
import findspark
import pandas as pd
findspark.init()
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
# from out_udf import outer_udf
#  /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
#  --master local  --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf
spark = SparkSession.builder.getOrCreate()df = spark.createDataFrame([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")# UDF- 匿名函數(shù)
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')# UDF 顯式函數(shù)
def xtrim2(record):return re.sub('[ \n\r\t]', '', record)# pyspark 讀iceberg表
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")spark.udf.register('xtrim2', xtrim2, 'string')# spark.udf.register('outer_udf', outer_udf)if __name__ == '__main__':df.show()spark.sql("select * from t1").show()spark.sql("select xtrim2('測(cè)試 數(shù)據(jù)    你好') ").show()spark.sql("use iceberg").show()spark.sql("show databases").show()pyspark讀取iceberg
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
from out_udf import outer_udf
#  /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
#  --master local  --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udfspark = SparkSession.builder.getOrCreate()df = spark.createDataFrame([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")# UDF- 匿名函數(shù)
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')# UDF 顯式函數(shù)
def xtrim2(record):return re.sub('[ \n\r\t]', '', record)# pyspark 讀iceberg表
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")spark.udf.register('xtrim2', xtrim2, 'string')
spark.udf.register('outer_udf', outer_udf)if __name__ == '__main__':df.show()spark.sql("select * from t1").show()spark.sql("select xtrim2('測(cè)試 數(shù)據(jù)    你好') ").show()spark.sql("select outer_udf('測(cè)試數(shù)據(jù)你好') ").show()spark.sql("use iceberg").show()spark.sql("show databases").show()pyspark對(duì)iceberg(hive)進(jìn)行操作

20220311

AttributeError: 'NoneType' object has no attribute 'sc' 解決方法!
把構(gòu)建spark對(duì)象放在循環(huán)外面或者臨時(shí)建一個(gè)sc對(duì)象?

spark的本質(zhì)就是處理數(shù)據(jù)的代碼換一種語言,另一種表達(dá)方式而已

參數(shù)調(diào)節(jié)
把executor數(shù)量調(diào)小,其他參數(shù)值調(diào)大,不容易報(bào)錯(cuò)

Spark任務(wù)報(bào)java.lang.StackOverflowError
https://blog.csdn.net/u010936936/article/details/88363449
Spark:java.io.IOException: No space left on devicehttps://blog.csdn.net/dupihua/article/details/51133551
ass_rule = ass_rule.filter('antecedent_len == 1')ass_rule = ass_rule.filter('consequent_len == 1')
dataframe過濾https://blog.csdn.net/qq_40006058/article/details/88931884
dataframe各種操作

20220310

data = spark.createDataFrame(data) # 普通dataframe轉(zhuǎn)spark dataframe
data.createOrReplaceTempView("all_data") # 轉(zhuǎn)sql表進(jìn)行操作part_data = spark.sql("select * from all_data where user_type= " + str(cus_type)) #sql操作

https://blog.csdn.net/zhurui_idea/article/details/73090951

ass_rule = ass_rule.rdd.map(lambda row:(row["antecedent"],row['consequent'], calculate_len(row['antecedent'])))# rdd執(zhí)行一下就變成了pipelinerddass_rule = spark.createDataFrame(ass_rule)再createDataFrame一下就變回dataframe

dataframe和RDD的轉(zhuǎn)換


自動(dòng)對(duì)每行數(shù)據(jù)進(jìn)行處理并保留原始其他字段

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the sc
好奇怪字符分裂為列表的時(shí)候,必須前面還有其他字段或者會(huì)報(bào)錯(cuò)
 part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))part_data.show()all_record = part_data.select("user_type",'goods_huizong') # 可以選多個(gè)字段all_record = all_record.rdd.map(lambda row: (row['user_type],transform_to_list(row['goods_huizong']))) 
后面也可以選多個(gè)字段
  File "/usr/local/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport__import__(name)
ModuleNotFoundError: No module named 'utils_'與pyspark大數(shù)據(jù)相關(guān)的函數(shù)只能放在當(dāng)前模塊?通過其他模塊導(dǎo)入
會(huì)不能識(shí)別?

20211231

 Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources資源被其他人占用了

20211230

Spark 2.0.x dump a csv file from a dataframe containing one array of type string
https://stackoverflow.com/questions/40426106/spark-2-0-x-dump-a-csv-file-from-a-dataframe-containing-one-array-of-type-string

from pyspark.sql.functions import udf
from pyspark.sql.types import StringTypedef array_to_string(my_list):return '[' + ','.join([str(elem) for elem in my_list]) + ']'array_to_string_udf = udf(array_to_string, StringType())df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))
df.drop("column_as_array").write.csv(...)
上面的方式有問題 生成的列里面的值是生成式import org.apache.spark.sql.functions._
val dumpCSV = df.withColumn("ArrayOfString", assRule["ArrayOfString"].cast("string")).write.csv(path="/home/me/saveDF")
這一種可以實(shí)現(xiàn)

https://www.jianshu.com/p/3735b5e2c540
https://www.jianshu.com/p/80964332b3c4
rdd或者sparkDataframe寫入csv普通的pandas不能寫入hdfs

import  findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from tqdm import tqdm
import platform
import os
os.environ['JAVA_HOME']=r'/usr/local/jdk1.8.0_212'
t1 = datetime.datetime.now()
appname = "FPgrowth"
#master = "local[6]"spark = SparkSession.Builder().appName(appname)\.config('spark.num-executors','50')\.config('spark.executor.memory','4g')\.config('spark.executor.cores','3')\.config('spark.driver.memory','1g')\.config('spark.default.parallelism','1000')\.config('spark.storage.memoryFraction','0.5')\.config('spark.shuffle.memoryFraction','0.3')\.config("spark.speculation",'True')\.config("spark.speculation.interval",'100')\.config("spark.speculation.quantile","0.75")\.config("spark.speculation.multiplier",'1.5')\.config("spark.scheduler.mode",'FAIR')\.getOrCreate()
df = spark.read.format("csv"). \option("header", "true") \.load("/data/tb_order_user_sec_type_group.csv")df.createOrReplaceTempView('all_data')
sec_type=spark.sql("select sec_type from all_data ")

https://hub.mybinder.turing.ac.uk/user/apache-spark-sjqwupmp/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb
Quickstart: Pandas API on Spark 快速開始基于pyspark的pandas

part_data=spark.sql("select * from all_data where sec_type= "+ cus_type)
part_data.count() # 統(tǒng)計(jì)RDD中的元素個(gè)數(shù) 行數(shù)
lines.first() # 這個(gè)RDD中的第一個(gè)元素,也就是README.md的第一行

http://spark.apache.org/docs/latest/api/python/getting_started/index.html
pyspark 官方文檔 sparksql和sparkdataframe都參考官方文檔


快速轉(zhuǎn)化成pandas進(jìn)行操作

20210831

Windows10:spark報(bào)錯(cuò)。WARN Utils: Service ‘SparkUI‘ could not bind on port 4040. Attempting port 4041.

https://blog.csdn.net/weixin_43748432/article/details/107378033

java.lang.OutOfMemoryError: GC overhead limit exceeded
https://blog.csdn.net/gaokao2011/article/details/51707163調(diào)大下面的參數(shù)

Spark算子:RDD基本轉(zhuǎn)換操作(5)–mapPartitions、
http://lxw1234.com/archives/2015/07/348.htm
以分區(qū)為單位來map而不是對(duì)每個(gè)元素單獨(dú)map
提高效率

spark = SparkSession.Builder().appName(appname).master(master)\.config('spark.some.config.option0','some_value') \ .config('spark.executor.memory','2g')\  #executor 內(nèi)存設(shè)置.config('spark.executor.cores','2')\ #單個(gè)executor的可用的cpu核心數(shù).config("spark.executor.instances",'10')\ #executor的總個(gè)數(shù).config('spark.driver.memory','1g')\ # driver 的設(shè)置 要比 executor的小?.config('spark.default.parallelism','1000')\ #任務(wù)數(shù)的設(shè)置.config('spark.sql.shuffle.partitions','300')\  #分區(qū)數(shù)的設(shè)置.config("spark.driver.extraJavaOptions","-Xss2048M")\    #jvm相關(guān)設(shè)置 .config("spark.speculation",'True')\  # 避免卡在某個(gè)stage.config("spark.speculation.interval",'100')\ # 避免卡在某個(gè)stage.config("spark.speculation.quantile","0.1")\ # 避免卡在某個(gè)stage.config("spark.speculation.multiplier",'1')\   # 避免卡在某個(gè)stage.config("spark.scheduler.mode",'FAIR')\ # 調(diào)度方式.getOrCreate()
參數(shù)設(shè)置spark = SparkSession.Builder().appName(appname).master(master)\.config('spark.some.config.option0','some_value') \.config('spark.executor.memory','2g')\.config('spark.executor.cores','2')\.config("spark.executor.instances",'10')\.config('spark.driver.memory','3g')\
#這個(gè)參數(shù)很重要    .config('spark.default.parallelism','1000')\#這個(gè)參數(shù)很重要   .config('spark.sql.shuffle.partitions','300')\.config("spark.driver.extraJavaOptions","-Xss3072M")\#這個(gè)參數(shù)很重要    .config("spark.speculation",'True')\.config("spark.speculation.interval",'100')\.config("spark.speculation.quantile","0.1")\.config("spark.speculation.multiplier",'1')\.config("spark.scheduler.mode",'FAIR')\.getOrCreate()總共32gb內(nèi)存 這個(gè)配置能很快的跑出結(jié)果

https://blog.csdn.net/lotusws/article/details/52423254
spark master local 參數(shù)

然后訪問瀏覽器地址:http://192.168.1.116:4040
sparkui
spark面板地址


配置參數(shù)查看


正在跑的stage
pending 還沒跑的stage
completed 完成的stage
12/69 13 一共69個(gè) stage 已經(jīng)跑了12個(gè) 13個(gè)正在跑
面板主要看stage 和 executor


時(shí)間線 從左到右


job 下面查看具體失敗原因

https://blog.csdn.net/weixin_42340179/article/details/82415085
https://blog.csdn.net/whgyxy/article/details/88779965
在某個(gè)stage卡住
spark運(yùn)行正常,某一個(gè)Stage卡住,停止不前異常分析

https://blog.csdn.net/yf_bit/article/details/93610829
重點(diǎn)
https://www.cnblogs.com/candlia/p/11920289.html
https://www.cnblogs.com/xiao02fang/p/13197877.html
影響spark性能的因素

https://www.csdn.net/tags/OtDaUgysMTk3Mi1ibG9n.html
https://www.cnblogs.com/yangsy0915/p/6060532.html
重點(diǎn)
pyspark 配置參數(shù)

https://www.javaroad.cn/questions/15705
按行循環(huán)

http://www.sofasofa.io/forum_main_post.php?postid=1005461
獲取總行數(shù)和總列數(shù)

https://blog.csdn.net/qq_40006058/article/details/88822268
PySpark學(xué)習(xí) | 常用的 68 個(gè)函數(shù) | 解釋 + python代碼

https://blog.csdn.net/qq_29153321/article/details/88648948
RDD操作

https://www.jianshu.com/p/55efdcabd163
pyspark一些簡(jiǎn)單常用的函數(shù)方法

http://sofasofa.io/forum_main_post.php?postid=1002482
dataframe更改列名

總結(jié)

以上是生活随笔為你收集整理的pyspark汇总小结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产成人av一区二区三区在线观看 | 日本一区二区在线视频 | 97超碰站 | 色片网站在线观看 | 精品熟妇视频一区二区三区 | 在线国产精品视频 | 懂色av,蜜臀av粉嫩av | 日韩成人欧美 | 国产成人在线看 | 一区二区伦理 | 日本xxxx免费 | 中文久久久 | 午夜视频| 久久久久久久一区二区三区 | 99re在线视频精品 | 亚洲高清无码久久久 | 欧美日韩乱国产 | 国模私拍一区二区 | 精品在线视频免费 | 亚洲午夜精品久久久久久app | 久久亚洲AV无码精品 | 国产午夜免费 | 婷婷色网 | 天天色天天操天天射 | 一级一片免费播放 | 国产日产精品一区二区 | 成人深夜小视频 | www.欧美.com | 国产激情无码一区二区 | 日韩女优在线观看 | 毛片av在线观看 | 国产人成无码视频在线观看 | 中文字幕3区 | 淫视频在线观看 | 一起草最新网址 | 奇米第四色影视 | 91香蕉国产在线观看软件 | 精品欧美一区二区三区久久久 | 日本黄色三级网站 | 他趴在我两腿中间添得好爽在线看 | 日本伦理片在线看 | 青青草视频污 | 在线观看免费国产 | jizz国产在线 | 久久久久久久久国产精品 | 一二三区不卡 | 亚洲中文一区二区 | 草草影院在线观看视频 | 日韩欧美视频在线免费观看 | 免费毛片网站 | 成人性做爰aaa片免费 | 欧美bbbbbbbbbbbb1 麻豆精品av | 黄色成人在线视频 | 中国美女黄色一级片 | 黑人极品videos精品欧美裸 | 中文字幕在线播放一区二区 | 夜夜综合 | 亚洲午夜精品一区二区三区 | 伊人精品视频 | 国产啊啊啊啊 | 久操影视 | 97成人人妻一区二区三区 | 黄网站免费大全入口 | 青青草公开视频 | 91精品国产色综合久久不卡电影 | 高清乱码免费网 | 自拍超碰| 亚洲成人免费视频 | 波多野结衣网址 | 日本欧美www | 精品欧美一区二区在线观看 | 成人动漫视频在线观看 | 波多野结衣视频在线播放 | 十八岁世界在线观看高清免费韩剧 | 欧美色视频一区二区三区 | 国产只有精品 | 成人毛片视频免费看 | 精品成人一区二区三区久久精品 | 国产精品免费久久 | 高清一区在线观看 | 天天鲁一鲁摸一摸爽一爽 | 香蕉视频免费在线 | 日韩性av | 精品肉丝脚一区二区三区 | 欧美一级片免费在线观看 | 青青草草 | 亚洲成a人 | 日本中文字幕在线视频 | 亚洲精品国产精品国自产观看 | 国产精品一区二区电影 | 午夜免费福利影院 | 国产一区二区三区四区五区 | 日韩少妇内射免费播放18禁裸乳 | 国产午夜精品久久久 | 成人欧美一区二区三区黑人一 | 国产精品久线在线观看 | 一本一道久久a久久精品蜜桃 | 蜜桃视频欧美 | 91久久久国产精品 |