Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.Scope;
Expand Down Expand Up @@ -482,7 +483,7 @@ private static class ExpressionToExpr extends ExpressionTranslator {
@Override
public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) {
SlotRef slotRef = new SlotRef(slotReference.getDataType().toCatalogDataType(), slotReference.nullable());
slotRef.setLabel(slotReference.getName());
slotRef.setLabel(SqlUtils.getIdentSql(slotReference.getName()));
slotRef.setCol(slotReference.getName());
return slotRef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@

package org.apache.doris.nereids.util;

import org.apache.doris.analysis.Expr;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.load.NereidsLoadUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.utframe.UtFrameUtils;

import com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,4 +44,17 @@ void filterOrSelf() {
Plan filter = PlanUtils.filterOrSelf(ImmutableSet.of(BooleanLiteral.TRUE), scan);
Assertions.assertTrue(filter instanceof LogicalFilter);
}

@Test
void translateToLegacyExprShouldQuoteReservedColumnForRoutineLoadReparse() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
ctx.setStatementContext(new StatementContext(ctx, new OriginStatement("", 0)));
Expression expr = new NereidsParser().parseExpression("`group` is not null");

Expr legacyExpr = PlanUtils.translateToLegacyExpr(expr, null, ctx);
String exprSql = legacyExpr.toSqlWithoutTbl();

Assertions.assertTrue(exprSql.contains("`group`"));
Assertions.assertDoesNotThrow(() -> NereidsLoadUtils.parseExpressionSeq(exprSql));
}
}
7 changes: 6 additions & 1 deletion regression-test/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ hudiEmrCatalog = ""
icebergS3TablesCatalog=""
icebergS3TablesCatalogGlueRest=""

// The path of the cert configuration file for the testing framework
// The path of the cert configuration file for the testing framework
// is consistent with the path of the cert file for the cluster
enableTLS=false
tlsVerifyMode="strict"
Expand Down Expand Up @@ -332,6 +332,11 @@ hudiHmsPort=19083
hudiMinioPort=19100
hudiMinioAccessKey="minio"
hudiMinioSecretKey="minio123"
otherConfigs = [
enableKafkaTest: "true",
externalEnvIp: "127.0.0.1",
kafka_port: "9092"
]

icebergDlfRestCatalog="'type' = 'iceberg', 'warehouse' = 'new_dlf_iceberg_catalog', 'iceberg.catalog.type' = 'rest', 'iceberg.rest.uri' = 'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg', 'iceberg.rest.sigv4-enabled' = 'true', 'iceberg.rest.signing-name' = 'DlfNext', 'iceberg.rest.access-key-id' = 'ak', 'iceberg.rest.secret-access-key' = 'sk', 'iceberg.rest.signing-region' = 'cn-beijing', 'iceberg.rest.vended-credentials-enabled' = 'true', 'io-impl' = 'org.apache.iceberg.rest.DlfFileIO', 'fs.oss.support' = 'true'"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
app_a|grp_a|msg_a
app_b|grp_b|msg_b
app_c|grp_c|msg_c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord

suite("test_routine_load_preceding_filter_keyword", "p0") {
String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafkaPort = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

if (enabled != null && enabled.equalsIgnoreCase("true")) {
def topicName = "test_preceding_filter_keyword"
def tableName = "test_routine_load_preceding_filter_keyword_tbl"
def jobName = "test_preceding_filter_keyword_job"

def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${externalEnvIp}:${kafkaPort}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

def producer = new KafkaProducer<>(props)
try {
def txt = new File("""${context.file.parent}/data/${topicName}.csv""").text
def lines = txt.readLines()
lines.each { line ->
def record = new ProducerRecord<>(topicName, null, line)
producer.send(record)
}
} finally {
producer.close()
}

try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
app STRING,
`group` STRING,
msg STRING
)
DUPLICATE KEY(app)
DISTRIBUTED BY HASH(app) BUCKETS 1
PROPERTIES ("replication_num" = "1");
"""

sql """
CREATE ROUTINE LOAD ${jobName} ON ${tableName}
COLUMNS(app, `group`, msg),
COLUMNS TERMINATED BY "|",
PRECEDING FILTER app IS NOT NULL AND `group` IS NOT NULL
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafkaPort}",
"kafka_topic" = "${topicName}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"

int retry = 0
while (true) {
sleep(1000)
def res = sql "show routine load for ${jobName}"
def state = res[0][8].toString()
def reason = res[0][17].toString()
if (state == "RUNNING") {
break
}
if (state == "PAUSED") {
assertTrue(false, "routine load should not be paused, reason: ${reason}")
}
retry++
if (retry > 60) {
assertTrue(false, "routine load should become RUNNING, current state: ${state}, reason: ${reason}")
}
}

retry = 0
while (true) {
sleep(1000)
def state = sql "show routine load for ${jobName}"
if (state[0][8].toString() == "PAUSED") {
assertTrue(false, "routine load should keep running, reason: ${state[0][17].toString()}")
}
def cnt = sql "select count(*) from ${tableName}"
if (cnt[0][0] > 0) {
break
}
retry++
if (retry > 60) {
assertTrue(false, "routine load did not ingest data in time")
}
}
} finally {
try {
sql "stop routine load for ${jobName}"
} catch (Exception e) {
logger.info("stop routine load failed: ${e.message}".toString())
}
sql """ DROP TABLE IF EXISTS ${tableName} """
}
}
}
Loading