现象
如果我们安装了python3.7版本来运行spark,并且用到了RDD的take函数,就会报错:RuntimeError:generatorraisedStopIteration
我们可以编写一个python脚本test.py来进行测试,用spark-submit命令提交: spark-submit test.pyfrompyspark.sqlimportSparkSession
spark=SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
sc=spark.sparkContext
print(sc.parallelize([1,2]).take(1))
当然也可以直接在pyspark的交互式shell中执行sc.parallelize([1,2]).take(1)
执行就会碰到上面说的错误。
原因分析
这个错误是由于Python3.7合入了一个不兼容性修改PEP-0479引起的。
Spark社区已经修复并合入了Spark的2.3.2和2.4版本,具体参考Spark对应的JIRASPARK-24739。
解决方法
MRS目前使用的版本为Spark 2.2.1,尚未修复。目前可以通过两种方法规避:(推荐)使用Python 3.6,3.5;
参考开源合入,修改提交任务节点的对应python脚本,并打包。具体见下一节。
参考开源合入,修改不兼容的python代码
打开客户端节点,修改/opt/client/Spark/spark/python/pyspark/rdd.py中的takeUpToNumLeft方法。deftakeUpToNumLeft(iterator):
iterator=iter(iterator)
taken=0
whiletaken<left:
try:
yieldnext(iterator)
exceptStopIteration:
return
taken+=1
重新打包pyspark.zip#备份pyspark.zip
cd/opt/client/Spark/spark/python/lib&&mvpyspark.zippyspark.zip.bak
#打包新的pyspark.zip
cd/opt/client/Spark/spark/python;zip-rpyspark.zippyspark
#移动至lib目录
mvpyspark.zip/opt/client/Spark/spark/python/lib/
#修改权限
chmod777/opt/client/Spark/spark/python/lib/pyspark.zip
这时再来试一下。spark-submittest.py
或者
spark-submit--masteryarntest.py
额外说一点,使用pyspark
shell,使用的是未打包的python,就是/opt/client/Spark/spark/python/pyspark/xxx.py;使用spark-submit命令提交,用的就是lib目录下的pyspark.zip了,具体可以看下面这一行上传的日志。-12-1816:24:05,888|INFO|Thread-3|Uploadingresourcefile:/opt/client/Spark/spark/python/li
b/pyspark.zip->hdfs://hacluster/user/hdfs/.sparkStaging/application_1545119010002_0007/pyspark.zip|
org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)