Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

public class ServiceNowSinkPropertiesPageActions {
public static ServiceNowSourceConfig config;
private static Map<String, String> responseFromServiceNowTable;
private static JsonObject responseFromServiceNowTable;
private static Gson gson = new Gson();

public static void getRecordFromServiceNowTable(String query, String tableName)
Expand All @@ -71,10 +71,8 @@ public static boolean verifyIfRecordCreatedInServiceNowIsCorrect(String query, S
String bqTable = TestSetupHooks.bqTargetTable;
getRecordFromServiceNowTable(query, tableName);

JsonObject jsonObject = new JsonObject();
responseFromServiceNowTable.forEach(jsonObject::addProperty);
List<JsonObject> serviceNowResponse = new ArrayList<>();
serviceNowResponse.add(jsonObject);
serviceNowResponse.add(responseFromServiceNowTable);

List<JsonObject> bigQueryResponse = new ArrayList<>();
List<Object> bigQueryRows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.cdap.plugin.servicenow;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.stream.JsonReader;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -31,14 +32,19 @@
import io.cdap.plugin.servicenow.util.SchemaType;
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
import io.cdap.plugin.servicenow.util.SourceValueType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStreamReader;
import javax.annotation.Nullable;

/**
* ServiceNow Base Config. Contains connection properties and methods.
*/
public class ServiceNowBaseConfig extends PluginConfig {

private static final Logger log = LoggerFactory.getLogger(ServiceNowBaseConfig.class);
@Name(ConfigUtil.NAME_USE_CONNECTION)
@Nullable
@Description("Whether to use an existing connection.")
Expand Down Expand Up @@ -140,7 +146,7 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT);

apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
if (serviceNowTableAPIClient.parseResponseToResultListOfMap(apiResponse.getResponseBody()).isEmpty()) {
if (isResultEmpty(apiResponse)) {
// Removed config property as in case of MultiSource, only first table error was populating.
collector.addFailure("Table: " + tableName + " is empty.", "");
}
Expand All @@ -152,4 +158,31 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
}
}

/**
* Checks if the "result" array in the ServiceNow REST API response is empty.
* <p>
* Determines if the "result" array in a ServiceNow REST API response is empty by specifically looking for a top-level
* key named "result". Once found, it opens the associated array and checks for the presence of a first element.
* </p>
*
* @param restAPIResponse The response object containing the JSON input stream
* @return true, if the "result" array exists and is empty, or if the "result" key is never found;
* false, if the array contains at least one element.
* @throws IOException If there is an error reading the input stream or parsing the JSON.
*/
public boolean isResultEmpty(RestAPIResponse restAPIResponse) throws IOException {
JsonReader reader = new JsonReader(new InputStreamReader(restAPIResponse.getResponseStream()));
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (ServiceNowConstants.RESULT.equals(name)) {
reader.beginArray();
return !reader.hasNext();
} else {
reader.skipValue();
}
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import com.sun.org.apache.xpath.internal.operations.Bool;
import com.google.gson.stream.JsonReader;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
Expand Down Expand Up @@ -56,7 +56,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -86,6 +89,7 @@ public class ServiceNowTableAPIClientImpl extends RestAPIClient {
public static JsonArray serviceNowJsonResultArray;

public ServiceNowTableAPIClientImpl(ServiceNowConnectorConfig conf, Boolean useConnection) {
super();
this.conf = conf;
this.schemaType = getSchemaTypeBasedOnUseConnection(useConnection);
}
Expand Down Expand Up @@ -135,7 +139,7 @@ public String getAccessTokenRetryableMode() throws ExecutionException, RetryExce
* @return A list of maps, where each map represents a record from the table.
* @throws ServiceNowAPIException If an error occurs while fetching the records.
*/
public List<Map<String, String>> fetchTableRecords(
public RestAPIResponse fetchTableRecords(
String tableName,
SourceValueType valueType,
String filterQuery,
Expand All @@ -159,7 +163,7 @@ public List<Map<String, String>> fetchTableRecords(
String accessToken = getAccessToken();
requestBuilder.setAuthHeader(accessToken);
RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build());
return parseResponseToResultListOfMap(apiResponse.getResponseBody());
return apiResponse;
}

private int getRecordCountFromHeader(RestAPIResponse apiResponse) {
Expand Down Expand Up @@ -209,30 +213,24 @@ private String getErrorMessage(String responseBody) {
* @param limit The number of records to be fetched
* @return The list of Map; each Map representing a table row
*/
public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
public RestAPIResponse fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
String filterQuery, int offset,
int limit) throws ServiceNowAPIException {
final List<Map<String, String>> results = new ArrayList<>();
Callable<Boolean> fetchRecords = () -> {
results.addAll(fetchTableRecords(tableName, valueType, filterQuery, offset, limit));
return true;
};
Callable<RestAPIResponse> fetchRecords = () -> fetchTableRecords(tableName, valueType, filterQuery, offset, limit);

Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
Retryer<RestAPIResponse> retryer = RetryerBuilder.<RestAPIResponse>newBuilder()
.retryIfException(this::isExceptionRetryable)
.withWaitStrategy(WaitStrategies.exponentialWait(ServiceNowConstants.WAIT_TIME, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS))
.build();

try {
retryer.call(fetchRecords);
return retryer.call(fetchRecords);
} catch (RetryException | ExecutionException e) {
throw new ServiceNowAPIException(
String.format("Data Recovery failed for batch %s to %s.", offset, (offset + limit)),
e, null, false);
}

return results;
}

/**
Expand All @@ -255,8 +253,8 @@ public Schema fetchTableSchema(String tableName, FailureCollector collector) {
}

@VisibleForTesting
public MetadataAPISchemaResponse parseSchemaResponse(String responseBody) {
return GSON.fromJson(responseBody, MetadataAPISchemaResponse.class);
public MetadataAPISchemaResponse parseSchemaResponse(InputStream responseStream) {
return GSON.fromJson(createJsonReader(responseStream), MetadataAPISchemaResponse.class);
}

/**
Expand All @@ -268,8 +266,8 @@ public MetadataAPISchemaResponse parseSchemaResponse(String responseBody) {
* @throws ServiceNowAPIException
*/
public Schema fetchTableSchema(String tableName, SourceValueType valueType)
throws ServiceNowAPIException {
return fetchTableSchema(tableName, getAccessToken(), valueType, schemaType, false);
throws ServiceNowAPIException {
return fetchTableSchema(tableName, getAccessToken(), valueType, schemaType, true);
}

private SchemaType getSchemaTypeBasedOnUseConnection(Boolean useConnection) {
Expand All @@ -293,7 +291,7 @@ private SchemaType getSchemaTypeBasedOnUseConnection(Boolean useConnection) {
*/
public Schema fetchTableSchema(String tableName, String accessToken, SourceValueType valueType,
SchemaType schemaType, Boolean legacyMapping)
throws ServiceNowAPIException {
throws ServiceNowAPIException {
ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder(
this.conf.getRestApiEndpoint(), tableName, true, schemaType)
.setExcludeReferenceLink(true);
Expand All @@ -303,12 +301,16 @@ public Schema fetchTableSchema(String tableName, String accessToken, SourceValue
restAPIResponse = executeGetWithRetries(requestBuilder.build());
List<ServiceNowColumn> columns = new ArrayList<>();

if (schemaType == SchemaType.METADATA_API_BASED) {
return prepareSchemaWithMetadataAPI(restAPIResponse, columns, tableName, valueType, legacyMapping);
} else if (schemaType == SchemaType.SCHEMA_API_BASED) {
return prepareSchemaWithSchemaAPI(restAPIResponse, columns, tableName);
} else {
return prepareStringBasedSchema(restAPIResponse, columns, tableName);
try {
if (schemaType == SchemaType.METADATA_API_BASED) {
return prepareSchemaWithMetadataAPI(restAPIResponse, columns, tableName, valueType, legacyMapping);
} else if (schemaType == SchemaType.SCHEMA_API_BASED) {
return prepareSchemaWithSchemaAPI(restAPIResponse, columns, tableName, legacyMapping);
} else {
return prepareStringBasedSchema(restAPIResponse, columns, tableName, legacyMapping);
}
} catch (IOException exception) {
throw new RuntimeException("Error in fetching schema for table " + tableName, exception);
}
}

Expand All @@ -329,9 +331,9 @@ public Schema fetchTableSchema(String tableName, String accessToken, SourceValue
* @throws RuntimeException if the schema response is null or contains no result.
*/
private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName) throws ServiceNowAPIException {
String tableName, Boolean legacyMapping) throws ServiceNowAPIException, IOException {
SchemaAPISchemaResponse schemaAPISchemaResponse =
GSON.fromJson(restAPIResponse.getResponseBody(), SchemaAPISchemaResponse.class);
GSON.fromJson(createJsonReader(restAPIResponse.getResponseStream()), SchemaAPISchemaResponse.class);

if (schemaAPISchemaResponse.getResult() == null || schemaAPISchemaResponse.getResult().isEmpty()) {
throw new ServiceNowAPIException(
Expand All @@ -341,7 +343,7 @@ private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<
for (SchemaAPISchemaField field : schemaAPISchemaResponse.getResult()) {
columns.add(new ServiceNowColumn(field.getName(), field.getInternalType()));
}
return SchemaBuilder.constructSchema(tableName, columns, false);
return SchemaBuilder.constructSchema(tableName, columns, legacyMapping);
}

/**
Expand All @@ -363,8 +365,8 @@ private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<
* @throws RuntimeException if the response does not contain valid column information.
*/
private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName, SourceValueType valueType, Boolean legacyMapping) throws ServiceNowAPIException {
MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseBody());
String tableName, SourceValueType valueType, Boolean legacyMapping) throws ServiceNowAPIException, IOException {
MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseStream());

if (metadataAPISchemaResponse.getResult() == null || metadataAPISchemaResponse.getResult().getColumns() == null ||
metadataAPISchemaResponse.getResult().getColumns().isEmpty()) {
Expand All @@ -391,6 +393,11 @@ private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, Lis
return SchemaBuilder.constructSchema(tableName, columns, legacyMapping);
}

public JsonReader createJsonReader(InputStream inputStream) {
Objects.requireNonNull(inputStream, "InputStream must not be null");
return new JsonReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
}

/**
* Gets the total number of records in the table based on the provided date range.
*
Expand Down Expand Up @@ -451,8 +458,8 @@ public String createRecord(String tableName, HttpEntity entity) throws IOExcepti
requestBuilder.setContentTypeHeader("application/json");
requestBuilder.setEntity(entity);
apiResponse = executePost(requestBuilder.build());

systemID = String.valueOf(getSystemId(apiResponse));
apiResponse.close();
} catch (IOException e) {
throw new ServiceNowAPIException("Error in creating a new record", e, null, false);
}
Expand Down Expand Up @@ -482,15 +489,17 @@ public String createRecordInDisplayMode(String tableName, HttpEntity entity) thr
apiResponse = executePost(requestBuilder.build());

systemID = String.valueOf(getSystemId(apiResponse));
apiResponse.close();
} catch (IOException e) {
throw new ServiceNowAPIException("Error in creating a new record", e, null, false);
}
return systemID;
}

private String getSystemId(RestAPIResponse restAPIResponse) {
CreateRecordAPIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(),
CreateRecordAPIResponse.class);
private String getSystemId(RestAPIResponse restAPIResponse) throws IOException {
CreateRecordAPIResponse apiResponse = GSON.fromJson(
new InputStreamReader(restAPIResponse.getResponseStream(), StandardCharsets.UTF_8),
CreateRecordAPIResponse.class);
return apiResponse.getResult().get(ServiceNowConstants.SYSTEM_ID).toString();
}

Expand All @@ -501,8 +510,8 @@ private String getSystemId(RestAPIResponse restAPIResponse) {
* @param tableName The ServiceNow table name
* @param query The query
*/
public Map<String, String> getRecordFromServiceNowTable(String tableName, String query)
throws ServiceNowAPIException {
public JsonObject getRecordFromServiceNowTable(String tableName, String query)
throws ServiceNowAPIException, IOException {

ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder(
this.conf.getRestApiEndpoint(), tableName, false, schemaType)
Expand All @@ -513,7 +522,8 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
requestBuilder.setAuthHeader(accessToken);
restAPIResponse = executeGetWithRetries(requestBuilder.build());

APIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), APIResponse.class);
APIResponse apiResponse = GSON.fromJson(
new InputStreamReader(restAPIResponse.getResponseStream(), StandardCharsets.UTF_8), APIResponse.class);
return apiResponse.getResult().get(0);
}

Expand All @@ -530,16 +540,42 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
*
* @throws RuntimeException if the schema response is null or contains no result.
*/
private Schema prepareStringBasedSchema(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName) {
List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody());
if (result != null && !result.isEmpty()) {
Map<String, String> firstRecord = result.get(0);
for (String key : firstRecord.keySet()) {
columns.add(new ServiceNowColumn(key, "string"));
private Schema prepareStringBasedSchema(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding: When we already fetching schema from a metadata API so what is the use case of this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to support users who're still using the previous release version : v1.1
Please refer to this comment in the code.

// use connection was added in release/1.2, so it will be null for users who are upgrading from release/1.1
    // This is added to support backward compatibility for 1.1 users.
    if (useConnection == null) {
      return SchemaType.STRING_BASED;
    }

String tableName, Boolean legacyMapping) throws IOException {
InputStream in = restAPIResponse.getResponseStream();
JsonReader reader = new JsonReader(new InputStreamReader(in, StandardCharsets.UTF_8));
JsonObject firstRecord;
try {
firstRecord = getFirstRecord(reader);
restAPIResponse.close();
} catch (IOException e) {
throw new RuntimeException("Failed to parse schema for table: " + tableName, e);
}
if (firstRecord != null) {
firstRecord.entrySet().forEach(entry ->
columns.add(new ServiceNowColumn(entry.getKey(), "string"))
);
return SchemaBuilder.constructSchema(tableName, columns, legacyMapping);
}
return null;
}

public JsonObject getFirstRecord(JsonReader reader) throws IOException {
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals(ServiceNowConstants.RESULT)) {
reader.beginArray();
if (reader.hasNext()) {
// ONLY parse the first object into memory
return GSON.fromJson(reader, JsonObject.class);
}
reader.endArray();
} else {
reader.skipValue();
}
return SchemaBuilder.constructSchema(tableName, columns, false);
}
reader.endObject();
return null;
}

Expand Down
Loading
Loading