PySpark Summary Notes


20220402

Spark error: Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResultSize
https://blog.csdn.net/qq_27600723/article/details/107023574
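For what it's worth, a minimal sketch of the usual mitigation, raising the driver-side cap (spark.driver.maxResultSize is the real setting; the 4g value is only an illustration, and better still is to avoid collecting that much to the driver at all):

from pyspark.sql import SparkSession

# Raise the cap on total serialized task results returned to the driver
# (default 1g; setting it to 0 disables the check entirely).
spark = (SparkSession.builder
         .config("spark.driver.maxResultSize", "4g")
         .getOrCreate())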
PySpark reading and writing Iceberg:

# code: utf-8
import findspark
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
# findspark must be pointed at the pyspark installation; on a server,
# prefer the pyspark that ships with the Spark installation itself.
import os

java8_location = r'D:\Java\jdk1.8.0_301/'  # set this to your own JDK path
os.environ['JAVA_HOME'] = java8_location

from pyspark.sql import SparkSession


def get_spark():
    # Read an Iceberg table from PySpark
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
    # For a different target address or server cluster, copy that cluster's two
    # Hive config files into the local client's pyspark conf folder.
    return spark


if __name__ == '__main__':
    spark = get_spark()
    pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")
    spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")
    pdf.show()
1. Create a conf folder under the pyspark installation and put the two Hive config files from the Iceberg cluster into it:
   hdfs-site.xml
   hive-site.xml
2. Put iceberg-spark3-runtime-0.13.1.jar into pyspark's jars folder.
Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
The two Hive config files were not found; pointing findspark.init at the pyspark path fixes it:
# findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
od_all = spark.createDataFrame(od)
od_all.createOrReplaceTempView('od_all')
od_duplicate = spark.sql("select distinct user_id, goods_id, category_second_id from od_all")
od_duplicate.createOrReplaceTempView('od_duplicate')
od_goods_group = spark.sql("select user_id, count(goods_id) goods_nums_total from od_duplicate group by user_id")

Every table referenced in a SQL statement must first be registered with createOrReplaceTempView.
Executing SQL with a trailing semicolon raises: extraneous input ';' expecting EOF near '<EOF>'
https://blog.csdn.net/xieganyu3460/article/details/83055935

https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html?highlight=type
PySpark data types

TypeError: field id: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
https://blog.csdn.net/weixin_40983094/article/details/115630358
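This typically happens when createDataFrame has to infer a column's type and the column holds mixed values (strings and integers, say). A minimal sketch of one workaround, normalizing the column and passing an explicit schema so no merge is needed; the column names are made up for illustration:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# 'id' mixes ints and strings, so schema inference would try (and fail)
# to merge LongType with StringType.
pdf = pd.DataFrame({"id": [1, "2", 3], "name": ["a", "b", "c"]})
pdf["id"] = pdf["id"].astype(str)  # normalize the column to a single type

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
])
spark.createDataFrame(pdf, schema=schema).show()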
# code: utf-8
from pathlib import Path

import pandas as pd
from pyspark.ml.fpm import FPGrowth
import datetime
import platform
import os
import warnings

warnings.filterwarnings("ignore")
from utils_ import usetime, log_generate
from param_config import config

logger = log_generate(config.log["name"], config.log["date"])
sys = platform.system()
if sys == "Windows":
    PATH = os.path.abspath(str(Path("").absolute())) + "/"
else:
    PATH = "/home/guanlian_algo_confirm3/"
os.environ["JAVA_HOME"] = r"D:\Java\jdk1.8.0_301"
t1 = datetime.datetime.now()


@usetime
def calculate_fpgrowth(spark, data, total_nums):
    data = spark.createDataFrame(data)
    data.createOrReplaceTempView("all_data")
    part_data = spark.sql("select * from all_data")
    all_record = part_data.select("goods_huizong")  # select one or more columns
    all_record.show(5)

    def transform_to_list(col):
        per_row = col.split("|")  # applied to each row's column value automatically
        return per_row

    all_record = all_record.rdd.map(
        lambda row: (row["goods_huizong"], transform_to_list(row["goods_huizong"])))
    all_record = spark.createDataFrame(all_record, ["goods_huizong", "goods_huizong_list"])
    all_record.show(5)
    all_record = all_record.select("goods_huizong_list")
    all_record = all_record.withColumnRenamed("goods_huizong_list", "items")
    logger.debug("total rows: {}".format(total_nums))
    fp = FPGrowth(minSupport=0.0001, minConfidence=0.8)
    fpm = fp.fit(all_record)  # fit the model
    fpm.freqItemsets.show(5)  # print the first five frequent itemsets
    fp_count = fpm.freqItemsets.count()
    if fp_count == 0:
        return pd.DataFrame()
    logger.debug("*" * 100)
    logger.debug("frequent itemset count: {}".format(fp_count))
    ass_rule = fpm.associationRules  # the strong association rules
    ass_rule.show()
    rule_nums = ass_rule.count()
    if rule_nums == 0:
        return pd.DataFrame()
    logger.debug("rule count: {}".format(rule_nums))
    ass_rule = ass_rule.select(["antecedent", "consequent", "confidence"])
    ass_rule.show(5)
    ass_rule_df = ass_rule.toPandas()
    ass_rule_df["antecedent_str"] = ass_rule_df["antecedent"].apply(lambda x: str(x))
    ass_rule_df.sort_values(["antecedent_str", "confidence"], ascending=[True, False], inplace=True)
    t2 = datetime.datetime.now()
    logger.debug("spent ts: %s", t2 - t1)
    return ass_rule_df

A simple, complete example.

20220314

Parameters set in code take precedence over parameters passed on the command line; in the end, the values set in code are what take effect.
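A minimal sketch of what that means in practice (the 2g/4g values are arbitrary); the documented precedence is programmatic SparkConf first, then spark-submit flags, then spark-defaults.conf:

# Launched with, say:  spark-submit --conf spark.executor.memory=2g job.py
from pyspark.sql import SparkSession

# The value set in code wins over the --conf flag: executors get 4g, not 2g.
spark = (SparkSession.builder
         .config("spark.executor.memory", "4g")
         .getOrCreate())
print(spark.conf.get("spark.executor.memory"))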

py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'iceberg': org.apache.iceberg.spark.SparkCatalog
Download the iceberg-spark-runtime-3.2_2.12-0.13.1.jar package from the Iceberg site and put it under Spark's jars folder.
https://iceberg.apache.org/docs/latest/getting-started/

# code: utf-8
import findspark
import pandas as pd
findspark.init()
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
# from out_udf import outer_udf
#  /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
#  --master local  --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF - anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF - named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# Read an Iceberg table from PySpark
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
spark.udf.register('xtrim2', xtrim2, 'string')
# spark.udf.register('outer_udf', outer_udf)

if __name__ == '__main__':
    df.show()
    spark.sql("select * from t1").show()
    spark.sql("select xtrim2('测试 数据    你好')").show()
    spark.sql("use iceberg").show()
    spark.sql("show databases").show()

PySpark reading Iceberg.
The second script is the same as the one above except that the external UDF is actually imported (shipped to the executors with the --py-files option shown in the spark-submit comment) and registered; only the differing lines are repeated here:

from out_udf import outer_udf

spark.udf.register('outer_udf', outer_udf)

spark.sql("select outer_udf('测试数据你好')").show()

Operating on Iceberg (Hive catalog) from PySpark.

20220311

AttributeError: 'NoneType' object has no attribute 'sc' - the fix!
Build the spark object outside the loop, or perhaps create a temporary sc object?
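A minimal sketch of that first fix, assuming the error came from rebuilding the session inside the loop (the data and loop here are only an illustration):

from pyspark.sql import SparkSession

# Build the session once, outside the loop, and reuse it on every iteration.
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["user_type", "val"])
df.createOrReplaceTempView("all_data")

for cus_type in (1, 2):
    spark.sql("select * from all_data where user_type = " + str(cus_type)).show()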

Spark is, at its core, just data-processing code expressed in another language, another way of writing the same thing.

Parameter tuning
Turn the number of executors down and the other resource values up; this makes errors less likely.

The Spark job throws java.lang.StackOverflowError
https://blog.csdn.net/u010936936/article/details/88363449
Spark: java.io.IOException: No space left on device
https://blog.csdn.net/dupihua/article/details/51133551
ass_rule = ass_rule.filter('antecedent_len == 1')
ass_rule = ass_rule.filter('consequent_len == 1')

DataFrame filtering:
https://blog.csdn.net/qq_40006058/article/details/88931884
DataFrame operations of every kind.

20220310

data = spark.createDataFrame(data)  # convert an ordinary DataFrame to a Spark DataFrame
data.createOrReplaceTempView("all_data")  # register it as a SQL table
part_data = spark.sql("select * from all_data where user_type = " + str(cus_type))  # operate on it with SQL

https://blog.csdn.net/zhurui_idea/article/details/73090951

ass_rule = ass_rule.rdd.map(lambda row: (row["antecedent"], row['consequent'], calculate_len(row['antecedent'])))
# one pass through .rdd.map turns it into a PipelinedRDD
ass_rule = spark.createDataFrame(ass_rule)
# a createDataFrame call turns it back into a DataFrame

Converting between DataFrame and RDD.


Processing each row automatically while keeping the other original fields:

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema
Strangely, when splitting a string into a list, the tuple must also carry at least one other field, or this error is raised.

part_data = spark.sql("select * from all_data where user_type = " + str(cus_type))
part_data.show()
all_record = part_data.select("user_type", 'goods_huizong')  # several columns can be selected
all_record = all_record.rdd.map(lambda row: (row['user_type'], transform_to_list(row['goods_huizong'])))

Several fields can also be selected afterwards.
  File "/usr/local/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport__import__(name)
ModuleNotFoundError: No module named 'utils_'与pyspark大数据相关的函数只能放在当前模块?通过其他模块导入
会不能识别?
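One way around this that is worth trying: ship the module to the executors explicitly, either with spark-submit --py-files utils_.py or from code. A minimal sketch, reusing the utils_ name from the error above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Distribute utils_.py so cloudpickle can import it on every worker.
spark.sparkContext.addPyFile("utils_.py")

from utils_ import usetime  # import after addPyFile so executors can resolve it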

20211231

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
The cluster's resources were being held by someone else's jobs.

20211230

Spark 2.0.x dump a csv file from a dataframe containing one array of type string
https://stackoverflow.com/questions/40426106/spark-2-0-x-dump-a-csv-file-from-a-dataframe-containing-one-array-of-type-string

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())
df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))
df.drop("column_as_array").write.csv(...)

The approach above has a problem: the values written to the column come out as generator expressions. This Scala variant does work:

import org.apache.spark.sql.functions._
val dumpCSV = df.withColumn("ArrayOfString", df("ArrayOfString").cast("string")).write.csv(path = "/home/me/saveDF")

https://www.jianshu.com/p/3735b5e2c540
https://www.jianshu.com/p/80964332b3c4
Writing an RDD or a Spark DataFrame to CSV; ordinary pandas cannot write to HDFS.
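A minimal sketch of the Spark-side write (the hdfs://ns1/tmp/out path is made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Spark writes a directory of part-files directly to HDFS;
# pandas.to_csv has no built-in HDFS support.
df.write.mode("overwrite").csv("hdfs://ns1/tmp/out", header=True)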

import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from tqdm import tqdm
import platform
import os

os.environ['JAVA_HOME'] = r'/usr/local/jdk1.8.0_212'
t1 = datetime.datetime.now()
appname = "FPgrowth"
# master = "local[6]"
spark = (SparkSession.Builder().appName(appname)
         .config('spark.num-executors', '50')
         .config('spark.executor.memory', '4g')
         .config('spark.executor.cores', '3')
         .config('spark.driver.memory', '1g')
         .config('spark.default.parallelism', '1000')
         .config('spark.storage.memoryFraction', '0.5')
         .config('spark.shuffle.memoryFraction', '0.3')
         .config("spark.speculation", 'True')
         .config("spark.speculation.interval", '100')
         .config("spark.speculation.quantile", "0.75")
         .config("spark.speculation.multiplier", '1.5')
         .config("spark.scheduler.mode", 'FAIR')
         .getOrCreate())
df = spark.read.format("csv") \
    .option("header", "true") \
    .load("/data/tb_order_user_sec_type_group.csv")
df.createOrReplaceTempView('all_data')
sec_type = spark.sql("select sec_type from all_data")

https://hub.mybinder.turing.ac.uk/user/apache-spark-sjqwupmp/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb
Quickstart: Pandas API on Spark; a quick start for pandas on top of PySpark.

part_data = spark.sql("select * from all_data where sec_type = " + cus_type)
part_data.count()  # count the number of elements (rows) in the RDD
lines.first()  # the first element in this RDD, i.e. the first line of README.md

http://spark.apache.org/docs/latest/api/python/getting_started/index.html
The official PySpark documentation; for both Spark SQL and Spark DataFrames, the official docs are the reference.


Quickly convert to pandas to work on the data.
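A minimal sketch, assuming the result fits in driver memory; on Spark 3.x, enabling Arrow (a real option, spark.sql.execution.arrow.pyspark.enabled) makes the conversion much faster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
pdf = df.toPandas()  # collect to the driver as a pandas DataFrame
print(pdf.head())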

20210831

Windows 10: Spark warning. WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

https://blog.csdn.net/weixin_43748432/article/details/107378033

java.lang.OutOfMemoryError: GC overhead limit exceeded
https://blog.csdn.net/gaokao2011/article/details/51707163
Turn up the parameters below.

Spark operators: basic RDD transformations (5) - mapPartitions
http://lxw1234.com/archives/2015/07/348.htm
Maps partition by partition rather than element by element, which improves efficiency.
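A minimal sketch of the difference; the fake setup value stands in for anything expensive (a DB connection, a model load) that you want to pay once per partition rather than once per element:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(range(10), numSlices=2)

def per_partition(rows):
    expensive_setup = 100  # paid once per partition, not once per element
    for x in rows:
        yield x + expensive_setup

print(rdd.mapPartitions(per_partition).collect())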

spark = (SparkSession.Builder().appName(appname).master(master)
         .config('spark.some.config.option0', 'some_value')
         .config('spark.executor.memory', '2g')          # executor memory
         .config('spark.executor.cores', '2')            # usable CPU cores per executor
         .config("spark.executor.instances", '10')       # total number of executors
         .config('spark.driver.memory', '1g')            # driver setting; should be smaller than the executors'?
         .config('spark.default.parallelism', '1000')    # number of tasks
         .config('spark.sql.shuffle.partitions', '300')  # number of shuffle partitions
         .config("spark.driver.extraJavaOptions", "-Xss2048M")  # JVM setting (stack size)
         .config("spark.speculation", 'True')            # avoid getting stuck in one stage
         .config("spark.speculation.interval", '100')    # avoid getting stuck in one stage
         .config("spark.speculation.quantile", "0.1")    # avoid getting stuck in one stage
         .config("spark.speculation.multiplier", '1')    # avoid getting stuck in one stage
         .config("spark.scheduler.mode", 'FAIR')         # scheduling mode
         .getOrCreate())

Parameter settings.

spark = (SparkSession.Builder().appName(appname).master(master)
         .config('spark.some.config.option0', 'some_value')
         .config('spark.executor.memory', '2g')
         .config('spark.executor.cores', '2')
         .config("spark.executor.instances", '10')
         .config('spark.driver.memory', '3g')
         .config('spark.default.parallelism', '1000')    # this parameter matters a lot
         .config('spark.sql.shuffle.partitions', '300')  # this parameter matters a lot
         .config("spark.driver.extraJavaOptions", "-Xss3072M")
         .config("spark.speculation", 'True')            # this parameter matters a lot
         .config("spark.speculation.interval", '100')
         .config("spark.speculation.quantile", "0.1")
         .config("spark.speculation.multiplier", '1')
         .config("spark.scheduler.mode", 'FAIR')
         .getOrCreate())

With 32 GB of memory in total, this configuration runs to a result quickly.

https://blog.csdn.net/lotusws/article/details/52423254
Parameters for spark master local.

Then open http://192.168.1.116:4040 in the browser:
the Spark UI, i.e. the Spark dashboard address.


Viewing the configured parameters.


Active: the stages currently running.
Pending: stages that have not run yet.
Completed: finished stages.
A reading like "12/69" with 13 active means: 69 stages in total, 12 already finished, 13 currently running.
On the dashboard, look mainly at the Stages and Executors pages.


The timeline runs from left to right.


Under Jobs, you can see the specific reason for a failure.

https://blog.csdn.net/weixin_42340179/article/details/82415085
https://blog.csdn.net/whgyxy/article/details/88779965
Stuck at a certain stage.
Analysis of the anomaly where Spark runs normally but one stage hangs and makes no progress.

https://blog.csdn.net/yf_bit/article/details/93610829
Important:
https://www.cnblogs.com/candlia/p/11920289.html
https://www.cnblogs.com/xiao02fang/p/13197877.html
Factors that affect Spark performance.

https://www.csdn.net/tags/OtDaUgysMTk3Mi1ibG9n.html
https://www.cnblogs.com/yangsy0915/p/6060532.html
Important:
PySpark configuration parameters.

https://www.javaroad.cn/questions/15705
Looping over rows.
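A minimal sketch of looping row by row without pulling the whole DataFrame into driver memory at once (toLocalIterator streams one partition at a time):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

for row in df.toLocalIterator():  # gentler on memory than df.collect()
    print(row["id"], row["val"])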

http://www.sofasofa.io/forum_main_post.php?postid=1005461
Getting the total number of rows and columns.
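A minimal sketch: count() for the rows, len(df.columns) for the columns.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
print(df.count(), len(df.columns))  # -> 2 2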

https://blog.csdn.net/qq_40006058/article/details/88822268
PySpark study | 68 commonly used functions | explanations + Python code.

https://blog.csdn.net/qq_29153321/article/details/88648948
RDD operations.

https://www.jianshu.com/p/55efdcabd163
Some simple, commonly used PySpark functions and methods.

http://sofasofa.io/forum_main_post.php?postid=1002482
Renaming DataFrame columns.
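A minimal sketch of the two usual approaches, withColumnRenamed for a single column and toDF for all of them at once (the new names are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "val"])

print(df.withColumnRenamed("val", "value").columns)  # rename one column
print(df.toDF("user_id", "value").columns)           # rename every column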
