Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,18 @@ public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext conte
(arguments.isEmpty() || arguments.size() == 1)
? Collections.emptyList()
: arguments.subList(1, arguments.size());
// No-arg ranking functions (row_number, rank, dense_rank) bypass
// aggregate signature validation since they have no field arguments.
if (field == null && args.isEmpty()) {
return PlanUtils.makeOver(
context,
functionName,
null,
args,
partitions,
List.of(),
node.getWindowFrame());
}
List<RexNode> nodes =
PPLFuncImpTable.INSTANCE.validateAggFunctionSignature(
functionName, field, args, context.rexBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,22 @@ static RexNode makeOver(
true,
lowerBound,
upperBound);
case RANK:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.RANK),
partitions,
orderKeys,
true,
lowerBound,
upperBound);
case DENSE_RANK:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.DENSE_RANK),
partitions,
orderKeys,
true,
lowerBound,
upperBound);
case NTH_VALUE:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.NTH_VALUE, field, argList.get(0)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ public enum BuiltinFunctionName {
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.put("row_number", BuiltinFunctionName.ROW_NUMBER)
.put("rank", BuiltinFunctionName.RANK)
.put("dense_rank", BuiltinFunctionName.DENSE_RANK)
.build();

public static Optional<BuiltinFunctionName> of(String str) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.*;
import static org.opensearch.sql.util.MatcherUtils.*;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalciteEventstatsRankingIT extends PPLIntegTestCase {

@Override
public void init() throws Exception {
super.init();
enableCalcite();
loadIndex(Index.STATE_COUNTRY);
}

@Test
public void testEventstatsRowNumber() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats row_number() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("row_number()", "bigint"));
verifyNumOfRows(actual, 4);
}

@Test
public void testEventstatsRank() throws IOException {
JSONObject actual =
executeQuery(
String.format("source=%s | eventstats rank() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("rank()", "bigint"));
verifyNumOfRows(actual, 4);
}

@Test
public void testEventstatsDenseRank() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats dense_rank() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dense_rank()", "bigint"));
verifyNumOfRows(actual, 4);
}

@Test
public void testStreamstatsRowNumber() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | streamstats row_number() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("row_number()", "bigint"));
verifyNumOfRows(actual, 4);
}

@Test
public void testStreamstatsRank() throws IOException {
JSONObject actual =
executeQuery(
String.format("source=%s | streamstats rank() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("rank()", "bigint"));
verifyNumOfRows(actual, 4);
}

@Test
public void testStreamstatsDenseRank() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | streamstats dense_rank() by country", TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dense_rank()", "bigint"));
verifyNumOfRows(actual, 4);
}
}
41 changes: 41 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/ppl/DataTypeIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,47 @@ public void testNumericFieldFromString() throws Exception {
client().performRequest(deleteRequest);
}

@Test
public void testBooleanFieldFromNumberAcrossWildcardIndices() throws Exception {
// Reproduce issue #5269: querying across indices where same field has conflicting types
// (boolean vs text) and the text-typed index stores a numeric value like 0.
String indexBool = "repro_bool_test_bb";
String indexText = "repro_bool_test_aa";

try {
// Create index with boolean mapping
Request createBool = new Request("PUT", "/" + indexBool);
createBool.setJsonEntity(
"{\"mappings\":{\"properties\":{\"flag\":{\"type\":\"boolean\"},"
+ "\"startTime\":{\"type\":\"date_nanos\"}}}}");
client().performRequest(createBool);

// Create index with text mapping
Request createText = new Request("PUT", "/" + indexText);
createText.setJsonEntity(
"{\"mappings\":{\"properties\":{\"flag\":{\"type\":\"text\"},"
+ "\"startTime\":{\"type\":\"date_nanos\"}}}}");
client().performRequest(createText);

// Insert boolean value into boolean-typed index
Request insertBool = new Request("PUT", "/" + indexBool + "/_doc/1?refresh=true");
insertBool.setJsonEntity("{\"startTime\":\"2026-03-25T20:25:00.000Z\",\"flag\":false}");
client().performRequest(insertBool);

// Insert numeric value into text-typed index
Request insertText = new Request("PUT", "/" + indexText + "/_doc/1?refresh=true");
insertText.setJsonEntity("{\"startTime\":\"2026-03-24T20:25:00.000Z\",\"flag\":0}");
client().performRequest(insertText);

// Query across both indices with wildcard — should not throw an error
JSONObject result = executeQuery("source=repro_bool_test_* | fields flag");
assertEquals(2, result.getJSONArray("datarows").length());
} finally {
client().performRequest(new Request("DELETE", "/" + indexBool));
client().performRequest(new Request("DELETE", "/" + indexText));
}
}

@Test
public void testBooleanFieldFromString() throws Exception {
final int docId = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
setup:
- do:
indices.create:
index: test_issue_5168
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
int_field:
type: integer
str_field:
type: keyword

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_issue_5168", "_id": "1"}}'
- '{"int_field": 42, "str_field": "alpha"}'
- '{"index": {"_index": "test_issue_5168", "_id": "2"}}'
- '{"int_field": -1, "str_field": "alpha"}'
- '{"index": {"_index": "test_issue_5168", "_id": "3"}}'
- '{"int_field": 0, "str_field": "beta"}'

- do:
query.settings:
body:
transient:
plugins.calcite.enabled: true

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: false

- do:
indices.delete:
index: test_issue_5168
ignore_unavailable: true

---
"Issue 5168: eventstats row_number by partition should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_issue_5168 | eventstats row_number() by str_field

- match: { total: 3 }
- length: { datarows: 3 }

---
"Issue 5168: eventstats rank by partition should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_issue_5168 | eventstats rank() by str_field

- match: { total: 3 }
- length: { datarows: 3 }

---
"Issue 5168: eventstats dense_rank by partition should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_issue_5168 | eventstats dense_rank() by str_field

- match: { total: 3 }
- length: { datarows: 3 }

---
"Issue 5168: streamstats rank by partition should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_issue_5168 | streamstats rank() by str_field

- match: { total: 3 }
- length: { datarows: 3 }
Loading
Loading