Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import org.apache.calcite.config.NullCollation;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexLambdaRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.FrameworkConfig;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelBuilder;
import org.opensearch.sql.calcite.validate.OpenSearchSparkSqlDialect;
import org.opensearch.sql.calcite.validate.PplTypeCoercion;
import org.opensearch.sql.calcite.validate.PplTypeCoercionRule;
import org.opensearch.sql.calcite.validate.PplValidator;
import org.opensearch.sql.calcite.validate.SqlOperatorTableProvider;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.FunctionProperties;
Expand Down Expand Up @@ -72,6 +79,14 @@ public class CalcitePlanContext {
/** Whether we're currently inside a lambda context. */
@Getter @Setter private boolean inLambdaContext = false;

/**
 * -- SETTER -- Sets the SQL operator table provider. This must be called during initialization by
 * the opensearch module.
 *
 * <p>The provider is stored in a static field, so one value is shared by all {@code
 * CalcitePlanContext} instances. {@link #getValidator()} throws {@link IllegalStateException}
 * while it is still {@code null}.
 *
 * @param provider the provider to use for obtaining operator tables
 */
@Setter private static SqlOperatorTableProvider operatorTableProvider;

private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.sysLimit = sysLimit;
Expand Down Expand Up @@ -101,6 +116,34 @@ private CalcitePlanContext(CalcitePlanContext parent) {
this.inLambdaContext = true; // Mark that we're inside a lambda
}

/**
 * Builds a fresh {@link SqlValidator} configured for PPL.
 *
 * <p>A validator is stateful and must not be reused across validations, so nothing is cached:
 * each invocation constructs a new instance.
 *
 * @return a newly created validator
 * @throws IllegalStateException if no {@link SqlOperatorTableProvider} has been registered yet
 */
public SqlValidator getValidator() {
  if (operatorTableProvider == null) {
    throw new IllegalStateException(
        "SqlOperatorTableProvider must be set before creating CalcitePlanContext");
  }

  SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT;

  // PPL-specific implicit type coercion.
  validatorConfig = validatorConfig.withTypeCoercionRules(PplTypeCoercionRule.instance());
  validatorConfig = validatorConfig.withTypeCoercionFactory(PplTypeCoercion::create);

  // Lenient conformance keeps the validator compatible with PPL.
  validatorConfig =
      validatorConfig.withConformance(OpenSearchSparkSqlDialect.DEFAULT.getConformance());

  // Same NULL collation as Spark SQL: NULLs sort LOW, i.e. first.
  validatorConfig = validatorConfig.withDefaultNullCollation(NullCollation.LOW);

  // Coercion happens while the select list is expanded during SQL validation, so identifier
  // expansion must be on for coerced arguments to be replaced by their cast version in the
  // select list. Affects 4356.yml.
  // See SqlValidatorImpl#validateSelectList and AggConverter#translateAgg
  validatorConfig = validatorConfig.withIdentifierExpansion(true);

  return PplValidator.create(
      config, operatorTableProvider.getOperatorTable(), TYPE_FACTORY, validatorConfig);
}

public RexNode resolveJoinCondition(
UnresolvedExpression expr,
BiFunction<UnresolvedExpression, CalcitePlanContext, RexNode> transformFunction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand All @@ -56,10 +59,14 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldCollation;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.rex.RexWindowBounds;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
Expand Down Expand Up @@ -765,8 +772,8 @@ public RelNode visitTranspose(
.map(
f ->
Map.entry(
ImmutableList.of(rx.makeLiteral(f)),
ImmutableList.of((RexNode) rx.makeCast(varchar, b.field(f), true))))
ImmutableList.of((RexLiteral) rx.makeLiteral(f, varchar, true)),
ImmutableList.of(rx.makeCast(varchar, b.field(f), true, true))))
.collect(Collectors.toList()));

// Step 3: Trim spaces from columnName column before pivot
Expand Down Expand Up @@ -1795,6 +1802,7 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
// Default: first get rawExpr
List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
overExpressions = embedExistingCollationsIntoOver(overExpressions, context);

if (hasGroup) {
// only build sequence when there is by condition
Expand Down Expand Up @@ -1836,6 +1844,84 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
return context.relBuilder.peek();
}

/**
 * Embeds the collation of the current input into the OVER clause of every window function.
 *
 * <p>Window functions with frame specifications like {@code ROWS n PRECEDING} require ORDER BY to
 * determine row order. Without it, results are non-deterministic.
 *
 * <p>Without this step, the initial plan keeps ORDER BY separate from the window functions:
 *
 * <pre>
 * LogicalProject(SUM($5) OVER (ROWS 1 PRECEDING)) ← Missing ORDER BY
 *   LogicalSort(sort0=[$5])
 * </pre>
 *
 * <p>This causes problems during validation because the order is not bound to the window. With
 * this step, the sort collations are embedded into each {@code RexOver} window:
 *
 * <pre>
 * LogicalProject(SUM($5) OVER (ORDER BY $5 ROWS 1 PRECEDING)) ← ORDER BY embedded
 * </pre>
 *
 * <p>When the node on top of the builder stack carries no collation there is nothing to embed, and
 * the expressions are returned unchanged so that any ORDER BY keys a window already has are not
 * replaced by an empty list. When a collation is present, it replaces the ORDER BY keys of every
 * {@link RexOver} found in the expressions.
 *
 * @param overExpressions Window function expressions (may contain nested {@link RexOver})
 * @param context Plan context for building RexNodes
 * @return a new list of expressions with ORDER BY embedded in all window specifications
 */
private List<RexNode> embedExistingCollationsIntoOver(
    List<RexNode> overExpressions, CalcitePlanContext context) {
  RelCollation existingCollation = context.relBuilder.peek().getTraitSet().getCollation();
  if (existingCollation == null || existingCollation.getFieldCollations().isEmpty()) {
    // No upstream ordering: leave the windows exactly as they were built. The copy keeps the
    // "fresh, mutable list" result that the rewriting path below produces.
    return new ArrayList<>(overExpressions);
  }

  ImmutableList<@NonNull RexFieldCollation> rexCollations =
      existingCollation.getFieldCollations().stream()
          .map(f -> relCollationToRexCollation(f, context.relBuilder))
          .collect(ImmutableList.toImmutableList());

  // The shuttle only reads effectively-final locals, so a single instance serves every
  // expression.
  RexShuttle collationEmbedder =
      new RexShuttle() {
        @Override
        public RexNode visitOver(RexOver over) {
          RexWindow window = over.getWindow();
          return context.rexBuilder.makeOver(
              over.getType(),
              over.getAggOperator(),
              over.getOperands(),
              window.partitionKeys,
              rexCollations,
              window.getLowerBound(),
              window.getUpperBound(),
              window.isRows(),
              /* allowPartial= */ true,
              /* nullWhenCountZero= */ false,
              over.isDistinct(),
              over.ignoreNulls());
        }
      };

  return overExpressions.stream()
      .map(n -> n.accept(collationEmbedder))
      .collect(Collectors.toList());
}

/**
 * Translates a relational field collation into the equivalent {@link RexFieldCollation}.
 *
 * <p>The sort key becomes a reference to the collated field of the builder's current input. A
 * descending direction (strict or not) yields {@link SqlKind#DESCENDING}; an explicit null
 * direction yields {@link SqlKind#NULLS_FIRST} or {@link SqlKind#NULLS_LAST}. Any other direction
 * or null direction contributes no flag.
 *
 * @param relCollation field collation to translate
 * @param builder builder used to resolve the field index into a field reference
 * @return the collation expressed on a {@link RexNode}
 */
private static RexFieldCollation relCollationToRexCollation(
    RelFieldCollation relCollation, RelBuilder builder) {
  final RexNode sortKey = builder.field(relCollation.getFieldIndex());
  final RelFieldCollation.Direction direction = relCollation.direction;
  final RelFieldCollation.NullDirection nullDirection = relCollation.nullDirection;

  final boolean descending =
      direction == RelFieldCollation.Direction.DESCENDING
          || direction == RelFieldCollation.Direction.STRICTLY_DESCENDING;

  // null means "no explicit NULLS FIRST/LAST flag".
  final SqlKind nullOrdering =
      nullDirection == RelFieldCollation.NullDirection.FIRST
          ? SqlKind.NULLS_FIRST
          : nullDirection == RelFieldCollation.NullDirection.LAST ? SqlKind.NULLS_LAST : null;

  final Set<SqlKind> kinds = new HashSet<>();
  if (descending) {
    kinds.add(SqlKind.DESCENDING);
  }
  if (nullOrdering != null) {
    kinds.add(nullOrdering);
  }
  return new RexFieldCollation(sortKey, kinds);
}

private List<RexNode> wrapWindowFunctionsWithGroupNotNull(
List<RexNode> overExpressions, RexNode groupNotNull, CalcitePlanContext context) {
List<RexNode> wrappedOverExprs = new ArrayList<>(overExpressions.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.implicit.TypeCoercionImpl;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.OpenSearchTypeUtil;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
Expand Down Expand Up @@ -146,7 +151,7 @@ public RexNode makeCast(
// SqlStdOperatorTable.NOT_EQUALS,
// ImmutableList.of(exp, makeZeroLiteral(sourceType)));
}
} else if (OpenSearchTypeFactory.isUserDefinedType(type)) {
} else if (OpenSearchTypeUtil.isUserDefinedType(type)) {
if (RexLiteral.isNullLiteral(exp)) {
return super.makeCast(pos, type, exp, matchNullability, safe, format);
}
Expand Down Expand Up @@ -185,4 +190,33 @@ else if ((SqlTypeUtil.isApproximateNumeric(sourceType) || SqlTypeUtil.isDecimal(
}
return super.makeCast(pos, type, exp, matchNullability, safe, format);
}

/**
 * Derives the return type of a call to an operator, with special handling for binary arithmetic
 * that mixes a numeric operand with a character operand.
 *
 * <p>Calcite normally coerces between STRING and NUMERIC operands while converting SQL to
 * RelNode. Logical plans are built directly here, so that coercion has not happened yet. To still
 * infer the right type (and allow expressions such as {@code "5" / 10}), the rule of {@link
 * TypeCoercionImpl#binaryArithmeticWithStrings} is mirrored: the result takes the type of the
 * numeric operand. The cast itself is inserted later, when the SqlNode is validated.
 *
 * @param op the operator being called
 * @param exprs actual operands
 * @return the numeric operand's type for a two-operand arithmetic call mixing a numeric and a
 *     character operand; otherwise whatever the superclass derives
 * @see TypeCoercionImpl#binaryArithmeticCoercion(SqlCallBinding)
 */
@Override
public RelDataType deriveReturnType(SqlOperator op, List<? extends RexNode> exprs) {
  final boolean binaryArithmetic =
      op.getKind().belongsTo(SqlKind.BINARY_ARITHMETIC) && exprs.size() == 2;
  if (!binaryArithmetic) {
    return super.deriveReturnType(op, exprs);
  }

  final RelDataType left = exprs.get(0).getType();
  final RelDataType right = exprs.get(1).getType();

  if (SqlTypeUtil.isNumeric(left) && OpenSearchTypeUtil.isCharacter(right)) {
    return left;
  }
  if (OpenSearchTypeUtil.isCharacter(left) && SqlTypeUtil.isNumeric(right)) {
    return right;
  }
  return super.deriveReturnType(op, exprs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,11 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
Expand All @@ -88,7 +86,6 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.RelFieldTrimmer;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
Expand All @@ -98,12 +95,12 @@
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.Holder;
import org.apache.calcite.util.Util;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;
import org.opensearch.sql.calcite.profile.PlanProfileBuilder;
import org.opensearch.sql.calcite.validate.converters.OpenSearchSqlToRelConverter;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.ProfileMetric;
Expand Down Expand Up @@ -259,7 +256,7 @@ private void registerCustomizedRules(RelOptPlanner planner) {
* return {@link OpenSearchCalcitePreparingStmt}
*/
@Override
protected CalcitePrepareImpl.CalcitePreparingStmt getPreparingStmt(
public CalcitePrepareImpl.CalcitePreparingStmt getPreparingStmt(
CalcitePrepare.Context context,
Type elementType,
CalciteCatalogReader catalogReader,
Expand Down Expand Up @@ -369,34 +366,6 @@ protected SqlToRelConverter getSqlToRelConverter(
}
}

/**
 * {@link SqlToRelConverter} that trims fields with an {@code OpenSearchRelFieldTrimmer} and keeps
 * its own {@link RelBuilder} to hand to that trimmer.
 */
public static class OpenSearchSqlToRelConverter extends SqlToRelConverter {
  /** Builder created from the converter config; passed to the field trimmer. */
  protected final RelBuilder relBuilder;

  /**
   * Creates the converter and derives a {@link RelBuilder} from {@code config}.
   *
   * @param viewExpander forwarded to the superclass
   * @param validator forwarded to the superclass; when {@code null} the builder is created
   *     without a schema
   * @param catalogReader forwarded to the superclass
   * @param cluster forwarded to the superclass and used to create the builder
   * @param convertletTable forwarded to the superclass
   * @param config forwarded to the superclass; supplies the builder factory and its config
   *     transform
   */
  public OpenSearchSqlToRelConverter(
      ViewExpander viewExpander,
      @Nullable SqlValidator validator,
      CatalogReader catalogReader,
      RelOptCluster cluster,
      SqlRexConvertletTable convertletTable,
      Config config) {
    super(viewExpander, validator, catalogReader, cluster, convertletTable, config);

    // The schema is only reachable through the validator's catalog reader.
    final RelOptSchema relOptSchema =
        validator == null ? null : validator.getCatalogReader().unwrap(RelOptSchema.class);
    final RelBuilder baseBuilder = config.getRelBuilderFactory().create(cluster, relOptSchema);
    this.relBuilder = baseBuilder.transform(config.getRelBuilderConfigTransform());
  }

  /** Returns an {@code OpenSearchRelFieldTrimmer} built on this converter's builder. */
  @Override
  protected RelFieldTrimmer newFieldTrimmer() {
    return new OpenSearchRelFieldTrimmer(validator, this.relBuilder);
  }
}

public static class OpenSearchRelRunners {
/**
* Runs a relational expression by existing connection. This class copied from {@link
Expand Down Expand Up @@ -438,7 +407,8 @@ public RelNode visit(TableScan scan) {
"The 'bins' parameter on timestamp fields requires: (1) pushdown to be enabled"
+ " (controlled by plugins.calcite.pushdown.enabled, enabled by default), and"
+ " (2) the timestamp field to be used as an aggregation bucket (e.g., 'stats"
+ " count() by @timestamp').");
+ " count() by @timestamp').",
e);
}
throw Util.throwAsRuntime(e);
}
Expand Down
Loading
Loading