Spark运行异常记录
[toc]
# org.apache.spark.shuffle.FetchFailedException: Connection from xxx closed
参考
https://blog.csdn.net/github_36444580/article/details/117415418
https://blog.csdn.net/Shockang/article/details/119063549
原因
数据量较大,过多block fetch操作导致shuffle server挂掉,还会伴随stage中task的失败和重试,对应task的executor上还会有如下的报错内容:“ ERROR shuffle-client-7-2 OneForOneBlockFetcher: Failed while starting block fetches”,或者“Unable to create executor due to Unable to register with external shuffle server due to : java.util.concurrent.TimeoutException: Timeout waiting for task.”。
解决方法
加上--conf spark.reducer.maxReqsInFlight=10和--conf spark.reducer.maxBlocksInFlightPerAddress=10参数(具体数值根据集群情况和业务量判断),来限制block fetch并发数。或者合入Spark3中的remote shuffle service源码特性。
- spark.reducer.maxReqsInFlight:shuffle read时,一个task的一个批次同时发送的请求数量;默认是 Int的最大值;
原理解释:构造远程请求时,单个请求大小限制是 48M / 5,而在一次拉取远程block数据时,是按批次拉取,一个批次的大小限制是 48M,所以理想情况下一个批次会发送5个请求;但如果block的分布不均匀,导致一个请求的请求大小远小于 48M / 5 (例如1M),而一个批次的大小限制是48M,所以这个批次就会发送48个请求;当节点数较多时,一个task的一个批次可能会发送非常多的请求,导致某些节点的入站连接数过多,从而导致失败;
spark.reducer.maxBlocksInFlightPerAddress:shuffle read时,一个节点同时被拉取的最大block数,如果太多可能会导致executor服务或nodemanager崩溃;默认Int的最大值;(spark2.2.1开始支持);
原理解释:shuffle read时每个task都会从shuffle write所在的节点拉取自己的block数据,如果一个shuffle write的executor运行了9个task,就会write9个data文件;如果shuffle read有1000核,那么同时运行1000个task,每个task要到shuffle write所在的executor获取9个block,极端情况下一个shuffle write的executor会被请求9000次;当节点数非常多时,一个shuffle write的executor会同时被很多节点拉取block,从而导致失败;
1
错误详细日志
22/08/08 15:28:06 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from /10.16.0.9:33385 is closed
22/08/08 15:28:06 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from /10.16.0.9:33385 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
22/08/08 15:28:06 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from /10.16.0.9:33385 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
22/08/08 15:28:11 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 10), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
22/08/08 15:28:11 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 10), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
22/08/08 15:28:11 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from 10.16.0.9:33385
org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 10), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
22/08/08 15:28:11 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from 10.16.0.9:33385
org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 10), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103