From c82714956d67ffe90d8a6434ff7173c93c8f1395 Mon Sep 17 00:00:00 2001 From: ryoji-hasegawa Date: Fri, 15 Oct 2021 19:22:27 +0900 Subject: [PATCH 1/5] update gcp operators --- digdag-standards/build.gradle | 2 +- .../operator/gcp/BaseBqJobOperator.java | 31 +++++++++++++++++++ .../operator/gcp/BaseGcpOperator.java | 4 ++- .../operator/gcp/BqLoadOperatorFactory.java | 21 ++++++++++++- .../operator/gcp/BqOperatorFactory.java | 14 +++++++++ 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/digdag-standards/build.gradle b/digdag-standards/build.gradle index 9ac42484ad..bbf396a2de 100644 --- a/digdag-standards/build.gradle +++ b/digdag-standards/build.gradle @@ -35,7 +35,7 @@ dependencies { compile "com.amazonaws:aws-java-sdk-dynamodb:${project.ext.awsJavaSdkVersion}" // bigquery - compile ('com.google.apis:google-api-services-bigquery:v2-rev325-1.22.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' } + compile('com.google.apis:google-api-services-bigquery:v2-rev20230520-2.0.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' } // gcs compile ('com.google.apis:google-api-services-storage:v1-rev20190910-1.30.3') { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java index 5aebeac220..944a23c694 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java @@ -2,6 +2,8 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.RangePartitioning; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.base.Optional; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigFactory; @@ -43,4 +45,33 @@ private TaskResult 
result(Job job) } protected abstract JobConfiguration jobConfiguration(String projectId); + + protected RangePartitioning rangePartitioning(Config params) { + Config rangeParams = params.getNested("range"); + RangePartitioning.Range range = new RangePartitioning.Range(); + range.setStart(rangeParams.get("start", Long.class)) + .setEnd(rangeParams.get("end", Long.class)) + .setInterval(rangeParams.get("interval", Long.class)); + + RangePartitioning rPart = new RangePartitioning(); + rPart.setField(params.get("field", String.class)) + .setRange(range); + + return rPart; + } + + protected TimePartitioning timePartitioning(Config params) { + TimePartitioning tPart = new TimePartitioning(); + // required fields + tPart.setType(params.get("type", String.class)); + + // optional fields + params.getOptional("field", String.class).transform(tPart::setField); + params.getOptional("requirePartitionFilter", Boolean.class).transform(tPart::setRequirePartitionFilter); + if (params.has("expirationMs")) { + tPart.setExpirationMs(params.get("expirationMs", Long.class)); + } + + return tPart; + } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java index 883bb5cb0e..8a7f6040d6 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java @@ -29,7 +29,9 @@ public TaskResult runTask() private String projectId(GcpCredential credential) { - Optional projectId = context.getSecrets().getSecretOptional("gcp.project") + Optional projectId = request.getConfig().getNestedOrGetEmpty("gcp") + .getOptional("project", String.class) + .or(context.getSecrets().getSecretOptional("gcp.project")) .or(credential.projectId()); if (!projectId.isPresent()) { throw new TaskExecutionException("Missing 'gcp.project' secret"); diff --git 
a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java index c9cbad5ae4..fa8540d1ce 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactory.java @@ -1,11 +1,14 @@ package io.digdag.standards.operator.gcp; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.EncryptionConfiguration; +import com.google.api.services.bigquery.model.HivePartitioningOptions; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.ParquetOptions; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -99,6 +102,22 @@ protected JobConfiguration jobConfiguration(String projectId) Optional.of(params.getListOrEmpty("projection_fields", String.class)).transform(cfg::setProjectionFields); params.getOptional("autodetect", boolean.class).transform(cfg::setAutodetect); Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions); + params.getOptional("clustering", Clustering.class).transform(cfg::setClustering); + Optional.of(params.getListOrEmpty("decimal_target_types", String.class)).transform(cfg::setDecimalTargetTypes); + params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration); + 
params.getOptional("hive_partitioning_options", HivePartitioningOptions.class).transform(cfg::setHivePartitioningOptions); + params.getOptional("json_extension", String.class).transform(cfg::setJsonExtension); + params.getOptional("null_marker", String.class).transform(cfg::setNullMarker); + params.getOptional("parquet_options", ParquetOptions.class).transform(cfg::setParquetOptions); + params.getOptional("use_avro_logical_types", boolean.class).transform(cfg::setUseAvroLogicalTypes); + + if (params.has("range_partitioning")) { + cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning"))); + } + + if (params.has("time_partitioning")) { + cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning"))); + } return new JobConfiguration() .setLoad(cfg); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java index 0e20760973..53e699950c 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqOperatorFactory.java @@ -1,7 +1,9 @@ package io.digdag.standards.operator.gcp; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.ExternalDataConfiguration; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; @@ -86,6 +88,18 @@ protected JobConfiguration jobConfiguration(String projectId) params.getOptional("destination_table", String.class) .transform(s -> cfg.setDestinationTable(tableReference(projectId, defaultDataset, s))); + params.getOptional("clustering", 
Clustering.class).transform(cfg::setClustering); + params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration); + params.getOptional("maximum_bytes_billed", Long.class).transform(cfg::setMaximumBytesBilled); + Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions); + + if (params.has("range_partitioning")) { + cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning"))); + } + + if (params.has("time_partitioning")) { + cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning"))); + } return new JobConfiguration() .setQuery(cfg); From 56c383380477d27054f4cf791d042d0057304fb5 Mon Sep 17 00:00:00 2001 From: ryoji-hasegawa Date: Sat, 22 Jul 2023 17:13:23 +0900 Subject: [PATCH 2/5] add tests for bq operator --- .../operator/gcp/BqOperatorFactoryTest.java | 265 ++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java new file mode 100644 index 0000000000..42730dd7c4 --- /dev/null +++ b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java @@ -0,0 +1,265 @@ +package io.digdag.standards.operator.gcp; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; 
+import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.RangePartitioning.Range; +import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import io.digdag.client.config.Config; +import io.digdag.spi.ImmutableTaskRequest; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.SecretProvider; +import io.digdag.spi.TaskExecutionException; +import io.digdag.spi.TemplateEngine; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; +import java.util.UUID; + +import static io.digdag.client.config.ConfigUtils.configFactory; +import static io.digdag.client.config.ConfigUtils.newConfig; +import static io.digdag.core.workflow.OperatorTestingUtils.newContext; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class BqOperatorFactoryTest { + private static final String PROJECT_ID = "test-project"; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Mock + TemplateEngine templateEngine; + @Mock + SecretProvider secrets; + + @Mock + BqClient bqClient; + @Mock + BqClient.Factory bqClientFactory; + + @Mock + GoogleCredential googleCredential; + @Mock + GcpCredential gcpCredential; + @Mock + GcpCredentialProvider gcpCredentialProvider; + + private Path projectPath; + private OperatorContext operatorContext; + private BqOperatorFactory factory; + + private static final String dummyQuery = "SELECT 1"; + 
private static final String dummyDataset = "some_dataset"; + private static final String dummyTable = "some_table"; + + private void doTestBq(Config config) throws IOException { + Answer answer = new Answer() { + public Void answer(InvocationOnMock invocation) { + JobConfigurationQuery jobConf = invocation.getArgumentAt(1, Job.class).getConfiguration().getQuery(); + + assertThat(jobConf.getQuery(), is(config.get("query", String.class))); + // bq operator sets default value of use_legacy_sql at false + assertThat(jobConf.getUseLegacySql(), is(config.get("use_legacy_sql", Boolean.class, false))); + assertThat(jobConf.getAllowLargeResults(), is(config.get("allow_large_results", Boolean.class, null))); + assertThat(jobConf.getUseQueryCache(), is(config.get("use_query_cache", Boolean.class, null))); + assertThat(jobConf.getCreateDisposition(), is(config.get("create_disposition", String.class, null))); + assertThat(jobConf.getWriteDisposition(), is(config.get("write_disposition", String.class, null))); + assertThat(jobConf.getFlattenResults(), is(config.get("flatten_results", Boolean.class, null))); + assertThat(jobConf.getMaximumBillingTier(), + is(config.get("maximum_billing_tier", Integer.class, null))); + assertThat(jobConf.getPriority(), is(config.get("priority", String.class, null))); + + String destinaionTable = jobConf.getDestinationTable().getDatasetId() + "." 
+ + jobConf.getDestinationTable().getTableId(); + assertThat(destinaionTable, is(config.get("destination_table", String.class))); + + Config clustering = config.getNestedOrGetEmpty("clustering"); + if (!clustering.isEmpty()) + assertThat(jobConf.getClustering().getFields(), + is(contains(clustering.getList("fields", String.class).toArray()))); + + Config encryptionConfiguration = config.getNestedOrGetEmpty("encryption_configuration"); + if (!encryptionConfiguration.isEmpty()) + assertThat(jobConf.getDestinationEncryptionConfiguration().getKmsKeyName(), + is(encryptionConfiguration.get("kmsKeyName", String.class))); + + assertThat(jobConf.getMaximumBytesBilled(), is(config.get("maximum_bytes_billed", Long.class, null))); + + List schema_update_options = config.getListOrEmpty("schema_update_options", String.class); + if (!schema_update_options.isEmpty()) + assertThat(jobConf.getSchemaUpdateOptions(), is(contains(schema_update_options.toArray()))); + + Config rangePartitioning = config.getNestedOrGetEmpty("range_partitioning"); + if (!rangePartitioning.isEmpty()) { + assertThat(jobConf.getRangePartitioning().getField(), + is(rangePartitioning.get("field", String.class))); + Range range = jobConf.getRangePartitioning().getRange(); + Config rangeConfig = rangePartitioning.getNested("range"); + assertThat(range.getStart(), + is(rangeConfig.get("start", Long.class))); + assertThat(range.getInterval(), + is(rangeConfig.get("interval", Long.class))); + assertThat(range.getEnd(), + is(rangeConfig.get("end", Long.class))); + } + + Config timePartitioningConfig = config.getNestedOrGetEmpty("time_partitioning"); + if (!timePartitioningConfig.isEmpty()) { + TimePartitioning timePartitioning = jobConf.getTimePartitioning(); + assertThat(timePartitioning.getType(), + is(timePartitioningConfig.get("type", String.class))); + assertThat(timePartitioning.getField(), + is(timePartitioningConfig.get("field", String.class, null))); + 
assertThat(timePartitioning.getRequirePartitionFilter(), + is(timePartitioningConfig.get("requirePartitionFilter", Boolean.class, null))); + assertThat(timePartitioning.getExpirationMs(), + is(timePartitioningConfig.get("expirationMs", Long.class, null))); + } + + return null; + } + }; + doAnswer(answer).when(bqClient).submitJob(anyString(), any()); + + projectPath = temporaryFolder.newFolder().toPath(); + ImmutableTaskRequest taskRequest = ImmutableTaskRequest.builder() + .siteId(0) + .projectId(0) + .projectName("dummy_project") + .workflowName("dummy_workflow") + .taskId(0) + .attemptId(0) + .sessionId(0) + .taskName("dummy_task") + .lockId("dummy_lockId") + .timeZone(ZoneId.of("UTC")) + .sessionUuid(UUID.randomUUID()) + .sessionTime(Instant.now()) + .createdAt(Instant.now()) + .isCancelRequested(false) + .localConfig(newConfig()) + .config(config) + .lastStateParams(newConfig()) + .build(); + operatorContext = newContext(projectPath, taskRequest, secrets); + Operator operator = factory.newOperator(operatorContext); + + // first call of operator.run() set BigQuery job id in state param + // and throw TaskExecutionException + try { + operator.run(); + } catch (TaskExecutionException e) { + taskRequest = taskRequest.withLastStateParams(e.getStateParams(configFactory).get()); + } + + Job job = new Job(); + job.setStatus(new JobStatus().setState("DONE")); + when(bqClient.jobStatus(anyString(), anyString(), any())).thenReturn(job); + + // second call of operator.run() check BigQUery job status + operatorContext = newContext(projectPath, taskRequest, secrets); + operator = factory.newOperator(operatorContext); + operator.run(); + } + +@Before + public void setUp() throws Exception + { + when(gcpCredential.credential()).thenReturn(googleCredential); + when(gcpCredential.projectId()).thenReturn(Optional.of(PROJECT_ID)); + when(gcpCredentialProvider.credential(secrets)).thenReturn(gcpCredential); + 
when(secrets.getSecretOptional("gcp.project")).thenReturn(Optional.of(PROJECT_ID)); + when(bqClientFactory.create(googleCredential)).thenReturn(bqClient); + + factory = new BqOperatorFactory(templateEngine, bqClientFactory, gcpCredentialProvider); + } + + @Test + public void testBqTimePartitioningDay() throws IOException { + Config config = newConfig(); + config.set("query", dummyQuery); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("time_partitioning", ImmutableMap.of( + "type", "DAY", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("schema_update_options", ImmutableList.of("ALLOW_FIELD_ADDITION")); + config.set("maximum_bytes_billed", 1024L); + config.set("priority", "BATCH"); + config.set("encryption_configuration", ImmutableMap.of( + "kmsKeyName", "dummy_key")); + + doTestBq(config); + } + + @Test + public void testBqTimePartitioningYear() throws IOException { + Config config = newConfig(); + config.set("query", dummyQuery); + config.set("use_legacy_sql", true); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("time_partitioning", ImmutableMap.of( + "type", "YEAR", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("schema_update_options", ImmutableList.of("ALLOW_FIELD_ADDITION")); + config.set("maximum_bytes_billed", 1024L); + config.set("priority", "BATCH"); + + doTestBq(config); + } + + @Test + public void testBqRangePartitioning() throws IOException { + Config config = newConfig(); + config.set("query", dummyQuery); + config.set("destination_table", dummyDataset + "." 
+ dummyTable); + config.set("range_partitioning", ImmutableMap.of( + "field", "id", + "range", ImmutableMap.of( + "start", 0L, + "interval", 10L, + "end", 100L))); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("schema_update_options", ImmutableList.of("ALLOW_FIELD_ADDITION")); + config.set("maximum_bytes_billed", 1024L); + config.set("priority", "BATCH"); + + doTestBq(config); + } + +} \ No newline at end of file From f326f9e95b7908525b296cbbc7c8aff5aab82a20 Mon Sep 17 00:00:00 2001 From: ryoji-hasegawa Date: Sun, 23 Jul 2023 15:41:11 +0900 Subject: [PATCH 3/5] add tests for bq_load operator --- .../gcp/BqLoadOperatorFactoryTest.java | 310 ++++++++++++++++++ 1 file changed, 310 insertions(+) create mode 100644 digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java new file mode 100644 index 0000000000..6337a4f4bd --- /dev/null +++ b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqLoadOperatorFactoryTest.java @@ -0,0 +1,310 @@ +package io.digdag.standards.operator.gcp; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; +import java.util.UUID; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; 
+import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.api.services.bigquery.model.RangePartitioning.Range; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import io.digdag.client.DigdagClient; +import io.digdag.client.config.Config; +import io.digdag.client.config.ConfigException; +import io.digdag.spi.ImmutableTaskRequest; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.SecretProvider; +import io.digdag.spi.TaskExecutionException; +import io.digdag.spi.TemplateEngine; + +import static io.digdag.client.config.ConfigUtils.configFactory; +import static io.digdag.client.config.ConfigUtils.newConfig; +import static io.digdag.core.workflow.OperatorTestingUtils.newContext; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class BqLoadOperatorFactoryTest { + private static final String PROJECT_ID = "test-project"; + private static final ObjectMapper OBJECT_MAPPER = DigdagClient.objectMapper(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Mock + TemplateEngine templateEngine; + @Mock + SecretProvider secrets; + + @Mock + BqClient bqClient; + @Mock + BqClient.Factory bqClientFactory; + + @Mock + GoogleCredential googleCredential; + @Mock + GcpCredential gcpCredential; + @Mock + GcpCredentialProvider gcpCredentialProvider; + + private Path projectPath; + private OperatorContext operatorContext; + private 
BqLoadOperatorFactory factory; + + private static final String dummyPath = "gs://bucket/path"; + private static final String dummyDataset = "some_dataset"; + private static final String dummyTable = "some_table"; + + private void doTestBqLoad(Config config) throws IOException { + Answer answer = new Answer() { + public Void answer(InvocationOnMock invocation) { + JobConfigurationLoad jobConf = invocation.getArgumentAt(1, Job.class).getConfiguration().getLoad(); + + assertThat(jobConf.getSourceUris(), is(contains(sourceUris(config).toArray()))); + + String destinaionTable = jobConf.getDestinationTable().getDatasetId() + "." + + jobConf.getDestinationTable().getTableId(); + assertThat(destinaionTable, is(config.get("destination_table", String.class))); + assertThat(jobConf.getSourceFormat(), is(config.get("source_format", String.class, null))); + assertThat(jobConf.getAutodetect(), is(config.get("autodetect", Boolean.class, null))); + + Config clustering = config.getNestedOrGetEmpty("clustering"); + if (!clustering.isEmpty()) + assertThat(jobConf.getClustering().getFields(), + is(contains(clustering.getList("fields", String.class).toArray()))); + + if (config.has("decimal_target_types")) + assertThat(jobConf.getDecimalTargetTypes(), + is(contains(config.getList("decimal_target_types", String.class).toArray()))); + + Config encryptionConfiguration = config.getNestedOrGetEmpty("encryption_configuration"); + if (!encryptionConfiguration.isEmpty()) + assertThat(jobConf.getDestinationEncryptionConfiguration().getKmsKeyName(), + is(encryptionConfiguration.get("kmsKeyName", String.class))); + + Config hivePartitioningOptions = config.getNestedOrGetEmpty("hive_partitioning_options"); + if (!hivePartitioningOptions.isEmpty()) { + assertThat(jobConf.getHivePartitioningOptions().getMode(), + is(hivePartitioningOptions.get("mode", String.class))); + assertThat(jobConf.getHivePartitioningOptions().getSourceUriPrefix(), + is(hivePartitioningOptions.get("sourceUriPrefix", 
String.class))); + } + + assertThat(jobConf.getJsonExtension(), is(config.get("json_extension", String.class, null))); + assertThat(jobConf.getNullMarker(), is(config.get("null_marker", String.class, null))); + + Config parquetOptions = config.getNestedOrGetEmpty("parquet_options"); + if (!parquetOptions.isEmpty()) { + assertThat(jobConf.getParquetOptions().getEnableListInference(), + is(parquetOptions.get("enableListInference", Boolean.class, null))); + assertThat(jobConf.getParquetOptions().getEnumAsString(), + is(parquetOptions.get("enumAsString", Boolean.class, null))); + } + + assertThat(jobConf.getUseAvroLogicalTypes(), + is(config.get("use_avro_logical_types", Boolean.class, null))); + + Config rangePartitioning = config.getNestedOrGetEmpty("range_partitioning"); + if (!rangePartitioning.isEmpty()) { + assertThat(jobConf.getRangePartitioning().getField(), + is(rangePartitioning.get("field", String.class))); + Range range = jobConf.getRangePartitioning().getRange(); + Config rangeConfig = rangePartitioning.getNested("range"); + assertThat(range.getStart(), + is(rangeConfig.get("start", Long.class))); + assertThat(range.getInterval(), + is(rangeConfig.get("interval", Long.class))); + assertThat(range.getEnd(), + is(rangeConfig.get("end", Long.class))); + } + + Config timePartitioningConfig = config.getNestedOrGetEmpty("time_partitioning"); + if (!timePartitioningConfig.isEmpty()) { + TimePartitioning timePartitioning = jobConf.getTimePartitioning(); + assertThat(timePartitioning.getType(), + is(timePartitioningConfig.get("type", String.class))); + assertThat(timePartitioning.getField(), + is(timePartitioningConfig.get("field", String.class, null))); + assertThat(timePartitioning.getRequirePartitionFilter(), + is(timePartitioningConfig.get("requirePartitionFilter", Boolean.class, null))); + assertThat(timePartitioning.getExpirationMs(), + is(timePartitioningConfig.get("expirationMs", Long.class, null))); + } + + return null; + } + }; + 
doAnswer(answer).when(bqClient).submitJob(anyString(), any()); + + projectPath = temporaryFolder.newFolder().toPath(); + ImmutableTaskRequest taskRequest = ImmutableTaskRequest.builder() + .siteId(0) + .projectId(0) + .projectName("dummy_project") + .workflowName("dummy_workflow") + .taskId(0) + .attemptId(0) + .sessionId(0) + .taskName("dummy_task") + .lockId("dummy_lockId") + .timeZone(ZoneId.of("UTC")) + .sessionUuid(UUID.randomUUID()) + .sessionTime(Instant.now()) + .createdAt(Instant.now()) + .isCancelRequested(false) + .localConfig(newConfig()) + .config(config) + .lastStateParams(newConfig()) + .build(); + operatorContext = newContext(projectPath, taskRequest, secrets); + Operator operator = factory.newOperator(operatorContext); + + // first call of operator.run() set BigQuery job id in state param + // and throw TaskExecutionException + try { + operator.run(); + } catch (TaskExecutionException e) { + taskRequest = taskRequest.withLastStateParams(e.getStateParams(configFactory).get()); + } + + Job job = new Job(); + job.setStatus(new JobStatus().setState("DONE")); + when(bqClient.jobStatus(anyString(), anyString(), any())).thenReturn(job); + + // second call of operator.run() check BigQUery job status + operatorContext = newContext(projectPath, taskRequest, secrets); + operator = factory.newOperator(operatorContext); + operator.run(); + } + + private List sourceUris(Config params) { + try { + return params.parseList("_command", String.class); + } catch (ConfigException ignore) { + return ImmutableList.of(params.get("_command", String.class)); + } + } + + @Before + public void setUp() throws Exception + { + when(gcpCredential.credential()).thenReturn(googleCredential); + when(gcpCredential.projectId()).thenReturn(Optional.of(PROJECT_ID)); + when(gcpCredentialProvider.credential(secrets)).thenReturn(gcpCredential); + when(secrets.getSecretOptional("gcp.project")).thenReturn(Optional.of(PROJECT_ID)); + 
when(bqClientFactory.create(googleCredential)).thenReturn(bqClient); + + factory = new BqLoadOperatorFactory(OBJECT_MAPPER, templateEngine, bqClientFactory, gcpCredentialProvider); + } + + @Test + public void testBqLoadJsonTimePartitioningDay() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "NEWLINE_DELIMITED_JSON"); + config.set("autodetect", true); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("decimal_target_types", ImmutableList.of("BIGNUMERIC", "STRING")); + config.set("encryption_configuration", ImmutableMap.of( + "kmsKeyName", "dummy_key")); + config.set("json_extension", "GEOJSON"); + config.set("time_partitioning", ImmutableMap.of( + "type", "DAY", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadCsvTimePartitioningYear() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "CSV"); + config.set("autodetect", true); + config.set("null_marker", "NULL"); + config.set("time_partitioning", ImmutableMap.of( + "type", "YEAR", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadJsonHivePartition() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." 
+ dummyTable); + config.set("source_format", "NEWLINE_DELIMITED_JSON"); + config.set("hive_partitioning_options", ImmutableMap.of( + "mode", "AUTO", + "sourceUriPrefix", dummyPath)); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadParquetRangePartition() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "PARQUET"); + config.set("parquet_options", ImmutableMap.of( + "enableListInference", true, + "enumAsString", true)); + config.set("range_partitioning", ImmutableMap.of( + "field", "id", + "range", ImmutableMap.of( + "start", 0L, + "interval", 10L, + "end", 100L))); + + doTestBqLoad(config); + } + + @Test + public void testBqLoadAvro() throws IOException { + Config config = newConfig(); + config.set("_command", dummyPath); + config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("source_format", "AVRO"); + config.set("use_avro_logical_types", true); + + doTestBqLoad(config); + } +} From b86882812dc28692d10833433e69f63e14ffe512 Mon Sep 17 00:00:00 2001 From: ryoji-hasegawa Date: Sun, 23 Jul 2023 18:24:15 +0900 Subject: [PATCH 4/5] fix table validation for yearly, monthly, hourly partition --- .../io/digdag/standards/operator/gcp/Bq.java | 3 ++- .../operator/gcp/BqOperatorFactoryTest.java | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java index 39c6be4279..83996a2881 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/Bq.java @@ -9,7 +9,8 @@ class Bq { - private static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile("^(?:(?[^:]+):)?(?:(?[^.]+)\\.)?(?[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{8})?)$"); + private 
static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile( + "^(?:(?<project>[^:]+):)?(?:(?<dataset>[^.]+)\\.)?(?<table>[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{4,10})?)$"); static TableReference tableReference(String defaultProjectId, Optional defaultDataset, String s) { diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java index 42730dd7c4..0f22b8291d 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/operator/gcp/BqOperatorFactoryTest.java @@ -205,7 +205,7 @@ public void setUp() throws Exception public void testBqTimePartitioningDay() throws IOException { Config config = newConfig(); config.set("query", dummyQuery); - config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("destination_table", dummyDataset + "." + dummyTable + "$20230723"); config.set("time_partitioning", ImmutableMap.of( "type", "DAY", "field", "date", @@ -227,7 +227,7 @@ public void testBqTimePartitioningYear() throws IOException { Config config = newConfig(); config.set("query", dummyQuery); config.set("use_legacy_sql", true); - config.set("destination_table", dummyDataset + "." + dummyTable); + config.set("destination_table", dummyDataset + "." + dummyTable + "$2023"); config.set("time_partitioning", ImmutableMap.of( "type", "YEAR", "field", "date", @@ -242,6 +242,26 @@ public void testBqTimePartitioningYear() throws IOException { doTestBq(config); } + @Test + public void testBqTimePartitioningHour() throws IOException { + Config config = newConfig(); + config.set("query", dummyQuery); + config.set("use_legacy_sql", true); + config.set("destination_table", dummyDataset + "."
+ dummyTable + "$2023072301"); + config.set("time_partitioning", ImmutableMap.of( + "type", "HOUR", + "field", "date", + "requirePartitionFilter", true, + "expirationMs", 3600000L)); + config.set("clustering", ImmutableMap.of( + "fields", ImmutableList.of("field1", "field2"))); + config.set("schema_update_options", ImmutableList.of("ALLOW_FIELD_ADDITION")); + config.set("maximum_bytes_billed", 1024L); + config.set("priority", "BATCH"); + + doTestBq(config); + } + @Test public void testBqRangePartitioning() throws IOException { Config config = newConfig(); From 45a351d024e4713902805b643cb27508adcca6a4 Mon Sep 17 00:00:00 2001 From: ryoji-hasegawa Date: Sun, 22 Oct 2023 13:55:51 +0900 Subject: [PATCH 5/5] update document --- digdag-docs/src/operators/bq.md | 77 ++++++++++++++++++++ digdag-docs/src/operators/bq_load.md | 101 ++++++++++++++++++++++++++- 2 files changed, 177 insertions(+), 1 deletion(-) diff --git a/digdag-docs/src/operators/bq.md b/digdag-docs/src/operators/bq.md index 3d80738ade..f8adad6be9 100644 --- a/digdag-docs/src/operators/bq.md +++ b/digdag-docs/src/operators/bq.md @@ -3,6 +3,8 @@ **bq>** operator runs a query on Google BigQuery. _export: + gcp: + project: my_project_id bq: dataset: my_dataset @@ -56,6 +58,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. location: asia-northeast1 ``` +* **gcp.project**: NAME + + Specifies the default Google Cloud project to use in the query and in the `destination_table` parameter. + + Examples: + + ``` + gcp: + project: my_project_id + ``` + * **dataset**: NAME Specifies the default dataset to use in the query and in the `destination_table` parameter. @@ -167,6 +180,70 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. Describes user-defined function resources used in the query. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.userDefinedFunctionResources). 
+* **clustering**: OBJECT + + Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering). + + Examples: + + ```yaml + clustering: + fields: + - field1 + ``` + +* **encryption_configuration**: OBJECT + + Custom encryption configuration. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration). + + Examples: + + ```yaml + encryption_configuration: + kmsKeyName: key_name + ``` + +* **maximum_bytes_billed**: LONG + + Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery). + +* **schema_update_options**: LIST + + Allows the schema of the destination table to be updated as a side effect of the query job. Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND, and when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by partition decorators. For normal tables, WRITE_TRUNCATE will always overwrite the schema. Specify one or more of the following values: + - ALLOW_FIELD_ADDITION: allow adding a nullable field to the schema. + - ALLOW_FIELD_RELAXATION: allow relaxing a required field in the original schema to nullable. + + For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery). + +* **range_partitioning**: OBJECT + + Range partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning).
+ + Examples: + + ```yaml + range_partitioning: + field: id + range: + start: 0 + interval: 10 + end: 100 + ``` + +* **time_partitioning**: OBJECT + + Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning). + + Examples: + + ```yaml + time_partitioning: + type: DAY + field: date + requirePartitionFilter: true + + ``` + ## Output parameters * **bq.last_job_id** diff --git a/digdag-docs/src/operators/bq_load.md b/digdag-docs/src/operators/bq_load.md index 96b2c85307..56702daea3 100644 --- a/digdag-docs/src/operators/bq_load.md +++ b/digdag-docs/src/operators/bq_load.md @@ -90,10 +90,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. location: asia-northeast1 ``` -* **project**: NAME +* **gcp.project**: NAME The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter. + Examples: + + ``` + gcp: + project: my_project_id + ``` + * **source_format**: CSV | NEWLINE_DELIMITED_JSON | AVRO | DATASTORE_BACKUP The format of the files to be imported. *Default*: `CSV`. @@ -283,6 +290,98 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. # schema: path/to/schema.yml ``` +* **clustering**: OBJECT + + Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering). + + Examples: + + ```yaml + clustering: + fields: + - field1 + ``` + +* **decimal_target_types**: LIST + + Defines the list of possible SQL data types to which the source decimal values are converted. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#decimaltargettype). + +* **encryption_configuration**: OBJECT + + Custom encryption configuration.
For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration). + + Examples: + + ```yaml + encryption_configuration: + kmsKeyName: key_name + ``` + +* **hive_partitioning_options**: OBJECT + + Options for configuring hive partitioning detection. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions). + + Examples: + + ```yaml + hive_partitioning_options: + mode: AUTO + sourceUriPrefix: gs://my_bucket/path + ``` + +* **json_extension**: STRING + + If sourceFormat is set to newline-delimited JSON, indicates whether it should be processed as a JSON variant such as GeoJSON. For a sourceFormat other than JSON, omit this field. For newline-delimited GeoJSON, set this to GEOJSON. + +* **null_marker**: STRING + + Specifies a string that represents a null value in a CSV file. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). + +* **parquet_options**: OBJECT + + Parquet options for load jobs and external tables. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). + + Examples: + + ```yaml + parquet_options: + enableListInference: true + enumAsString: true + ``` + +* **use_avro_logical_types**: BOOLEAN + + If sourceFormat is set to "AVRO", indicates whether to interpret logical types as the corresponding BigQuery data type (for example, TIMESTAMP), instead of using the raw type (for example, INTEGER). For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). + +* **range_partitioning**: OBJECT + + Range partitioning specification for the destination table.
For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning). + + Examples: + + ```yaml + range_partitioning: + field: id + range: + start: 0 + interval: 10 + end: 100 + ``` + +* **time_partitioning**: OBJECT + + Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning). + + Examples: + + ```yaml + time_partitioning: + type: DAY + field: date + requirePartitionFilter: true + + ``` + ## Output parameters * **bq.last_job_id**