1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > spark调用python_在MRS集群中使用Python3.7运行PySpark程序 调用RDD的take函数报错处理...

spark调用python_在MRS集群中使用Python3.7运行PySpark程序 调用RDD的take函数报错处理...

时间:2022-07-05 03:31:19

相关推荐

spark调用python_在MRS集群中使用Python3.7运行PySpark程序 调用RDD的take函数报错处理...

现象

如果我们安装了python3.7版本来运行spark,并且用到了RDD的take函数,就会报错:RuntimeError:generatorraisedStopIteration

我们可以编写一个python脚本test.py来进行测试,用spark-submit命令提交: spark-submit test.pyfrompyspark.sqlimportSparkSession

spark=SparkSession\

.builder\

.appName("PythonWordCount")\

.getOrCreate()

sc=spark.sparkContext

print(sc.parallelize([1,2]).take(1))

当然也可以直接在pyspark的交互式shell中执行sc.parallelize([1,2]).take(1)

执行就会碰到上面说的错误。

原因分析

这个错误是由于Python3.7合入了一个不兼容性修改PEP-0479引起的。

Spark社区已经修复并合入了Spark的2.3.2和2.4版本,具体参考Spark对应的JIRASPARK-24739。

解决方法

MRS目前使用的版本为Spark 2.2.1,尚未修复。目前可以通过两种方法规避:(推荐)使用Python 3.6,3.5;

参考开源合入,修改提交任务节点的对应python脚本,并打包。具体见下一节。

参考开源合入,修改不兼容的python代码

打开客户端节点,修改/opt/client/Spark/spark/python/pyspark/rdd.py中的takeUpToNumLeft方法。deftakeUpToNumLeft(iterator):

iterator=iter(iterator)

taken=0

whiletaken<left:

try:

yieldnext(iterator)

exceptStopIteration:

return

taken+=1

重新打包pyspark.zip#备份pyspark.zip

cd/opt/client/Spark/spark/python/lib&&mvpyspark.zippyspark.zip.bak

#打包新的pyspark.zip

cd/opt/client/Spark/spark/python;zip-rpyspark.zippyspark

#移动至lib目录

mvpyspark.zip/opt/client/Spark/spark/python/lib/

#修改权限

chmod777/opt/client/Spark/spark/python/lib/pyspark.zip

这时再来试一下。spark-submittest.py

或者

spark-submit--masteryarntest.py

额外说一点,使用pyspark

shell,使用的是未打包的python,就是/opt/client/Spark/spark/python/pyspark/xxx.py;使用spark-submit命令提交,用的就是lib目录下的pyspark.zip了,具体可以看下面这一行上传的日志。-12-1816:24:05,888|INFO|Thread-3|Uploadingresourcefile:/opt/client/Spark/spark/python/li

b/pyspark.zip->hdfs://hacluster/user/hdfs/.sparkStaging/application_1545119010002_0007/pyspark.zip|

org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。