[Original] Uncle's Troubleshooting Series (7): A Spark Job Stuck with No Progress

Spark 2.1.1

 

Recently, Spark applications have frequently been observed running for a very long time. The job in question looks like this:

Job Id: 16
Description: treeReduce at CRFWithLBFGS.scala:160
Submitted: 2018/12/03 12:39:50
Duration: 2.3 h
Stages (Succeeded/Total): 0/5
Tasks for all stages (Succeeded/Total): 196/4723

The stage currently running within this job is:

Stage Id: 60
Description: treeReduce at CRFWithLBFGS.scala:160
Submitted: 2018/12/03 12:39:57
Duration: 2.3 h
Tasks (Succeeded/Total): 196/200
Input: 4.5 GB
Shuffle Write: 1455.1 MB

In this stage, 4 tasks have been stuck in the RUNNING state. Their statistics are abnormal (both Input Size / Records and Shuffle Write Size / Records are 0.0 B / 0), and all 4 tasks sit on the same executor. One of these tasks, as shown in the stage's task table:

Index: 33
Task ID: 8938
Attempt: 0
Status: RUNNING
Locality Level: PROCESS_LOCAL
Executor ID / Host: 12 / $executor_server_ip
Logs: stdout / stderr
Launch Time: 2018/12/03 12:39:57
Duration: 2.3 h
Input Size / Records: 0.0 B / 0
Shuffle Write Size / Records: 0.0 B / 0

The statistics of the executor hosting the problematic tasks are also abnormal (Total Tasks is 0). That executor:

Executor ID: 12
Logs: stdout / stderr
Address: $executor_server_ip:36755
Task Time: 0 ms
Total Tasks: 0
Failed Tasks: 0
Killed Tasks: 0
Succeeded Tasks: 0
Input Size / Records: 0.0 B / 0
Shuffle Write Size / Records: 0.0 B / 0

 

The driver thread dump at this point:

"Driver" #26 prio=5 os_prio=0 tid=0x00007f163a116000 nid=0x5192 waiting on condition [0x00007f15bb9a0000]

   java.lang.Thread.State: WAITING (parking)

        at sun.misc.Unsafe.park(Native Method)

        - parking to wait for  <0x00000001a8c4f9e0> (a scala.concurrent.impl.Promise$CompletionLatch)

        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)

        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)

        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)

        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)

        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)

        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)

        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)

        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)

        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)

        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)

        at org.apache.spark.rdd.RDD$$anonfun$treeReduce$1.apply(RDD.scala:1059)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

        at org.apache.spark.rdd.RDD.treeReduce(RDD.scala:1037)

        at breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:23)

        at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:41)

        at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:30)

        at breeze.optimize.StrongWolfeLineSearch.breeze$optimize$StrongWolfeLineSearch$$phi$1(StrongWolfe.scala:69)

        at breeze.optimize.StrongWolfeLineSearch$$anonfun$minimize$1.apply$mcVI$sp(StrongWolfe.scala:142)

        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)

        at breeze.optimize.StrongWolfeLineSearch.minimize(StrongWolfe.scala:141)

        at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:78)

        at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:40)

        at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:64)

        at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:62)

        at scala.collection.Iterator$$anon$7.next(Iterator.scala:129)

        at breeze.util.IteratorImplicits$RichIterator$$anon$2.next(Implicits.scala:71)

        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)

        at scala.collection.immutable.Range.foreach(Range.scala:160)

        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

        at app.package.AppClass.main(AppClass.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:497)

        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

So the driver is inside runJob, waiting for the executors to return results.
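(Thread dumps like the one above and the one below can be captured with jstack against the corresponding JVM; the commands here are only an illustration, with the PID to be filled in for your own environment. In yarn-cluster mode the driver JVM typically shows up in jps as ApplicationMaster and the executor JVM as CoarseGrainedExecutorBackend.)

-bash-4.2$ jps -m | grep CoarseGrainedExecutorBackend   # locate the executor JVM on the worker node
-bash-4.2$ jstack <pid> > executor_threads.txt           # dump all thread stacks of that process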

 

In the thread dump of the problematic executor, one suspicious thread has been RUNNABLE for a long time:

"shuffle-client-5-4" #94 daemon prio=5 os_prio=0 tid=0x00007fbae0e42800 nid=0x2a3a runnable [0x00007fbae4760000]

   java.lang.Thread.State: RUNNABLE

        at io.netty.util.Recycler$Stack.scavengeSome(Recycler.java:476)

        at io.netty.util.Recycler$Stack.scavenge(Recycler.java:454)

        at io.netty.util.Recycler$Stack.pop(Recycler.java:435)

        at io.netty.util.Recycler.get(Recycler.java:144)

        at io.netty.buffer.PooledUnsafeDirectByteBuf.newInstance(PooledUnsafeDirectByteBuf.java:39)

        at io.netty.buffer.PoolArena$DirectArena.newByteBuf(PoolArena.java:727)

        at io.netty.buffer.PoolArena.allocate(PoolArena.java:140)

        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)

        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)

        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)

        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)

        at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)

        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)

        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)

        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)

        at java.lang.Thread.run(Thread.java:745)

 

PS: memory on the problematic executor was largely idle at the time, and the process itself looked healthy:

-bash-4.2$ free -m

              total        used        free      shared  buff/cache   available

Mem:         257676       29251        5274         517      223150      226669

Swap:             0           0           0

 

This suggests a possible infinite loop here. Spark 2.1.1 ships netty 4.0.42; the relevant netty code is:

io.netty.util.Recycler

        boolean scavengeSome() {

            WeakOrderQueue cursor = this.cursor;

            if (cursor == null) {

                cursor = head;

                if (cursor == null) {

                    return false;

                }

            }

 

            boolean success = false;

            WeakOrderQueue prev = this.prev;

            do {

                if (cursor.transfer(this)) {

                    success = true;

                    break;

                }

 

                WeakOrderQueue next = cursor.next;

                if (cursor.owner.get() == null) {

                    // If the thread associated with the queue is gone, unlink it, after

                    // performing a volatile read to confirm there is no data left to collect.

                    // We never unlink the first queue, as we don't want to synchronize on updating the head.

                    if (cursor.hasFinalData()) {

                        for (;;) {

                            if (cursor.transfer(this)) {

                                success = true;

                            } else {

                                break;

                            }

                        }

                    }

                    if (prev != null) {

                        prev.next = next;

                    }

                } else {

                    prev = cursor;

                }

 

                cursor = next;

 

            } while (cursor != null && !success);

 

            this.prev = prev;

            this.cursor = cursor;

            return success;

        }

 

 

The problem is that prev is not cleared when cursor is re-initialized from head:

            if (cursor == null) {

                cursor = head;

Because prev may still point at a WeakOrderQueue left over from an earlier call, the subsequent prev.next = next unlink step can splice the queue list into a cycle, after which the do-while loop over cursor never terminates. The problem is fixed in netty 4.0.43; upgrading the netty bundled with Spark 2.1.1 to 4.0.43 or later resolves it.
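For reference, the fix in 4.0.43 essentially resets prev whenever cursor is re-initialized from head, so a stale prev can no longer corrupt the queue list. A minimal sketch of the corrected entry logic (paraphrased from the upstream change, not a verbatim quote):

        boolean scavengeSome() {
            WeakOrderQueue prev;
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                prev = null;              // key change: do not reuse the stale this.prev when restarting from head
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }
            // ... the scavenging loop below stays as in 4.0.42 ...
        }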

 

The upstream issue is at https://github.com/netty/netty/issues/6153
