4444import java .util .ArrayList ;
4545import java .util .List ;
4646import java .util .Random ;
47+ import java .util .concurrent .ExecutionException ;
4748import java .util .concurrent .ExecutorService ;
4849import java .util .concurrent .Executors ;
4950import java .util .concurrent .Future ;
5051import java .util .concurrent .TimeUnit ;
5152import java .util .concurrent .TimeoutException ;
5253import java .util .concurrent .atomic .AtomicInteger ;
53- import org .junit .Ignore ;
5454import org .junit .Test ;
5555import org .junit .runner .RunWith ;
5656import org .junit .runners .JUnit4 ;
@@ -500,7 +500,6 @@ private void testRejectedReserveRelease(
500500 }
501501
502502 flowController .release (1 , 1 );
503-
504503 flowController .reserve (maxElementCount , maxNumBytes );
505504 flowController .release (maxElementCount , maxNumBytes );
506505 }
@@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
523522 final AtomicInteger totalDecreased = new AtomicInteger (0 );
524523 final AtomicInteger releasedCounter = new AtomicInteger (0 );
525524
526- List <Thread > reserveThreads =
525+ List <Future > reserveThreads =
527526 testConcurrentUpdates (
528- flowController , 100 , 100 , 100 , totalIncreased , totalDecreased , releasedCounter );
529- for (Thread t : reserveThreads ) {
530- t .join (200 );
527+ flowController , 100 , 100 , 10 , totalIncreased , totalDecreased , releasedCounter );
528+ for (Future t : reserveThreads ) {
529+ t .get (200 , TimeUnit . MILLISECONDS );
531530 }
532531 assertEquals (reserveThreads .size (), releasedCounter .get ());
533532 assertTrue (totalIncreased .get () > 0 );
@@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
539538 testBlockingReserveRelease (flowController , 0 , expectedValue );
540539 }
541540
542- // This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359
543- // is fixed.
544- @ Ignore
545541 @ Test
546542 public void testConcurrentUpdateThresholds_nonBlocking () throws Exception {
547543 int initialValue = 5000 ;
@@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
559555 AtomicInteger totalIncreased = new AtomicInteger (0 );
560556 AtomicInteger totalDecreased = new AtomicInteger (0 );
561557 AtomicInteger releasedCounter = new AtomicInteger (0 );
562- List <Thread > reserveThreads =
558+ List <Future > reserveThreads =
563559 testConcurrentUpdates (
564560 flowController , 100 , 100 , 100 , totalIncreased , totalDecreased , releasedCounter );
565- for (Thread t : reserveThreads ) {
566- t .join (200 );
561+ for (Future t : reserveThreads ) {
562+ t .get (200 , TimeUnit . MILLISECONDS );
567563 }
568564 assertEquals (reserveThreads .size (), releasedCounter .get ());
569565 assertTrue (totalIncreased .get () > 0 );
@@ -698,8 +694,7 @@ public void run() {
698694 };
699695 // blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
700696 flowController .reserve (5 , 1 );
701- ExecutorService executor = Executors .newCachedThreadPool ();
702- Future <?> finished1 = executor .submit (runnable );
697+ Future <?> finished1 = Executors .newSingleThreadExecutor ().submit (runnable );
703698 try {
704699 finished1 .get (50 , TimeUnit .MILLISECONDS );
705700 fail ("reserve should block" );
@@ -722,7 +717,7 @@ public void run() {
722717
723718 // Similar to blocked by element, test blocking by bytes.
724719 flowController .reserve (1 , 5 );
725- Future <?> finished2 = executor .submit (runnable );
720+ Future <?> finished2 = Executors . newSingleThreadExecutor () .submit (runnable );
726721 try {
727722 finished2 .get (50 , TimeUnit .MILLISECONDS );
728723 fail ("reserve should block" );
@@ -739,15 +734,15 @@ public void run() {
739734 .isAtLeast (currentTime );
740735 }
741736
742- private List <Thread > testConcurrentUpdates (
737+ private List <Future > testConcurrentUpdates (
743738 final FlowController flowController ,
744739 final int increaseStepRange ,
745740 final int decreaseStepRange ,
746741 final int reserve ,
747742 final AtomicInteger totalIncreased ,
748743 final AtomicInteger totalDecreased ,
749744 final AtomicInteger releasedCounter )
750- throws InterruptedException {
745+ throws InterruptedException , TimeoutException , ExecutionException {
751746 final Random random = new Random ();
752747 Runnable increaseRunnable =
753748 new Runnable () {
@@ -779,22 +774,19 @@ public void run() {
779774 }
780775 }
781776 };
782- List <Thread > updateThreads = new ArrayList <>();
783- List <Thread > reserveReleaseThreads = new ArrayList <>();
784- for (int i = 0 ; i < 20 ; i ++) {
785- Thread increase = new Thread (increaseRunnable );
786- Thread decrease = new Thread (decreaseRunnable );
787- Thread reserveRelease = new Thread (reserveReleaseRunnable );
788- updateThreads .add (increase );
789- updateThreads .add (decrease );
790- reserveReleaseThreads .add (reserveRelease );
791- increase .start ();
792- decrease .start ();
793- reserveRelease .start ();
777+ List <Future > updateFuture = new ArrayList <>();
778+ List <Future > reserveReleaseFuture = new ArrayList <>();
779+ ExecutorService executors = Executors .newFixedThreadPool (10 );
780+ ExecutorService reserveExecutor = Executors .newFixedThreadPool (10 );
781+ for (int i = 0 ; i < 5 ; i ++) {
782+ updateFuture .add (executors .submit (increaseRunnable ));
783+ updateFuture .add (executors .submit (decreaseRunnable ));
784+ reserveReleaseFuture .add (reserveExecutor .submit (reserveReleaseRunnable ));
794785 }
795- for (Thread t : updateThreads ) {
796- t .join ( 10 );
786+ for (Future t : updateFuture ) {
787+ t .get ( 50 , TimeUnit . MILLISECONDS );
797788 }
798- return reserveReleaseThreads ;
789+ executors .shutdown ();
790+ return reserveReleaseFuture ;
799791 }
800792}
0 commit comments