diff --git a/digdag-docs/src/operators/bq.md b/digdag-docs/src/operators/bq.md index 3d80738ade..f8adad6be9 100644 --- a/digdag-docs/src/operators/bq.md +++ b/digdag-docs/src/operators/bq.md @@ -3,6 +3,8 @@ **bq>** operator runs a query on Google BigQuery. _export: + gcp: + project: my_project_id bq: dataset: my_dataset @@ -56,6 +58,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. location: asia-northeast1 ``` +* **gcp.project**: NAME + + Specifies the default Google Cloud project to use in the query and in the `destination_table` parameter. + + Examples: + + ``` + gcp: + project: my_project_id + ``` + * **dataset**: NAME Specifies the default dataset to use in the query and in the `destination_table` parameter. @@ -167,6 +180,70 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. Describes user-defined function resources used in the query. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.userDefinedFunctionResources). +* **clustering**: OBJECT + + Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering). + + Examples: + + ```yaml + clustering: + fields: + - field1 + ``` + +* **encryption_configuration**: OBJECT + + Custom encryption configuration. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration). + + Examples: + + ```yaml + encryption_configuration: + kmsKeyName: key_name + ``` + +* **maximum_bytes_billed**: LONG + + Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. 
For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery). + +* **schema_update_options**: LIST + + Allows the schema of the destination table to be updated as a side effect of the query job. Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by partition decorators. For normal tables, WRITE_TRUNCATE will always overwrite the schema. One or more of the following values are specified: + - ALLOW_FIELD_ADDITION: allow adding a nullable field to the schema. + - ALLOW_FIELD_RELAXATION: allow relaxing a required field in the original schema to nullable. + + For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery). + +* **range_partitioning**: OBJECT + + Range partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning). + + Examples: + + ```yaml + range_partitioning: + field: id + range: + start: 0 + interval: 10 + end: 100 + ``` + +* **time_partitioning**: OBJECT + + Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning). + + Examples: + + ```yaml + time_partitioning: + type: DAY + field: date + requirePartitionFilter: true + + ``` + ## Output parameters * **bq.last_job_id** diff --git a/digdag-docs/src/operators/bq_load.md b/digdag-docs/src/operators/bq_load.md index 96b2c85307..56702daea3 100644 --- a/digdag-docs/src/operators/bq_load.md +++ b/digdag-docs/src/operators/bq_load.md @@ -90,10 +90,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. 
 location: asia-northeast1 ``` -* **project**: NAME +* **gcp.project**: NAME The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter. + Examples: + + ``` + gcp: + project: my_project_id + ``` + * **source_format**: CSV | NEWLINE_DELIMITED_JSON | AVRO | DATASTORE_BACKUP The format of the files to be imported. *Default*: `CSV`. @@ -283,6 +290,98 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. # schema: path/to/schema.yml ``` +* **clustering**: OBJECT + + Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering). + + Examples: + + ```yaml + clustering: + fields: + - field1 + ``` + +* **decimal_target_types**: LIST + + Defines the list of possible SQL data types to which the source decimal values are converted. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#decimaltargettype). + +* **encryption_configuration**: OBJECT + + Custom encryption configuration. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration). + + Examples: + + ```yaml + encryption_configuration: + kmsKeyName: key_name + ``` + +* **hive_partitioning_options**: OBJECT + + Options for configuring hive partitioning detection. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions). + + Examples: + + ```yaml + hive_partitioning_options: + mode: AUTO + sourceUriPrefix: gs://my_bucket/path + ``` + +* **json_extension**: STRING + + If sourceFormat is set to newline-delimited JSON, indicates whether it should be processed as a JSON variant such as GeoJSON. For a sourceFormat other than JSON, omit this field. 
If the sourceFormat is newline-delimited JSON, set this to GEOJSON for newline-delimited GeoJSON. + +* **null_marker**: STRING + + Specifies a string that represents a null value in a CSV file. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). + +* **parquet_options**: OBJECT + + Parquet options for load jobs and for creating external tables. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). + + Examples: + + ```yaml + parquet_options: + enableListInference: true + enumAsString: true + ``` + +* **use_avro_logical_types**: BOOLEAN + + If sourceFormat is set to "AVRO", indicates whether to interpret logical types as the corresponding BigQuery data type (for example, TIMESTAMP), instead of using the raw type (for example, INTEGER). For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). + +* **range_partitioning**: OBJECT + + Range partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning). + + Examples: + + ```yaml + range_partitioning: + field: id + range: + start: 0 + interval: 10 + end: 100 + ``` + +* **time_partitioning**: OBJECT + + Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning). 
+ + Examples: + + ```yaml + time_partitioning: + type: DAY + field: date + requirePartitionFilter: true + + ``` + ## Output parameters * **bq.last_job_id** diff --git a/digdag-standards/build.gradle b/digdag-standards/build.gradle index 9ac42484ad..bbf396a2de 100644 --- a/digdag-standards/build.gradle +++ b/digdag-standards/build.gradle @@ -35,7 +35,7 @@ dependencies { compile "com.amazonaws:aws-java-sdk-dynamodb:${project.ext.awsJavaSdkVersion}" // bigquery - compile ('com.google.apis:google-api-services-bigquery:v2-rev325-1.22.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' } + compile('com.google.apis:google-api-services-bigquery:v2-rev20230520-2.0.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' } // gcs compile ('com.google.apis:google-api-services-storage:v1-rev20190910-1.30.3') { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java index 5aebeac220..944a23c694 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java @@ -2,6 +2,8 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.RangePartitioning; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.base.Optional; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigFactory; @@ -43,4 +45,33 @@ private TaskResult result(Job job) } protected abstract JobConfiguration jobConfiguration(String projectId); + + protected RangePartitioning rangePartitioning(Config params) { + Config rangeParams = params.getNested("range"); + RangePartitioning.Range range = new RangePartitioning.Range(); + range.setStart(rangeParams.get("start", Long.class)) + 
.setEnd(rangeParams.get("end", Long.class)) + .setInterval(rangeParams.get("interval", Long.class)); + + RangePartitioning rPart = new RangePartitioning(); + rPart.setField(params.get("field", String.class)) + .setRange(range); + + return rPart; + } + + protected TimePartitioning timePartitioning(Config params) { + TimePartitioning tPart = new TimePartitioning(); + // required fields + tPart.setType(params.get("type", String.class)); + + // optional fields + params.getOptional("field", String.class).transform(tPart::setField); + params.getOptional("requirePartitionFilter", Boolean.class).transform(tPart::setRequirePartitionFilter); + if (params.has("expirationMs")) { + tPart.setExpirationMs(params.get("expirationMs", Long.class)); + } + + return tPart; + } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java index 883bb5cb0e..8a7f6040d6 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java @@ -29,7 +29,9 @@ public TaskResult runTask() private String projectId(GcpCredential credential) { - Optional projectId = context.getSecrets().getSecretOptional("gcp.project") + Optional projectId = request.getConfig().getNestedOrGetEmpty("gcp") + .getOptional("project", String.class) + .or(context.getSecrets().getSecretOptional("gcp.project")) .or(credential.projectId()); if (!projectId.isPresent()) { throw new TaskExecutionException("Missing 'gcp.project' secret"); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java index 39c6be4279..83996a2881 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java @@ 
-9,7 +9,8 @@ class Bq { - private static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile("^(?:(?[^:]+):)?(?:(?[^.]+)\\.)?(?[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{8})?)$"); + private static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile( + "^(?:(?[^:]+):)?(?:(?[^.]+)\\.)?(?
[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{4,10})?)$"); static TableReference tableReference(String defaultProjectId, Optional defaultDataset, String s) { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java index c9cbad5ae4..fa8540d1ce 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java @@ -1,11 +1,14 @@ package io.digdag.standards.operator.gcp; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.EncryptionConfiguration; +import com.google.api.services.bigquery.model.HivePartitioningOptions; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.ParquetOptions; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -99,6 +102,22 @@ protected JobConfiguration jobConfiguration(String projectId) Optional.of(params.getListOrEmpty("projection_fields", String.class)).transform(cfg::setProjectionFields); params.getOptional("autodetect", boolean.class).transform(cfg::setAutodetect); Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions); + params.getOptional("clustering", Clustering.class).transform(cfg::setClustering); + Optional.of(params.getListOrEmpty("decimal_target_types", String.class)).transform(cfg::setDecimalTargetTypes); + 
params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration); + params.getOptional("hive_partitioning_options", HivePartitioningOptions.class).transform(cfg::setHivePartitioningOptions); + params.getOptional("json_extension", String.class).transform(cfg::setJsonExtension); + params.getOptional("null_marker", String.class).transform(cfg::setNullMarker); + params.getOptional("parquet_options", ParquetOptions.class).transform(cfg::setParquetOptions); + params.getOptional("use_avro_logical_types", boolean.class).transform(cfg::setUseAvroLogicalTypes); + + if (params.has("range_partitioning")) { + cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning"))); + } + + if (params.has("time_partitioning")) { + cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning"))); + } return new JobConfiguration() .setLoad(cfg); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java index 0e20760973..53e699950c 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java @@ -1,7 +1,9 @@ package io.digdag.standards.operator.gcp; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.ExternalDataConfiguration; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; @@ -86,6 +88,18 @@ protected JobConfiguration jobConfiguration(String projectId) params.getOptional("destination_table", String.class) 
.transform(s -> cfg.setDestinationTable(tableReference(projectId, defaultDataset, s))); + params.getOptional("clustering", Clustering.class).transform(cfg::setClustering); + params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration); + params.getOptional("maximum_bytes_billed", Long.class).transform(cfg::setMaximumBytesBilled); + Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions); + + if (params.has("range_partitioning")) { + cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning"))); + } + + if (params.has("time_partitioning")) { + cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning"))); + } return new JobConfiguration() .setQuery(cfg); diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java new file mode 100644 index 0000000000..6337a4f4bd --- /dev/null +++ b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java @@ -0,0 +1,310 @@ +package io.digdag.standards.operator.gcp; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; +import java.util.UUID; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.services.bigquery.model.Job; 
+import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.api.services.bigquery.model.RangePartitioning.Range; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import io.digdag.client.DigdagClient; +import io.digdag.client.config.Config; +import io.digdag.client.config.ConfigException; +import io.digdag.spi.ImmutableTaskRequest; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.SecretProvider; +import io.digdag.spi.TaskExecutionException; +import io.digdag.spi.TemplateEngine; + +import static io.digdag.client.config.ConfigUtils.configFactory; +import static io.digdag.client.config.ConfigUtils.newConfig; +import static io.digdag.core.workflow.OperatorTestingUtils.newContext; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class BqLoadOperatorFactoryTest { + private static final String PROJECT_ID = "test-project"; + private static final ObjectMapper OBJECT_MAPPER = DigdagClient.objectMapper(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Mock + TemplateEngine templateEngine; + @Mock + SecretProvider secrets; + + @Mock + BqClient bqClient; + @Mock + BqClient.Factory bqClientFactory; + + @Mock + GoogleCredential googleCredential; + @Mock + GcpCredential gcpCredential; + @Mock + GcpCredentialProvider gcpCredentialProvider; + + private Path projectPath; + private OperatorContext operatorContext; + private BqLoadOperatorFactory factory; + + private static final 
String dummyPath = "gs://bucket/path"; + private static final String dummyDataset = "some_dataset"; + private static final String dummyTable = "some_table"; + + private void doTestBqLoad(Config config) throws IOException { + Answer answer = new Answer() { + public Void answer(InvocationOnMock invocation) { + JobConfigurationLoad jobConf = invocation.getArgumentAt(1, Job.class).getConfiguration().getLoad(); + + assertThat(jobConf.getSourceUris(), is(contains(sourceUris(config).toArray()))); + + String destinaionTable = jobConf.getDestinationTable().getDatasetId() + "." + + jobConf.getDestinationTable().getTableId(); + assertThat(destinaionTable, is(config.get("destination_table", String.class))); + assertThat(jobConf.getSourceFormat(), is(config.get("source_format", String.class, null))); + assertThat(jobConf.getAutodetect(), is(config.get("autodetect", Boolean.class, null))); + + Config clustering = config.getNestedOrGetEmpty("clustering"); + if (!clustering.isEmpty()) + assertThat(jobConf.getClustering().getFields(), + is(contains(clustering.getList("fields", String.class).toArray()))); + + if (config.has("decimal_target_types")) + assertThat(jobConf.getDecimalTargetTypes(), + is(contains(config.getList("decimal_target_types", String.class).toArray()))); + + Config encryptionConfiguration = config.getNestedOrGetEmpty("encryption_configuration"); + if (!encryptionConfiguration.isEmpty()) + assertThat(jobConf.getDestinationEncryptionConfiguration().getKmsKeyName(), + is(encryptionConfiguration.get("kmsKeyName", String.class))); + + Config hivePartitioningOptions = config.getNestedOrGetEmpty("hive_partitioning_options"); + if (!hivePartitioningOptions.isEmpty()) { + assertThat(jobConf.getHivePartitioningOptions().getMode(), + is(hivePartitioningOptions.get("mode", String.class))); + assertThat(jobConf.getHivePartitioningOptions().getSourceUriPrefix(), + is(hivePartitioningOptions.get("sourceUriPrefix", String.class))); + } + + assertThat(jobConf.getJsonExtension(), 
is(config.get("json_extension", String.class, null))); + assertThat(jobConf.getNullMarker(), is(config.get("null_marker", String.class, null))); + + Config parquetOptions = config.getNestedOrGetEmpty("parquet_options"); + if (!parquetOptions.isEmpty()) { + assertThat(jobConf.getParquetOptions().getEnableListInference(), + is(parquetOptions.get("enableListInference", Boolean.class, null))); + assertThat(jobConf.getParquetOptions().getEnumAsString(), + is(parquetOptions.get("enumAsString", Boolean.class, null))); + } + + assertThat(jobConf.getUseAvroLogicalTypes(), + is(config.get("use_avro_logical_types", Boolean.class, null))); + + Config rangePartitioning = config.getNestedOrGetEmpty("range_partitioning"); + if (!rangePartitioning.isEmpty()) { + assertThat(jobConf.getRangePartitioning().getField(), + is(rangePartitioning.get("field", String.class))); + Range range = jobConf.getRangePartitioning().getRange(); + Config rangeConfig = rangePartitioning.getNested("range"); + assertThat(range.getStart(), + is(rangeConfig.get("start", Long.class))); + assertThat(range.getInterval(), + is(rangeConfig.get("interval", Long.class))); + assertThat(range.getEnd(), + is(rangeConfig.get("end", Long.class))); + } + + Config timePartitioningConfig = config.getNestedOrGetEmpty("time_partitioning"); + if (!timePartitioningConfig.isEmpty()) { + TimePartitioning timePartitioning = jobConf.getTimePartitioning(); + assertThat(timePartitioning.getType(), + is(timePartitioningConfig.get("type", String.class))); + assertThat(timePartitioning.getField(), + is(timePartitioningConfig.get("field", String.class, null))); + assertThat(timePartitioning.getRequirePartitionFilter(), + is(timePartitioningConfig.get("requirePartitionFilter", Boolean.class, null))); + assertThat(timePartitioning.getExpirationMs(), + is(timePartitioningConfig.get("expirationMs", Long.class, null))); + } + + return null; + } + }; + doAnswer(answer).when(bqClient).submitJob(anyString(), any()); + + projectPath = 
temporaryFolder.newFolder().toPath(); + ImmutableTaskRequest taskRequest = ImmutableTaskRequest.builder() + .siteId(0) + .projectId(0) + .projectName("dummy_project") + .workflowName("dummy_workflow") + .taskId(0) + .attemptId(0) + .sessionId(0) + .taskName("dummy_task") + .lockId("dummy_lockId") + .timeZone(ZoneId.of("UTC")) + .sessionUuid(UUID.randomUUID()) + .sessionTime(Instant.now()) + .createdAt(Instant.now()) + .isCancelRequested(false) + .localConfig(newConfig()) + .config(config) + .lastStateParams(newConfig()) + .build(); + operatorContext = newContext(projectPath, taskRequest, secrets); + Operator operator = factory.newOperator(operatorContext); + + // first call of operator.run() set BigQuery job id in state param + // and throw TaskExecutionException + try { + operator.run(); + } catch (TaskExecutionException e) { + taskRequest = taskRequest.withLastStateParams(e.getStateParams(configFactory).get()); + } + + Job job = new Job(); + job.setStatus(new JobStatus().setState("DONE")); + when(bqClient.jobStatus(anyString(), anyString(), any())).thenReturn(job); + + // second call of operator.run() check BigQUery job status + operatorContext = newContext(projectPath, taskRequest, secrets); + operator = factory.newOperator(operatorContext); + operator.run(); + } + + private List sourceUris(Config params) { + try { + return params.parseList("_command", String.class); + } catch (ConfigException ignore) { + return ImmutableList.of(params.get("_command", String.class)); + } + } + + @Before + public void setUp() throws Exception + { + when(gcpCredential.credential()).thenReturn(googleCredential); + when(gcpCredential.projectId()).thenReturn(Optional.of(PROJECT_ID)); + when(gcpCredentialProvider.credential(secrets)).thenReturn(gcpCredential); + when(secrets.getSecretOptional("gcp.project")).thenReturn(Optional.of(PROJECT_ID)); + when(bqClientFactory.create(googleCredential)).thenReturn(bqClient); + + factory = new BqLoadOperatorFactory(OBJECT_MAPPER, templateEngine, 
bqClientFactory, gcpCredentialProvider); + } + + @Test + public void testBqLoadJsonTimePartitioningDay() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "NEWLINE_DELIMITED_JSON"); + config.set("autodetect", true); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("decimal_target_types", ImmutableList.of("BIGNUMERIC", "STRING")); + config.set("encryption_configuration", ImmutableMap.of( + "kmsKeyName", "dummy_key")); + config.set("json_extension", "GEOJSON"); + config.set("time_partitioning", ImmutableMap.of( + "type", "DAY", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadCsvTimePartitioningYear() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "CSV"); + config.set("autodetect", true); + config.set("null_marker", "NULL"); + config.set("time_partitioning", ImmutableMap.of( + "type", "YEAR", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadJsonHivePartition() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "NEWLINE_DELIMITED_JSON"); + config.set("hive_partitioning_options", ImmutableMap.of( + "mode", "AUTO", + "sourceUriPrefix", dummyPath)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadParquetRangePartition() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." 
+ dummyTable); + config.set("source_format", "PARQUET"); + config.set("parquet_options", ImmutableMap.of( + "enableListInference", true, + "enumAsString", true)); + config.set("range_partitioning", ImmutableMap.of( + "field", "id", + "range", ImmutableMap.of( + "start", 0L, + "interval", 10L, + "end", 100L))); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadAvro() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "AVRO"); + config.set("use_avro_logical_types", true); + + doTestBqLoad(config); + } +} diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java new file mode 100644 index 0000000000..0f22b8291d --- /dev/null +++ b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java @@ -0,0 +1,285 @@ +package io.digdag.standards.operator.gcp; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.RangePartitioning.Range; +import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.ImmutableMap;

import io.digdag.client.config.Config;
import io.digdag.spi.ImmutableTaskRequest;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.SecretProvider;
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TemplateEngine;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.UUID;

import static io.digdag.client.config.ConfigUtils.configFactory;
import static io.digdag.client.config.ConfigUtils.newConfig;
import static io.digdag.core.workflow.OperatorTestingUtils.newContext;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

/**
 * Checks that the operator created by {@link BqOperatorFactory} copies the task
 * configuration into the {@code JobConfigurationQuery} it hands to
 * {@code BqClient#submitJob}.
 *
 * <p>All collaborators (BigQuery client, credentials, secrets) are Mockito mocks;
 * no request is sent to Google Cloud.
 */
@RunWith(MockitoJUnitRunner.class)
public class BqOperatorFactoryTest {
    private static final String PROJECT_ID = "test-project";
    private static final String DUMMY_QUERY = "SELECT 1";
    private static final String DUMMY_DATASET = "some_dataset";
    private static final String DUMMY_TABLE = "some_table";

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Mock
    TemplateEngine templateEngine;
    @Mock
    SecretProvider secrets;

    @Mock
    BqClient bqClient;
    @Mock
    BqClient.Factory bqClientFactory;

    @Mock
    GoogleCredential googleCredential;
    @Mock
    GcpCredential gcpCredential;
    @Mock
    GcpCredentialProvider gcpCredentialProvider;

    private Path projectPath;
    private OperatorContext operatorContext;
    private BqOperatorFactory factory;

    /**
     * Runs the bq&gt; operator twice against the mocked client and checks the job
     * configuration passed to {@code submitJob} against {@code config}.
     *
     * @param config task configuration handed to the operator; also the source of
     *               the expected values
     * @throws IOException if the temporary project folder cannot be created
     */
    private void doTestBq(Config config) throws IOException {
        // The assertions run at the moment the operator submits the job.
        Answer<Void> checkSubmittedJob = invocation -> {
            JobConfigurationQuery jobConf =
                    invocation.getArgumentAt(1, Job.class).getConfiguration().getQuery();
            assertJobMatchesConfig(config, jobConf);
            return null;
        };
        doAnswer(checkSubmittedJob).when(bqClient).submitJob(anyString(), any());

        projectPath = temporaryFolder.newFolder().toPath();
        ImmutableTaskRequest taskRequest = ImmutableTaskRequest.builder()
                .siteId(0)
                .projectId(0)
                .projectName("dummy_project")
                .workflowName("dummy_workflow")
                .taskId(0)
                .attemptId(0)
                .sessionId(0)
                .taskName("dummy_task")
                .lockId("dummy_lockId")
                .timeZone(ZoneId.of("UTC"))
                .sessionUuid(UUID.randomUUID())
                .sessionTime(Instant.now())
                .createdAt(Instant.now())
                .isCancelRequested(false)
                .localConfig(newConfig())
                .config(config)
                .lastStateParams(newConfig())
                .build();
        operatorContext = newContext(projectPath, taskRequest, secrets);
        Operator operator = factory.newOperator(operatorContext);

        // The first run is expected to store the BigQuery job id in the state
        // params and throw TaskExecutionException; carry that state into the
        // second run.
        try {
            operator.run();
        } catch (TaskExecutionException e) {
            taskRequest = taskRequest.withLastStateParams(e.getStateParams(configFactory).get());
        }

        Job job = new Job();
        job.setStatus(new JobStatus().setState("DONE"));
        when(bqClient.jobStatus(anyString(), anyString(), any())).thenReturn(job);

        // The second run checks the BigQuery job status, which is stubbed as DONE.
        operatorContext = newContext(projectPath, taskRequest, secrets);
        operator = factory.newOperator(operatorContext);
        operator.run();

        // Every assertion lives inside the submitJob answer, so make sure it ran;
        // otherwise the test would pass without checking anything.
        verify(bqClient, atLeastOnce()).submitJob(anyString(), any());
    }

    /** Compares every supported query option of {@code jobConf} with {@code config}. */
    private static void assertJobMatchesConfig(Config config, JobConfigurationQuery jobConf) {
        assertThat(jobConf.getQuery(), is(config.get("query", String.class)));
        // The bq> operator defaults use_legacy_sql to false when it is not set.
        assertThat(jobConf.getUseLegacySql(), is(config.get("use_legacy_sql", Boolean.class, false)));
        assertThat(jobConf.getAllowLargeResults(), is(config.get("allow_large_results", Boolean.class, null)));
        assertThat(jobConf.getUseQueryCache(), is(config.get("use_query_cache", Boolean.class, null)));
        assertThat(jobConf.getCreateDisposition(), is(config.get("create_disposition", String.class, null)));
        assertThat(jobConf.getWriteDisposition(), is(config.get("write_disposition", String.class, null)));
        assertThat(jobConf.getFlattenResults(), is(config.get("flatten_results", Boolean.class, null)));
        assertThat(jobConf.getMaximumBillingTier(),
                is(config.get("maximum_billing_tier", Integer.class, null)));
        assertThat(jobConf.getPriority(), is(config.get("priority", String.class, null)));

        String destinationTable = jobConf.getDestinationTable().getDatasetId() + "."
                + jobConf.getDestinationTable().getTableId();
        assertThat(destinationTable, is(config.get("destination_table", String.class)));

        Config clustering = config.getNestedOrGetEmpty("clustering");
        if (!clustering.isEmpty()) {
            assertThat(jobConf.getClustering().getFields(),
                    is(contains(clustering.getList("fields", String.class).toArray())));
        }

        Config encryptionConfiguration = config.getNestedOrGetEmpty("encryption_configuration");
        if (!encryptionConfiguration.isEmpty()) {
            assertThat(jobConf.getDestinationEncryptionConfiguration().getKmsKeyName(),
                    is(encryptionConfiguration.get("kmsKeyName", String.class)));
        }

        assertThat(jobConf.getMaximumBytesBilled(), is(config.get("maximum_bytes_billed", Long.class, null)));

        List<String> schemaUpdateOptions = config.getListOrEmpty("schema_update_options", String.class);
        if (!schemaUpdateOptions.isEmpty()) {
            assertThat(jobConf.getSchemaUpdateOptions(), is(contains(schemaUpdateOptions.toArray())));
        }

        assertRangePartitioning(config.getNestedOrGetEmpty("range_partitioning"), jobConf);
        assertTimePartitioning(config.getNestedOrGetEmpty("time_partitioning"), jobConf);
    }

    /** Checks the range partitioning of {@code jobConf}; does nothing when {@code expected} is empty. */
    private static void assertRangePartitioning(Config expected, JobConfigurationQuery jobConf) {
        if (expected.isEmpty()) {
            return;
        }
        assertThat(jobConf.getRangePartitioning().getField(),
                is(expected.get("field", String.class)));
        Range range = jobConf.getRangePartitioning().getRange();
        Config rangeConfig = expected.getNested("range");
        assertThat(range.getStart(), is(rangeConfig.get("start", Long.class)));
        assertThat(range.getInterval(), is(rangeConfig.get("interval", Long.class)));
        assertThat(range.getEnd(), is(rangeConfig.get("end", Long.class)));
    }

    /** Checks the time partitioning of {@code jobConf}; does nothing when {@code expected} is empty. */
    private static void assertTimePartitioning(Config expected, JobConfigurationQuery jobConf) {
        if (expected.isEmpty()) {
            return;
        }
        TimePartitioning timePartitioning = jobConf.getTimePartitioning();
        assertThat(timePartitioning.getType(),
                is(expected.get("type", String.class)));
        assertThat(timePartitioning.getField(),
                is(expected.get("field", String.class, null)));
        assertThat(timePartitioning.getRequirePartitionFilter(),
                is(expected.get("requirePartitionFilter", Boolean.class, null)));
        assertThat(timePartitioning.getExpirationMs(),
                is(expected.get("expirationMs", Long.class, null)));
    }

    /** Builds the options shared by every test case, writing to {@code destinationTable}. */
    private static Config newQueryConfig(String destinationTable) {
        Config config = newConfig();
        config.set("query", DUMMY_QUERY);
        config.set("destination_table", destinationTable);
        config.set("clustering", ImmutableMap.of(
                "fields", ImmutableList.of("field1", "field2")));
        config.set("schema_update_options", ImmutableList.of("ALLOW_FIELD_ADDITION"));
        config.set("maximum_bytes_billed", 1024L);
        config.set("priority", "BATCH");
        return config;
    }

    /** Builds a {@code time_partitioning} value of the given partition {@code type}. */
    private static ImmutableMap<String, Object> timePartitioning(String type) {
        return ImmutableMap.<String, Object>of(
                "type", type,
                "field", "date",
                "requirePartitionFilter", true,
                "expirationMs", 3600000L);
    }

    @Before
    public void setUp() throws Exception {
        when(gcpCredential.credential()).thenReturn(googleCredential);
        when(gcpCredential.projectId()).thenReturn(Optional.of(PROJECT_ID));
        when(gcpCredentialProvider.credential(secrets)).thenReturn(gcpCredential);
        when(secrets.getSecretOptional("gcp.project")).thenReturn(Optional.of(PROJECT_ID));
        when(bqClientFactory.create(googleCredential)).thenReturn(bqClient);

        factory = new BqOperatorFactory(templateEngine, bqClientFactory, gcpCredentialProvider);
    }

    @Test
    public void testBqTimePartitioningDay() throws IOException {
        Config config = newQueryConfig(DUMMY_DATASET + "." + DUMMY_TABLE + "$20230723");
        config.set("time_partitioning", timePartitioning("DAY"));
        config.set("encryption_configuration", ImmutableMap.of(
                "kmsKeyName", "dummy_key"));

        doTestBq(config);
    }

    @Test
    public void testBqTimePartitioningYear() throws IOException {
        Config config = newQueryConfig(DUMMY_DATASET + "." + DUMMY_TABLE + "$2023");
        config.set("use_legacy_sql", true);
        config.set("time_partitioning", timePartitioning("YEAR"));

        doTestBq(config);
    }

    @Test
    public void testBqTimePartitioningHour() throws IOException {
        Config config = newQueryConfig(DUMMY_DATASET + "." + DUMMY_TABLE + "$2023072301");
        config.set("use_legacy_sql", true);
        config.set("time_partitioning", timePartitioning("HOUR"));

        doTestBq(config);
    }

    @Test
    public void testBqRangePartitioning() throws IOException {
        Config config = newQueryConfig(DUMMY_DATASET + "." + DUMMY_TABLE);
        config.set("range_partitioning", ImmutableMap.of(
                "field", "id",
                "range", ImmutableMap.of(
                        "start", 0L,
                        "interval", 10L,
                        "end", 100L)));

        doTestBq(config);
    }
}