scala – Apache Spark: network errors between executors
I am running Apache Spark 1.3.1 on Scala 2.11.2. When running on an HPC cluster with a large enough amount of data, I get numerous errors like the ones at the bottom of this post (repeated roughly once per second, until the job is eventually killed). Based on the errors, the executors are trying to fetch shuffle data from other nodes but are unable to do so.
This same program works with (a) a smaller amount of data, or (b) when run in local-only mode, so it has something to do with the data being sent over the network (and is not triggered by a very small amount of data).

The code that is being executed around the time this happens is the following:

```scala
val partitioned_data = data  // data was read as sc.textFile(inputFile)
  .zipWithIndex.map(x => (x._2, x._1))
  .partitionBy(partitioner)  // A custom partitioner
  .map(_._2)

// Force previous lazy operations to be evaluated. Presumably adds some
// overhead, but hopefully the minimum possible...
// Suggested on Spark user list:
// http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html
sc.runJob(partitioned_data, (iter: Iterator[_]) => {})
```

Does this indicate a bug, or is there something I am doing wrong?

Here is a small snippet of the stderr log from one of the executors (the full log is here):

```
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000,chunkIndex=0},buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data,offset=26501223,length=6227612}} to /10.0.0.5:41160; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000,chunkIndex=1},buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data,offset=3792987,length=2862285}} to /10.0.0.5:41160; closing connection
java.nio.channels.ClosedChannelException
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002,offset=0,length=10993212}} to /10.0.0.6:42426; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
    at sun.nio.ch.IOUtil.read(IOUtil.java:206)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
```

Solution
This appears to be a bug related to the Netty networking system (the block transfer service) that was added in Spark 1.2. Adding .set("spark.shuffle.blockTransferService", "nio") to my SparkConf fixed the error, so now everything works perfectly.
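For reference, here is a minimal sketch of how that setting could be applied when building the SparkContext. The application name is a placeholder, the master URL is assumed to come from spark-submit, and the nio transfer service only exists in older (1.x) Spark releases like the 1.3.1 setup described here:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: the app name is a placeholder; the master URL is expected
// to be supplied by spark-submit (or set explicitly with .setMaster(...)).
val conf = new SparkConf()
  .setAppName("partitioning-job")
  // Fall back to the older NIO-based shuffle block transfer service
  // instead of the default Netty-based one (older Spark 1.x only).
  .set("spark.shuffle.blockTransferService", "nio")

val sc = new SparkContext(conf)
```

The same setting can also be supplied without recompiling, e.g. via spark-submit --conf spark.shuffle.blockTransferService=nio or an entry in conf/spark-defaults.conf.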
I found a post on the spark-user mailing list from someone who was running into similar errors, and they suggested trying nio instead of Netty. SPARK-5085 is similar, in that changing from Netty to nio fixed their problem; however, they were also able to fix the problem by changing some network settings. (I have not tried that myself, since I am not sure I have the right privileges to do so on the cluster.)