@@ -496,10 +496,20 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
496
496
}
497
497
498
498
errCh := parallelize .NewErrorChannel ()
499
+ // Whether all victim pods are already deleted before making API call.
500
+ allPodsAlreadyDeleted := true
499
501
preemptPod := func (index int ) {
500
502
victim := victimPods [index ]
501
- if err := ev .PreemptPod (ctx , c , pod , victim , pluginName ); err != nil {
503
+ err := ev .PreemptPod (ctx , c , pod , victim , pluginName )
504
+ switch {
505
+ case apierrors .IsNotFound (err ):
506
+ logger .V (2 ).Info ("Victim Pod is already deleted" , "preemptor" , klog .KObj (pod ), "node" , c .Name (), "err" , err )
507
+ case err != nil :
502
508
errCh .SendErrorWithCancel (err , cancel )
509
+ default :
510
+ ev .mu .Lock ()
511
+ allPodsAlreadyDeleted = false
512
+ ev .mu .Unlock ()
503
513
}
504
514
}
505
515
@@ -511,11 +521,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
511
521
startTime := time .Now ()
512
522
result := metrics .GoroutineResultSuccess
513
523
514
- // Whether all victim pods are already deleted before making API call.
515
- allPodsAlreadyDeleted := true
516
524
defer metrics .PreemptionGoroutinesDuration .WithLabelValues (result ).Observe (metrics .SinceInSeconds (startTime ))
517
525
defer metrics .PreemptionGoroutinesExecutionTotal .WithLabelValues (result ).Inc ()
518
526
defer func () {
527
+ ev .mu .Lock ()
528
+ defer ev .mu .Unlock ()
519
529
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
520
530
// So, we should move the Pod to the activeQ.
521
531
if result == metrics .GoroutineResultError ||
@@ -547,15 +557,9 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
547
557
// and the pod could end up stucking at the unschedulable pod pool
548
558
// by all the pod removal events being ignored.
549
559
ev .Handler .Parallelizer ().Until (ctx , len (victimPods )- 1 , preemptPod , ev .PluginName )
550
- err := errCh .ReceiveError ()
551
- switch {
552
- case apierrors .IsNotFound (err ):
553
- logger .V (2 ).Info ("Victim Pod is already deleted" , "preemptor" , klog .KObj (pod ), "node" , c .Name (), "err" , err )
554
- case err != nil :
560
+ if err := errCh .ReceiveError (); err != nil {
555
561
utilruntime .HandleErrorWithContext (ctx , err , "Error occurred during async preemption" )
556
562
result = metrics .GoroutineResultError
557
- default :
558
- allPodsAlreadyDeleted = false
559
563
}
560
564
}
561
565
@@ -569,6 +573,7 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
569
573
logger .V (2 ).Info ("Victim Pod is already deleted" , "preemptor" , klog .KObj (pod ), "node" , c .Name (), "err" , err )
570
574
case err != nil :
571
575
utilruntime .HandleErrorWithContext (ctx , err , "Error occurred during async preemption" )
576
+ cancel ()
572
577
result = metrics .GoroutineResultError
573
578
default :
574
579
allPodsAlreadyDeleted = false
0 commit comments