Skip to content

Commit cb9bdc3

Browse files
authored
fix(bqjdbc): fallback to RestAPI if ReadAPI is not accessible (#13018)
1 parent cb56c04 commit cb9bdc3

2 files changed

Lines changed: 122 additions & 21 deletions

File tree

java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.paging.Page;
21+
import com.google.api.gax.rpc.ApiException;
22+
import com.google.api.gax.rpc.StatusCode;
2123
import com.google.cloud.Tuple;
2224
import com.google.cloud.bigquery.BigQuery;
2325
import com.google.cloud.bigquery.BigQuery.JobListOption;
@@ -56,6 +58,8 @@
5658
import com.google.common.annotations.VisibleForTesting;
5759
import com.google.common.collect.ImmutableList;
5860
import com.google.common.util.concurrent.Uninterruptibles;
61+
import io.grpc.Status;
62+
import io.grpc.StatusRuntimeException;
5963
import java.lang.ref.ReferenceQueue;
6064
import java.sql.Connection;
6165
import java.sql.ResultSet;
@@ -853,9 +857,8 @@ Thread populateArrowBufferedQueue(
853857
rowsRead += response.getRowCount();
854858
}
855859
break;
856-
} catch (com.google.api.gax.rpc.ApiException e) {
857-
if (e.getStatusCode().getCode()
858-
== com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
860+
} catch (ApiException e) {
861+
if (e.getStatusCode().getCode() == StatusCode.Code.NOT_FOUND) {
859862
LOG.warning("Read session expired or not found: %s", e.getMessage());
860863
enqueueError(arrowBatchWrapperBlockingQueue, e);
861864
break;
@@ -913,25 +916,47 @@ Thread populateArrowBufferedQueue(
913916

914917
/** Executes SQL query using either fast query path or read API */
915918
void processQueryResponse(String query, TableResult results) throws SQLException {
916-
LOG.finer(
917-
"API call completed{Query=%s, Parent Job ID=%s, Total rows=%s} ",
918-
query, results.getJobId(), results.getTotalRows());
919-
JobId currentJobId = results.getJobId();
920-
if (currentJobId == null) {
921-
LOG.fine("Standard API with Stateless query used.");
922-
this.currentResultSet = processJsonResultSet(results);
923-
} else if (useReadAPI(results)) {
924-
LOG.fine("HighThroughputAPI used.");
925-
LOG.info("HTAPI job ID: " + currentJobId.getJob());
926-
this.currentResultSet = processArrowResultSet(results);
927-
} else {
928-
// read API cannot be used.
929-
LOG.fine("Standard API used.");
930-
this.currentResultSet = processJsonResultSet(results);
919+
JobId jobId = results.getJobId();
920+
String queryId = results.getQueryId();
921+
LOG.info(
922+
"Processing query response. JobId: %s, QueryId: %s, Total rows: %s",
923+
jobId, queryId, results.getTotalRows());
924+
LOG.fine("Processing query response. Query: %s", query);
925+
926+
ResultSet resultSet = null;
927+
if (jobId != null && useReadAPI(results)) {
928+
try {
929+
LOG.info("Using ReadAPI to read the data.");
930+
resultSet = processArrowResultSet(results);
931+
} catch (SQLException e) {
932+
if (!isPermissionDeniedException(e)) {
933+
throw e;
934+
}
935+
LOG.log(Level.WARNING, "Permission denied for Read API, falling back to JSON API", e);
936+
}
931937
}
938+
939+
if (resultSet == null) {
940+
LOG.info("Using Standard API to read the data.");
941+
resultSet = processJsonResultSet(results);
942+
}
943+
this.currentResultSet = resultSet;
932944
this.currentUpdateCount = -1;
933945
}
934946

947+
private boolean isPermissionDeniedException(Throwable t) {
948+
while (t != null) {
949+
if (t instanceof StatusRuntimeException) {
950+
return ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.PERMISSION_DENIED;
951+
}
952+
if (t instanceof ApiException) {
953+
return ((ApiException) t).getStatusCode().getCode() == StatusCode.Code.PERMISSION_DENIED;
954+
}
955+
t = t.getCause();
956+
}
957+
return false;
958+
}
959+
935960
// The read Ratio should be met
936961
// AND the User must not have disabled the Read API
937962
@VisibleForTesting
@@ -967,9 +992,6 @@ private boolean meetsReadRatio(TableResult results) {
967992
}
968993

969994
BigQueryJsonResultSet processJsonResultSet(TableResult results) {
970-
String jobIdOrQueryId =
971-
results.getJobId() == null ? results.getQueryId() : results.getJobId().getJob();
972-
LOG.info("BigQuery Job %s completed. Fetching results.", jobIdOrQueryId);
973995
List<Thread> threadList = new ArrayList<Thread>();
974996

975997
Schema schema = results.getSchema();

java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.junit.jupiter.api.Assertions.fail;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.eq;
2526
import static org.mockito.Mockito.doReturn;
2627
import static org.mockito.Mockito.mock;
2728
import static org.mockito.Mockito.verify;
2829

30+
import com.google.api.gax.rpc.ApiException;
31+
import com.google.api.gax.rpc.StatusCode;
2932
import com.google.cloud.ServiceOptions;
3033
import com.google.cloud.bigquery.BigQuery;
3134
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
@@ -45,6 +48,7 @@
4548
import com.google.cloud.bigquery.StandardSQLTypeName;
4649
import com.google.cloud.bigquery.TableId;
4750
import com.google.cloud.bigquery.TableResult;
51+
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
4852
import com.google.cloud.bigquery.jdbc.BigQueryStatement.JobIdWrapper;
4953
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
5054
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
@@ -497,6 +501,81 @@ public void testGetStatementType(boolean isReadOnlyTokenUsed) throws Exception {
497501
.create(any(JobInfo.class));
498502
}
499503

504+
@Test
505+
public void testProcessQueryResponseFallbackToJsonOnReadApiFailure() throws SQLException {
506+
BigQueryStatement statementSpy = Mockito.spy(bigQueryStatement);
507+
TableResult tableResultMock = mockTableResultWithJob("job-id");
508+
509+
// Force useReadAPI to return true to enter the HTAPI block
510+
doReturn(true).when(statementSpy).useReadAPI(tableResultMock);
511+
512+
// Mock a permission denied ApiException
513+
ApiException apiExceptionMock = mockApiException(StatusCode.Code.PERMISSION_DENIED);
514+
515+
BigQueryJdbcException exceptionToThrow =
516+
new BigQueryJdbcException("Simulated permission denied", apiExceptionMock);
517+
518+
// Force processArrowResultSet to throw the permission exception
519+
Mockito.doThrow(exceptionToThrow).when(statementSpy).processArrowResultSet(tableResultMock);
520+
521+
BigQueryJsonResultSet jsonResultSetMock = mock(BigQueryJsonResultSet.class);
522+
// Mock processJsonResultSet to return our mock JSON result set
523+
doReturn(jsonResultSetMock).when(statementSpy).processJsonResultSet(tableResultMock);
524+
525+
statementSpy.processQueryResponse("SELECT 1", tableResultMock);
526+
527+
// Verify that processJsonResultSet was indeed called as a fallback
528+
verify(statementSpy).processJsonResultSet(tableResultMock);
529+
// Verify that currentResultSet is set to the mocked JSON result set
530+
assertThat(statementSpy.currentResultSet).isEqualTo(jsonResultSetMock);
531+
}
532+
533+
@Test
534+
public void testProcessQueryResponseNoFallbackOnNonPermissionFailure() throws SQLException {
535+
BigQueryStatement statementSpy = Mockito.spy(bigQueryStatement);
536+
TableResult tableResultMock = mockTableResultWithJob("job-id");
537+
538+
// Force useReadAPI to return true to enter the HTAPI block
539+
doReturn(true).when(statementSpy).useReadAPI(tableResultMock);
540+
541+
// Mock a non-permission ApiException (e.g., INTERNAL)
542+
ApiException apiExceptionMock = mockApiException(StatusCode.Code.INTERNAL);
543+
544+
BigQueryJdbcException exceptionToThrow =
545+
new BigQueryJdbcException("Simulated internal error", apiExceptionMock);
546+
547+
// Force processArrowResultSet to throw the non-permission exception
548+
Mockito.doThrow(exceptionToThrow).when(statementSpy).processArrowResultSet(tableResultMock);
549+
550+
BigQueryJsonResultSet jsonResultSetMock = mock(BigQueryJsonResultSet.class);
551+
doReturn(jsonResultSetMock).when(statementSpy).processJsonResultSet(tableResultMock);
552+
553+
// Assert that the exception is propagated
554+
try {
555+
statementSpy.processQueryResponse("SELECT 1", tableResultMock);
556+
fail("Expected SQLException to be thrown");
557+
} catch (SQLException e) {
558+
assertEquals(exceptionToThrow, e);
559+
}
560+
561+
// Verify that processJsonResultSet was NOT called
562+
verify(statementSpy, Mockito.never()).processJsonResultSet(tableResultMock);
563+
}
564+
565+
private TableResult mockTableResultWithJob(String jobId) {
566+
TableResult tableResult = mock(TableResult.class);
567+
doReturn(JobId.of(jobId)).when(tableResult).getJobId();
568+
return tableResult;
569+
}
570+
571+
private ApiException mockApiException(StatusCode.Code code) {
572+
ApiException apiExceptionMock = mock(ApiException.class);
573+
StatusCode statusCodeMock = mock(StatusCode.class);
574+
doReturn(statusCodeMock).when(apiExceptionMock).getStatusCode();
575+
doReturn(code).when(statusCodeMock).getCode();
576+
return apiExceptionMock;
577+
}
578+
500579
@Test
501580
public void testUseReadAPI_SafeguardSmallDataset() throws SQLException {
502581
// Setup: totalRows < MinTableSize, so it should not activate the Read API

0 commit comments

Comments
 (0)