Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.firebase.dataconnect

import com.google.firebase.auth.FirebaseAuth
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
import com.google.firebase.dataconnect.testutil.DataConnectBackend
import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer
Expand All @@ -44,6 +43,7 @@ import io.kotest.property.Arb
import io.kotest.property.arbitrary.next
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.random.Random
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.launch
Expand All @@ -70,10 +70,10 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun authenticatedRequestsAreSuccessful() = runTest {
signIn()
val person1Id = Arb.alphanumericString(prefix = "person1Id").next()
val person2Id = Arb.alphanumericString(prefix = "person2Id").next()
val person3Id = Arb.alphanumericString(prefix = "person3Id").next()
signIn(personSchema.dataConnect)

personSchema.createPersonAuth(id = person1Id, name = "TestName1", age = 42).execute()
personSchema.createPersonAuth(id = person2Id, name = "TestName2", age = 43).execute()
Expand All @@ -85,7 +85,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun queryFailsAfterUserSignsOut() = runTest {
signIn()
signIn(personSchema.dataConnect)
// Verify that we are signed in by executing a query, which should succeed.
personSchema.getPersonAuth(id = "foo").execute()
signOut()
Expand All @@ -98,7 +98,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun mutationFailsAfterUserSignsOut() = runTest {
signIn()
signIn(personSchema.dataConnect)
// Verify that we are signed in by executing a mutation, which should succeed.
personSchema.createPersonAuth(id = Random.nextAlphanumericString(20), name = "foo").execute()
signOut()
Expand All @@ -115,20 +115,20 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun queryShouldRetryOnUnauthenticated() = runTest {
signIn()
val responseData = buildStructProto { put("foo", key) }
val executeQueryResponse = executeQueryResponse { data = responseData }
val grpcServer =
inProcessDataConnectGrpcServer.newInstance(
errors = listOf(Status.UNAUTHENTICATED),
executeQueryResponse = executeQueryResponse
executeQueryResponse = executeQueryResponse,
responseDelay = 1.seconds, // avoid getting the same access token from auth emulator
)
val authTokens = CopyOnWriteArrayList<String?>()
backgroundScope.launch {
grpcServer.metadatas.map { it.get(firebaseAuthTokenHeader) }.toCollection(authTokens)
}
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
signIn(dataConnect)
val operationName = Arb.dataConnect.operationName().next(rs)
val queryRef =
dataConnect.query(operationName, Unit, serializer<TestData>(), serializer<Unit>())
Expand All @@ -144,20 +144,20 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun mutationShouldRetryOnUnauthenticated() = runTest {
signIn()
val responseData = buildStructProto { put("foo", key) }
val executeMutationResponse = executeMutationResponse { data = responseData }
val grpcServer =
inProcessDataConnectGrpcServer.newInstance(
errors = listOf(Status.UNAUTHENTICATED),
executeMutationResponse = executeMutationResponse
executeMutationResponse = executeMutationResponse,
responseDelay = 1.seconds, // avoid getting the same access token from auth emulator
)
val authTokens = CopyOnWriteArrayList<String?>()
backgroundScope.launch {
grpcServer.metadatas.map { it.get(firebaseAuthTokenHeader) }.toCollection(authTokens)
}
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
signIn(dataConnect)
val operationName = Arb.dataConnect.operationName().next(rs)
val mutationRef =
dataConnect.mutation(operationName, Unit, serializer<TestData>(), serializer<Unit>())
Expand All @@ -173,12 +173,13 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun queryShouldOnlyRetryOnUnauthenticatedOnce() = runTest {
signIn()
val grpcServer =
inProcessDataConnectGrpcServer.newInstance(
errors = listOf(Status.UNAUTHENTICATED, Status.UNAUTHENTICATED),
responseDelay = 1.seconds, // avoid getting the same access token from auth emulator
)
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
signIn(dataConnect)
val operationName = Arb.dataConnect.operationName().next(rs)
val queryRef = dataConnect.query(operationName, Unit, serializer<Unit>(), serializer<Unit>())

Expand All @@ -189,12 +190,13 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun mutationShouldOnlyRetryOnUnauthenticatedOnce() = runTest {
signIn()
val grpcServer =
inProcessDataConnectGrpcServer.newInstance(
errors = listOf(Status.UNAUTHENTICATED, Status.UNAUTHENTICATED),
responseDelay = 1.seconds, // avoid getting the same access token from auth emulator
)
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
signIn(dataConnect)
val operationName = Arb.dataConnect.operationName().next(rs)
val mutationRef =
dataConnect.mutation(operationName, Unit, serializer<Unit>(), serializer<Unit>())
Expand All @@ -204,8 +206,8 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {
thrownException.asClue { it.status shouldBe Status.UNAUTHENTICATED }
}

private suspend fun signIn() {
personSchema.dataConnect.awaitAuthReady()
private suspend fun signIn(dataConnect: FirebaseDataConnect) {
dataConnect.awaitAuthReady()
val authResult = auth.run { signInAnonymously().await() }
withClue("authResult.user returned from signInAnonymously()") {
authResult.user.shouldNotBeNull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import io.grpc.Status
import io.grpc.StatusException
import io.grpc.okhttp.OkHttpServerBuilder
import io.grpc.stub.StreamObserver
import kotlin.time.Duration
import kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -53,32 +54,40 @@ class InProcessDataConnectGrpcServer :
fun newInstance(
errors: List<Status>? = null,
executeQueryResponse: ExecuteQueryResponse? = null,
executeMutationResponse: ExecuteMutationResponse? = null
executeMutationResponse: ExecuteMutationResponse? = null,
responseDelay: Duration? = null,
): ServerInfo =
createInstance(
errors = errors,
executeQueryResponse = executeQueryResponse,
executeMutationResponse = executeMutationResponse
executeMutationResponse = executeMutationResponse,
responseDelay = responseDelay,
)

override fun createInstance(params: Params?): ServerInfo {
return createInstance(
params?.errors,
params?.executeQueryResponse,
params?.executeMutationResponse
params?.executeMutationResponse,
params?.responseDelay,
)
}

private fun createInstance(
errors: List<Status>? = null,
executeQueryResponse: ExecuteQueryResponse? = null,
executeMutationResponse: ExecuteMutationResponse? = null
errors: List<Status>?,
executeQueryResponse: ExecuteQueryResponse?,
executeMutationResponse: ExecuteMutationResponse?,
responseDelay: Duration?,
): ServerInfo {
val serverInterceptor = ServerInterceptorImpl(errors ?: Params.defaults.errors)
val serverInterceptor =
ServerInterceptorImpl(
errors ?: Params.defaults.errors,
responseDelay ?: Params.defaults.responseDelay,
)
val connectorService =
ConnectorServiceImpl(
executeQueryResponse ?: Params.defaults.executeQueryResponse,
executeMutationResponse ?: Params.defaults.executeMutationResponse
executeMutationResponse ?: Params.defaults.executeMutationResponse,
)
val grpcServer =
OkHttpServerBuilder.forPort(0, InsecureServerCredentials.create())
Expand All @@ -92,7 +101,8 @@ class InProcessDataConnectGrpcServer :
data class Params(
val errors: List<Status> = emptyList(),
val executeQueryResponse: ExecuteQueryResponse? = null,
val executeMutationResponse: ExecuteMutationResponse? = null
val executeMutationResponse: ExecuteMutationResponse? = null,
val responseDelay: Duration? = null,
) {
companion object {
val defaults = Params()
Expand All @@ -105,7 +115,8 @@ class InProcessDataConnectGrpcServer :

data class ServerInfo(val server: Server, val metadatas: Flow<Metadata>)

private class ServerInterceptorImpl(errors: List<Status> = emptyList()) : ServerInterceptor {
private class ServerInterceptorImpl(errors: List<Status>, private val responseDelay: Duration?) :
ServerInterceptor {

private val errors = errors.toList().iterator()

Expand All @@ -121,6 +132,8 @@ class InProcessDataConnectGrpcServer :
): ServerCall.Listener<ReqT> {
check(_metadatas.tryEmit(headers)) { "_metadatas.tryEmit(headers) failed" }

responseDelay?.let { Thread.sleep(it.inWholeMilliseconds) }

synchronized(errors) {
if (errors.hasNext()) {
throw StatusException(errors.next())
Expand All @@ -132,8 +145,8 @@ class InProcessDataConnectGrpcServer :
}

private class ConnectorServiceImpl(
val executeQueryResponse: ExecuteQueryResponse? = null,
val executeMutationResponse: ExecuteMutationResponse? = null
val executeQueryResponse: ExecuteQueryResponse?,
val executeMutationResponse: ExecuteMutationResponse?,
) : ConnectorServiceGrpc.ConnectorServiceImplBase() {
override fun executeQuery(
request: ExecuteQueryRequest,
Expand Down
Loading