From ffac854d45d628ccd4acd312ac3195888c42d44b Mon Sep 17 00:00:00 2001 From: Ethan Berg Date: Thu, 26 Mar 2026 13:21:51 -0700 Subject: [PATCH] feat: added flow wrapper for cache --- cache/build.gradle.kts | 2 + .../kroger/cache/internal/CacheFlowWrapper.kt | 97 +++++++++++++++ .../cache/internal/CacheFlowWrapperTest.kt | 111 ++++++++++++++++++ gradle/libs.versions.toml | 2 + 4 files changed, 212 insertions(+) create mode 100644 cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt create mode 100644 cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt diff --git a/cache/build.gradle.kts b/cache/build.gradle.kts index 48da38d..8e3976a 100644 --- a/cache/build.gradle.kts +++ b/cache/build.gradle.kts @@ -33,5 +33,7 @@ dependencies { junit5() testImplementation(libs.kotlinx.coroutines.test) + testImplementation(libs.mockk) testImplementation(libs.truth) + testImplementation(libs.turbine) } diff --git a/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt b/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt new file mode 100644 index 0000000..a3ab76d --- /dev/null +++ b/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt @@ -0,0 +1,97 @@ +package com.kroger.cache.internal + +import com.kroger.cache.SnapshotPersistentCache +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch + +/** + * MIT License + * + * Copyright (c) 2023 The Kroger Co. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +/** + * A Wrapper class for a SnapshotPersistentCache that exposes changes to the cache via a flow. + * + * **Note this works best when used as a singleton + * + * @param cache the [com.kroger.cache.SnapshotPersistentCache] holding the value(s) on disk + * @param scope the [kotlinx.coroutines.CoroutineScope] to run the flow on + * + */ +public class CacheFlowWrapper( + private val cache: SnapshotPersistentCache, + private val scope: CoroutineScope, +) { + /** + * A reference to the coroutine job used for reading the first value from the [cache] and emitting it on [_cacheValueState] + */ + private val initializerJob: Job + + /** + * The private mutable state flow for the current value + */ + private val _cacheValueState = MutableStateFlow(null) + + /** + * publicly exposed read-only flow on which to read and observe changes to the current value + */ + public val cacheValueFlow: StateFlow = _cacheValueState.asStateFlow() + + /** + * Initialization block reads the value the [cache] and emits it on [_cacheValueState] + * + * This job also updates the value in [cache] for each new value emitted on the flow + * except for the first, which is read from [cache] + */ + init { + initializerJob = scope.launch { + _cacheValueState.value = cache.read() + + cacheValueFlow + .drop(1) + .onEach { + cache.save(it) + }.launchIn(scope) + } + } + + /** + * Updates the value of the StateFlow + * Any update to the state flow will be persisted to the [cache] + * Waits for initialization to finish reading the first value from the [cache] + * before emitting a new value on the flow + * + * @param newValue The new value to be both emitted on the flow, and saved in the [cache] + */ + public suspend fun setValue(newValue: T) { + if (initializerJob.isActive) { + initializerJob.join() + } + _cacheValueState.emit(newValue) + } +} diff --git a/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt b/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt new file mode 100644 index 0000000..a8503ed --- /dev/null +++ b/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt @@ -0,0 +1,111 @@ +package com.kroger.cache.internal + +import app.cash.turbine.test +import com.google.common.truth.Truth.assertThat +import com.kroger.cache.SnapshotPersistentCache +import io.mockk.coEvery +import io.mockk.coVerifySequence +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test + +@OptIn(ExperimentalCoroutinesApi::class) +class CacheFlowWrapperTest { + private val testDispatcher = UnconfinedTestDispatcher() + val testScope = CoroutineScope(CoroutineName("CacheFlowWrapperTest") + testDispatcher) + val fileCache: SnapshotPersistentCache = mockk() + + lateinit var cacheWrapper: CacheFlowWrapper + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is still reading WHEN new value is set THEN set value will wait for read to finish`() = runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + delay(1000) + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is done reading WHEN new value is set THEN set value will happen immediately`() = runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is writing values slowly WHEN new values are set in quick succession THEN all values are emitted on flow, and last value is saved to disk`() = runTest { + val firstValue = "first new value" + val secondValue = "second new value" + val thirdValue = "third new value" + val fourthValue = "Fourth new value" + coEvery { fileCache.read() } returns null + coEvery { fileCache.save(any()) } coAnswers { + delay(1000) + } + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(firstValue) + cacheWrapper.setValue(secondValue) + cacheWrapper.setValue(thirdValue) + cacheWrapper.setValue(fourthValue) + advanceTimeBy(2000) + assertThat(awaitItem()).isEqualTo(firstValue) + assertThat(awaitItem()).isEqualTo(secondValue) + assertThat(awaitItem()).isEqualTo(thirdValue) + assertThat(awaitItem()).isEqualTo(fourthValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(firstValue)) // start writing the first value + // second and third should be skipped since first isn't done writing yet + fileCache.save(eq(fourthValue)) // fourth and final value is written + } + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7fc2eaa..f6f5f0b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -39,6 +39,7 @@ ksp = "1.9.23-1.0.20" mockk = "1.12.5" telemetry = "1.0.0" truth = "1.1.3" +turbine = "1.0.0" [libraries] androidx-activity-compose = { module = "androidx.activity:activity-compose", version.ref = "androidxActivity" } @@ -73,6 +74,7 @@ moshi-ksp = { module = "com.squareup.moshi:moshi-kotlin-codegen", version.ref = truth = { module = "com.google.truth:truth", version.ref = "truth" } telemetry = { module = "com.kroger.telemetry:telemetry", version.ref = "telemetry" } telemetry-android = { module = "com.kroger.telemetry:android", version.ref = "telemetry" } +turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" } [plugins] android-application = { id = "com.android.application", version.ref = "androidGradlePlugin" }