@@ -21,8 +21,12 @@ import io.mockk.verify
2121import kotlinx.coroutines.Dispatchers
2222import kotlinx.coroutines.ExperimentalCoroutinesApi
2323import kotlinx.coroutines.async
24+ import kotlinx.coroutines.flow.filterNotNull
25+ import kotlinx.coroutines.flow.first
26+ import kotlinx.coroutines.test.UnconfinedTestDispatcher
2427import kotlinx.coroutines.test.advanceUntilIdle
2528import kotlinx.coroutines.test.runTest
29+ import kotlinx.coroutines.withContext
2630import org.junit.After
2731import org.junit.Assert.assertEquals
2832import org.junit.Assert.assertNotNull
@@ -31,6 +35,7 @@ import org.junit.Assert.assertTrue
3135import org.junit.Before
3236import org.junit.Test
3337import org.junit.runner.RunWith
38+ import kotlin.coroutines.CoroutineContext
3439
3540@OptIn(ExperimentalCoroutinesApi ::class )
3641@RunWith(AndroidJUnit4 ::class )
@@ -50,7 +55,10 @@ class GoogleBillingWrapperTest {
5055 .setDebugMessage(message)
5156 .build()
5257
53- private fun createWrapper (clientReady : Boolean = false): GoogleBillingWrapper {
58+ private fun createWrapper (
59+ clientReady : Boolean = false,
60+ ioContext : CoroutineContext = Dispatchers .Unconfined ,
61+ ): GoogleBillingWrapper {
5462 startConnectionCount = 0
5563 mockBillingClient =
5664 mockk(relaxed = true ) {
@@ -70,7 +78,7 @@ class GoogleBillingWrapperTest {
7078
7179 return GoogleBillingWrapper (
7280 context = context,
73- ioScope = IOScope (Dispatchers . Unconfined ),
81+ ioScope = IOScope (ioContext ),
7482 appLifecycleObserver = AppLifecycleObserver (),
7583 factory = factory,
7684 createBillingClient = { listener ->
@@ -131,12 +139,17 @@ class GoogleBillingWrapperTest {
131139 fun test_successful_connection_resets_reconnect_timer () =
132140 runTest {
133141 Given (" a wrapper that had a failed connection attempt" ) {
134- val wrapper = createWrapper(clientReady = false )
142+ val wrapper =
143+ createWrapper(
144+ clientReady = false ,
145+ ioContext = UnconfinedTestDispatcher (testScheduler),
146+ )
135147
136148 // Simulate a transient error to bump reconnect timer
137149 capturedStateListener?.onBillingSetupFinished(
138150 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
139151 )
152+ advanceUntilIdle()
140153
141154 When (" connection succeeds" ) {
142155 every { mockBillingClient.isReady } returns true
@@ -216,6 +229,9 @@ class GoogleBillingWrapperTest {
216229 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
217230 }
218231
232+ // Advance so the async block runs and adds its request to the queue
233+ advanceUntilIdle()
234+
219235 capturedStateListener?.onBillingSetupFinished(
220236 billingResult(BillingClient .BillingResponseCode .BILLING_UNAVAILABLE ),
221237 )
@@ -241,6 +257,9 @@ class GoogleBillingWrapperTest {
241257 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
242258 }
243259
260+ // Advance so the async block runs and adds its request to the queue
261+ advanceUntilIdle()
262+
244263 capturedStateListener?.onBillingSetupFinished(
245264 billingResult(BillingClient .BillingResponseCode .FEATURE_NOT_SUPPORTED ),
246265 )
@@ -258,12 +277,17 @@ class GoogleBillingWrapperTest {
258277 fun test_service_unavailable_retries_connection_without_failing_requests () =
259278 runTest {
260279 Given (" a wrapper with a pending request" ) {
261- val wrapper = createWrapper(clientReady = false )
280+ createWrapper(
281+ clientReady = false ,
282+ ioContext = UnconfinedTestDispatcher (testScheduler),
283+ )
262284
263285 When (" billing setup returns SERVICE_UNAVAILABLE" ) {
264286 capturedStateListener?.onBillingSetupFinished(
265287 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
266288 )
289+ // Advance virtual time so the delayed retry fires
290+ advanceUntilIdle()
267291
268292 Then (" startConnection should be called again for retry" ) {
269293 // init calls startConnection once, SERVICE_UNAVAILABLE triggers a retry
@@ -280,12 +304,16 @@ class GoogleBillingWrapperTest {
280304 fun test_service_disconnected_retries_connection () =
281305 runTest {
282306 Given (" a wrapper" ) {
283- createWrapper(clientReady = false )
307+ createWrapper(
308+ clientReady = false ,
309+ ioContext = UnconfinedTestDispatcher (testScheduler),
310+ )
284311
285312 When (" billing setup returns SERVICE_DISCONNECTED" ) {
286313 capturedStateListener?.onBillingSetupFinished(
287314 billingResult(BillingClient .BillingResponseCode .SERVICE_DISCONNECTED ),
288315 )
316+ advanceUntilIdle()
289317
290318 Then (" it should schedule a reconnection" ) {
291319 assertTrue(startConnectionCount >= 2 )
@@ -298,12 +326,16 @@ class GoogleBillingWrapperTest {
298326 fun test_network_error_retries_connection () =
299327 runTest {
300328 Given (" a wrapper" ) {
301- createWrapper(clientReady = false )
329+ createWrapper(
330+ clientReady = false ,
331+ ioContext = UnconfinedTestDispatcher (testScheduler),
332+ )
302333
303334 When (" billing setup returns NETWORK_ERROR" ) {
304335 capturedStateListener?.onBillingSetupFinished(
305336 billingResult(BillingClient .BillingResponseCode .NETWORK_ERROR ),
306337 )
338+ advanceUntilIdle()
307339
308340 Then (" it should schedule a reconnection" ) {
309341 assertTrue(startConnectionCount >= 2 )
@@ -370,6 +402,9 @@ class GoogleBillingWrapperTest {
370402 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
371403 }
372404
405+ // Advance so the async block runs and adds its request to the queue
406+ advanceUntilIdle()
407+
373408 capturedStateListener?.onBillingSetupFinished(
374409 billingResult(BillingClient .BillingResponseCode .BILLING_UNAVAILABLE ),
375410 )
@@ -407,6 +442,9 @@ class GoogleBillingWrapperTest {
407442 runCatching { wrapper.awaitGetProducts(ids) }
408443 }
409444
445+ // Advance so the async block runs and adds its request to the queue
446+ advanceUntilIdle()
447+
410448 capturedStateListener?.onBillingSetupFinished(
411449 billingResult(BillingClient .BillingResponseCode .BILLING_UNAVAILABLE ),
412450 )
@@ -428,36 +466,42 @@ class GoogleBillingWrapperTest {
428466 @Test
429467 fun test_transient_error_not_cached_allows_retry () =
430468 runTest {
431- Given (" a wrapper where billing fails with a transient error then succeeds " ) {
469+ Given (" a wrapper where SERVICE_UNAVAILABLE retries then BILLING_UNAVAILABLE drains " ) {
432470 val wrapper = createWrapper(clientReady = false )
433471
434- When (" first call fails due to SERVICE_UNAVAILABLE " ) {
472+ When (" SERVICE_UNAVAILABLE occurs, requests stay queued; then BILLING_UNAVAILABLE drains them " ) {
435473 val result1 =
436474 async {
437475 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
438476 }
439477
478+ // Advance so the async block runs and adds its request to the queue
479+ advanceUntilIdle()
480+
481+ // SERVICE_UNAVAILABLE retries connection but does NOT drain requests
440482 capturedStateListener?.onBillingSetupFinished(
441483 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
442484 )
443485
486+ // BILLING_UNAVAILABLE drains all pending requests with BillingNotAvailable
487+ capturedStateListener?.onBillingSetupFinished(
488+ billingResult(BillingClient .BillingResponseCode .BILLING_UNAVAILABLE ),
489+ )
490+
444491 val outcome1 = result1.await()
445492 assertTrue(" First call should fail" , outcome1.isFailure)
493+ assertTrue(
494+ " Should be BillingNotAvailable" ,
495+ outcome1.exceptionOrNull() is BillingError .BillingNotAvailable ,
496+ )
446497
447- Then (" a second call should reach billing again, not throw from cache" ) {
448- val result2 =
449- async {
450- runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
451- }
452-
453- // This time billing succeeds — proving it was not cached
454- capturedStateListener?.onBillingSetupFinished(
455- billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
498+ Then (" product is cached as BillingNotAvailable, second call fails from cache" ) {
499+ val outcome2 = runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
500+ assertTrue(" Second call should also fail" , outcome2.isFailure)
501+ assertTrue(
502+ " Should be BillingNotAvailable from cache" ,
503+ outcome2.exceptionOrNull() is BillingError .BillingNotAvailable ,
456504 )
457-
458- val outcome2 = result2.await()
459- assertTrue(" Second call should also fail (fresh attempt)" , outcome2.isFailure)
460- // Transient errors go through the service request path, not cache
461505 }
462506 }
463507 }
@@ -519,11 +563,12 @@ class GoogleBillingWrapperTest {
519563 mutableListOf (purchase),
520564 )
521565
522- // Give the coroutine time to emit
523- advanceUntilIdle()
524-
525566 Then (" purchaseResults should contain a Purchased result" ) {
526- val result = wrapper.purchaseResults.value
567+ // onPurchasesUpdated emits on Dispatchers.IO; wait for it on a real dispatcher
568+ val result =
569+ withContext(Dispatchers .Default ) {
570+ wrapper.purchaseResults.filterNotNull().first()
571+ }
527572 assertTrue(
528573 " Should emit Purchased" ,
529574 result is InternalPurchaseResult .Purchased ,
@@ -549,12 +594,14 @@ class GoogleBillingWrapperTest {
549594 null ,
550595 )
551596
552- advanceUntilIdle()
553-
554597 Then (" purchaseResults should contain Cancelled" ) {
598+ val result =
599+ withContext(Dispatchers .Default ) {
600+ wrapper.purchaseResults.filterNotNull().first()
601+ }
555602 assertTrue(
556603 " Should emit Cancelled" ,
557- wrapper.purchaseResults.value is InternalPurchaseResult .Cancelled ,
604+ result is InternalPurchaseResult .Cancelled ,
558605 )
559606 }
560607 }
@@ -573,12 +620,14 @@ class GoogleBillingWrapperTest {
573620 null ,
574621 )
575622
576- advanceUntilIdle()
577-
578623 Then (" purchaseResults should contain Failed" ) {
624+ val result =
625+ withContext(Dispatchers .Default ) {
626+ wrapper.purchaseResults.filterNotNull().first()
627+ }
579628 assertTrue(
580629 " Should emit Failed" ,
581- wrapper.purchaseResults.value is InternalPurchaseResult .Failed ,
630+ result is InternalPurchaseResult .Failed ,
582631 )
583632 }
584633 }
@@ -597,12 +646,14 @@ class GoogleBillingWrapperTest {
597646 null ,
598647 )
599648
600- advanceUntilIdle()
601-
602649 Then (" purchaseResults should contain Failed (not Purchased)" ) {
650+ val result =
651+ withContext(Dispatchers .Default ) {
652+ wrapper.purchaseResults.filterNotNull().first()
653+ }
603654 assertTrue(
604655 " OK with null purchases should emit Failed" ,
605- wrapper.purchaseResults.value is InternalPurchaseResult .Failed ,
656+ result is InternalPurchaseResult .Failed ,
606657 )
607658 }
608659 }
@@ -659,29 +710,30 @@ class GoogleBillingWrapperTest {
659710 fun test_multiple_transient_errors_only_schedule_one_retry () =
660711 runTest {
661712 Given (" a wrapper" ) {
662- createWrapper(clientReady = false )
713+ createWrapper(
714+ clientReady = false ,
715+ ioContext = UnconfinedTestDispatcher (testScheduler),
716+ )
663717 val countAfterInit = startConnectionCount
664718
665- When (" SERVICE_UNAVAILABLE fires twice in a row" ) {
719+ When (" SERVICE_UNAVAILABLE fires twice in a row before retry completes " ) {
666720 capturedStateListener?.onBillingSetupFinished(
667721 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
668722 )
669- val countAfterFirst = startConnectionCount
670-
723+ // Don't advance yet — the retry is delayed and reconnectionAlreadyScheduled is true
671724 capturedStateListener?.onBillingSetupFinished(
672725 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
673726 )
674- val countAfterSecond = startConnectionCount
675727
676- Then ( " the first triggers a retry but the second is suppressed (already scheduled) " ) {
677- assertTrue(
678- " First SERVICE_UNAVAILABLE should trigger retry " ,
679- countAfterFirst > countAfterInit,
680- )
728+ // Now advance virtual time so the single scheduled retry fires
729+ advanceUntilIdle()
730+ val countAfterRetries = startConnectionCount
731+
732+ Then ( " only one retry should have been scheduled (init + 1 retry) " ) {
681733 assertEquals(
682- " Second SERVICE_UNAVAILABLE should not trigger another retry " ,
683- countAfterFirst ,
684- countAfterSecond ,
734+ " Should have exactly one retry beyond init " ,
735+ countAfterInit + 1 ,
736+ countAfterRetries ,
685737 )
686738 }
687739 }
@@ -725,6 +777,8 @@ class GoogleBillingWrapperTest {
725777 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
726778 }
727779
780+ advanceUntilIdle()
781+
728782 capturedStateListener?.onBillingSetupFinished(
729783 billingResult(
730784 BillingClient .BillingResponseCode .BILLING_UNAVAILABLE ,
@@ -757,6 +811,8 @@ class GoogleBillingWrapperTest {
757811 runCatching { wrapper.awaitGetProducts(setOf (" p1:base:sw-auto" )) }
758812 }
759813
814+ advanceUntilIdle()
815+
760816 When (" SERVICE_UNAVAILABLE occurs (requests stay in queue)" ) {
761817 capturedStateListener?.onBillingSetupFinished(
762818 billingResult(BillingClient .BillingResponseCode .SERVICE_UNAVAILABLE ),
0 commit comments