From 243292a9ed6c1b8acfaeb420ee9658109807b486 Mon Sep 17 00:00:00 2001
From: York Cao <978176728@qq.com>
Date: Wed, 27 May 2026 09:59:52 +0000
Subject: [PATCH] [fix](nereids): quote slot labels in routine-load legacy expr translation to avoid reserved-keyword parse failure

PlanUtils.ExpressionToExpr#visitSlotReference used the raw column name as
the label of the legacy SlotRef it builds. Wrap the label with
SqlUtils.getIdentSql() so that SQL rendered from the translated expression
(Expr#toSqlWithoutTbl) keeps a column named after a reserved keyword, such
as `group`, quoted and can be parsed again by
NereidsLoadUtils.parseExpressionSeq. The column name set through setCol()
is left unchanged.

Tests:
- PlanUtilsTest: translating "`group` is not null" yields SQL that contains
  the quoted identifier and re-parses without throwing.
- test_routine_load_preceding_filter_keyword: routine load job with a
  PRECEDING FILTER on a `group` column must reach RUNNING, must not become
  PAUSED, and must ingest rows. The suite only runs when enableKafkaTest
  is "true" in the regression configuration.
---
 .../apache/doris/nereids/util/PlanUtils.java  |   3 +-
 .../doris/nereids/util/PlanUtilsTest.java     |  21 +++
 .../data/test_preceding_filter_keyword.csv    |   3 +
 ...utine_load_preceding_filter_keyword.groovy | 127 ++++++++++++++++++
 4 files changed, 153 insertions(+), 1 deletion(-)
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
index 0f8bb5e5ed1f7f..11773fd4c0385d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
@@ -21,6 +21,7 @@
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.Scope;
@@ -482,7 +483,7 @@ private static class ExpressionToExpr extends ExpressionTranslator {
         @Override
         public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) {
             SlotRef slotRef = new SlotRef(slotReference.getDataType().toCatalogDataType(), slotReference.nullable());
-            slotRef.setLabel(slotReference.getName());
+            slotRef.setLabel(SqlUtils.getIdentSql(slotReference.getName()));
             slotRef.setCol(slotReference.getName());
             return slotRef;
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
index a13825a8854bdd..d5b8557e94872c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
@@ -17,10 +17,18 @@
 
 package org.apache.doris.nereids.util;
 
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.load.NereidsLoadUtils;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.utframe.UtFrameUtils;
 
 import com.google.common.collect.ImmutableSet;
 import org.junit.jupiter.api.Assertions;
@@ -36,4 +44,17 @@ void filterOrSelf() {
         Plan filter = PlanUtils.filterOrSelf(ImmutableSet.of(BooleanLiteral.TRUE), scan);
         Assertions.assertTrue(filter instanceof LogicalFilter);
     }
+
+    @Test
+    void translateToLegacyExprShouldQuoteReservedColumnForRoutineLoadReparse() throws Exception {
+        ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+        ctx.setStatementContext(new StatementContext(ctx, new OriginStatement("", 0)));
+        Expression expr = new NereidsParser().parseExpression("`group` is not null");
+
+        Expr legacyExpr = PlanUtils.translateToLegacyExpr(expr, null, ctx);
+        String exprSql = legacyExpr.toSqlWithoutTbl();
+
+        Assertions.assertTrue(exprSql.contains("`group`"));
+        Assertions.assertDoesNotThrow(() -> NereidsLoadUtils.parseExpressionSeq(exprSql));
+    }
 }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
new file mode 100644
index 00000000000000..8cd94a98c437e1
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
@@ -0,0 +1,3 @@
+app_a|grp_a|msg_a
+app_b|grp_b|msg_b
+app_c|grp_c|msg_c
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
new file mode 100644
index 00000000000000..9de606b0a25411
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_preceding_filter_keyword", "p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafkaPort = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def topicName = "test_preceding_filter_keyword"
+        def tableName = "test_routine_load_preceding_filter_keyword_tbl"
+        def jobName = "test_preceding_filter_keyword_job"
+
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${externalEnvIp}:${kafkaPort}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        def producer = new KafkaProducer<>(props)
+        try {
+            def txt = new File("""${context.file.parent}/data/${topicName}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                def record = new ProducerRecord<>(topicName, null, line)
+                producer.send(record)
+            }
+        } finally {
+            producer.close()
+        }
+
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE ${tableName} (
+                    app STRING,
+                    `group` STRING,
+                    msg STRING
+                )
+                DUPLICATE KEY(app)
+                DISTRIBUTED BY HASH(app) BUCKETS 1
+                PROPERTIES ("replication_num" = "1");
+            """
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(app, `group`, msg),
+                COLUMNS TERMINATED BY "|",
+                PRECEDING FILTER app IS NOT NULL AND `group` IS NOT NULL
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafkaPort}",
+                    "kafka_topic" = "${topicName}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            int retry = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                def reason = res[0][17].toString()
+                if (state == "RUNNING") {
+                    break
+                }
+                if (state == "PAUSED") {
+                    assertTrue(false, "routine load should not be paused, reason: ${reason}")
+                }
+                retry++
+                if (retry > 60) {
+                    assertTrue(false, "routine load should become RUNNING, current state: ${state}, reason: ${reason}")
+                }
+            }
+
+            retry = 0
+            while (true) {
+                sleep(1000)
+                def state = sql "show routine load for ${jobName}"
+                if (state[0][8].toString() == "PAUSED") {
+                    assertTrue(false, "routine load should keep running, reason: ${state[0][17].toString()}")
+                }
+                def cnt = sql "select count(*) from ${tableName}"
+                if (cnt[0][0] > 0) {
+                    break
+                }
+                retry++
+                if (retry > 60) {
+                    assertTrue(false, "routine load did not ingest data in time")
+                }
+            }
+        } finally {
+            try {
+                sql "stop routine load for ${jobName}"
+            } catch (Exception e) {
+                logger.info("stop routine load failed: ${e.message}".toString())
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+        }
+    }
+}