diff --git a/build-tools-internal/src/main/resources/changelog-schema.json b/build-tools-internal/src/main/resources/changelog-schema.json index 8bdfb8d6c0b9c..a38eb32062146 100644 --- a/build-tools-internal/src/main/resources/changelog-schema.json +++ b/build-tools-internal/src/main/resources/changelog-schema.json @@ -68,6 +68,7 @@ "Java High Level REST Client", "Java Low Level REST Client", "License", + "Logs", "Machine Learning", "Mapping", "Monitoring", diff --git a/docs/changelog/108896.yaml b/docs/changelog/108896.yaml new file mode 100644 index 0000000000000..c52f074b65605 --- /dev/null +++ b/docs/changelog/108896.yaml @@ -0,0 +1,6 @@ +pr: 108896 +summary: Introduce `logs` index mode as Tech Preview +area: Logs +type: feature +issues: + - 108896 diff --git a/docs/changelog/109174.yaml b/docs/changelog/109174.yaml new file mode 100644 index 0000000000000..5cd57ebd34ac6 --- /dev/null +++ b/docs/changelog/109174.yaml @@ -0,0 +1,5 @@ +pr: 109174 +summary: "ESQL: Change \"substring\" function to not return null on empty string" +area: ES|QL +type: bug +issues: [] diff --git a/docs/changelog/109205.yaml b/docs/changelog/109205.yaml new file mode 100644 index 0000000000000..10f13a6549fbc --- /dev/null +++ b/docs/changelog/109205.yaml @@ -0,0 +1,6 @@ +pr: 109205 +summary: "ESQL: Fix `IpPrefix` function not handling correctly `ByteRefs`" +area: ES|QL +type: bug +issues: + - 109198 diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 1b6914e946c82..e826956440497 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -82,6 +82,37 @@ breaking change]. after segments are merged. Segment merging can be forced using <>. +[[index-mode-setting]] `index.mode`:: ++ +The `index.mode` setting is used to control settings applied in specific domains like ingestion of time series data or logs. +Different mutually exclusive modes exist, which are used to apply settings or default values controlling indexing of documents, +sorting and other parameters whose value affects indexing or query performance. ++ +[source,console] +---------------- +PUT my-index-000001 +{ + "settings": { + "index":{ + "mode":"standard" <1> + } + } +} +---------------- ++ +<1> This index uses the `standard` index mode ++ +Index mode supports the following values: + +`null`::: Default value (same as `standard`). + +`standard`::: Standard indexing with default settings. + +`time_series`::: Index mode optimized for storage of metrics documented in <>. + +`logs`::: Index mode optimized for storage of logs. It applies default sort settings on the `hostname` and `timestamp` fields and uses <>. <> on different fields is still allowed. +preview:[] + [[routing-partition-size]] `index.routing_partition_size`:: The number of shards a custom <> value can go to. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index c2727f206a07c..6eaddf51c02b4 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -63,7 +62,7 @@ protected Transport build(Settings settings, TransportVersion version, ClusterSe settings, version, threadPool, - new NetworkService(Collections.emptyList()), + networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), diff --git a/muted-tests.yml b/muted-tests.yml index 262b1509ca58e..b64f490524c86 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -49,9 +49,6 @@ tests: - class: "org.elasticsearch.upgrades.SearchStatesIT" issue: "https://github.com/elastic/elasticsearch/issues/109190" method: "testBWCSearchStates" -- class: "org.elasticsearch.xpack.esql.CsvTests" - issue: "https://github.com/elastic/elasticsearch/issues/109198" - method: "test {ip.IpPrefixLengthFromColumn}" - class: "org.elasticsearch.xpack.test.rest.XPackRestIT" issue: "https://github.com/elastic/elasticsearch/issues/109200" method: "test {p0=esql/70_locale/Date format with Italian locale}" diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml new file mode 100644 index 0000000000000..95075da20fe5e --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml @@ -0,0 +1,598 @@ +--- +setup: + - requires: + test_runner_features: [capabilities] + capabilities: + - method: PUT + path: /{index} + capabilities: [logs_index_mode] + reason: "Support for 'logs' index mode capability required" + +--- +create logs index: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + bulk: + index: test + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2024-02-12T10:30:00Z", "hostname": "foo", "agent_id": "darth-vader", "process_id": 101, "http_method": "GET", "message": "No, I am your father." } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:31:00Z", "hostname": "bar", "agent_id": "yoda", "process_id": 102, "http_method": "PUT", "message": "Do. Or do not. There is no try." } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:32:00Z", "hostname": "foo", "agent_id": "obi-wan", "process_id": 103, "http_method": "GET", "message": "May the force be with you." } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:33:00Z", "hostname": "baz", "agent_id": "darth-vader", "process_id": 102, "http_method": "POST", "message": "I find your lack of faith disturbing." } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:34:00Z", "hostname": "baz", "agent_id": "yoda", "process_id": 104, "http_method": "POST", "message": "Wars not make one great." } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:35:00Z", "hostname": "foo", "agent_id": "obi-wan", "process_id": 105, "http_method": "GET", "message": "That's no moon. It's a space station." } + + + - do: + search: + index: test + body: + query: + match_all: {} + + - match: { hits.total.value: 6 } + + - do: + indices.get_settings: + index: test + + - is_true: test + - match: { test.settings.index.mode: "logs" } + + +--- +using default timestamp field mapping: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test-timestamp-missing + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + +--- +missing hostname field: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test-hostname-missing + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + "@timestamp": + type: date + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "unknown index sort field:[hostname]" } + +--- +missing sort field: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test-hostname-missing + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + sort: + field: [ "host_name" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "unknown index sort field:[host_name]" } + +--- +non-default sort settings: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test-sort + body: + settings: + + index: + mode: logs + number_of_shards: 2 + number_of_replicas: 0 + sort: + field: [ "agent_id", "@timestamp" ] + order: [ "asc", "desc" ] + missing: [ "_last", "_first" ] + mode: [ "max", "max" ] + mappings: + properties: + "@timestamp": + type: date + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + indices.get_settings: + index: test-sort + + - is_true: test-sort + - match: { test-sort.settings.index.mode: "logs" } + - match: { test-sort.settings.index.sort.field.0: "agent_id" } + - match: { test-sort.settings.index.sort.field.1: "@timestamp" } + - match: { test-sort.settings.index.sort.order.0: "asc" } + - match: { test-sort.settings.index.sort.order.1: "desc" } + - match: { test-sort.settings.index.sort.missing.0: "_last" } + - match: { test-sort.settings.index.sort.missing.1: "_first" } + - match: { test-sort.settings.index.sort.mode.0: "max" } + - match: { test-sort.settings.index.sort.mode.1: "max" } + +--- +override sort order settings: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test-sort-order + body: + settings: + + index: + mode: logs + number_of_shards: 2 + number_of_replicas: 0 + sort: + order: [ "asc", "asc" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + indices.get_settings: + index: test-sort-order + + - is_true: test-sort-order + - match: { test-sort-order.settings.index.mode: "logs" } + - match: { test-sort-order.settings.index.sort.field.0: null } + - match: { test-sort-order.settings.index.sort.field.1: null } + - match: { test-sort-order.settings.index.sort.order.0: "asc" } + - match: { test-sort-order.settings.index.sort.order.1: "asc" } + +--- +override sort missing settings: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test-sort-missing + body: + settings: + + index: + mode: logs + number_of_shards: 2 + number_of_replicas: 0 + sort: + missing: [ "_last", "_first" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + indices.get_settings: + index: test-sort-missing + + - is_true: test-sort-missing + - match: { test-sort-missing.settings.index.mode: "logs" } + - match: { test-sort-missing.settings.index.sort.field.0: null } + - match: { test-sort-missing.settings.index.sort.field.1: null } + - match: { test-sort-missing.settings.index.sort.missing.0: "_last" } + - match: { test-sort-missing.settings.index.sort.missing.1: "_first" } + +--- +override sort mode settings: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + indices.create: + index: test-sort-mode + body: + settings: + + index: + mode: logs + number_of_shards: 2 + number_of_replicas: 0 + sort: + mode: [ "max", "max" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + indices.get_settings: + index: test-sort-mode + + - is_true: test-sort-mode + - match: { test-sort-mode.settings.index.mode: "logs" } + - match: { test-sort-mode.settings.index.sort.field.0: null } + - match: { test-sort-mode.settings.index.sort.field.1: null } + - match: { test-sort-mode.settings.index.sort.mode.0: "max" } + - match: { test-sort-mode.settings.index.sort.mode.1: "max" } + +--- +override sort field using nested field type in sorting: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test-nested-sort + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + sort: + field: [ "hostname", "nested", "@timestamp" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + nested: + type: nested + properties: + keywords: + type: keyword + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "cannot have nested fields when index sort is activated" } + +--- +override sort field using nested field type: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test-nested + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + nested: + type: nested + properties: + keywords: + type: keyword + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "cannot have nested fields when index sort is activated" } + +--- +routing path not allowed in logs mode: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + routing_path: [ "hostname", "agent_id" ] + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" } + +--- +start time not allowed in logs mode: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + time_series: + start_time: 2023-01-01T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "[index.time_series.start_time] requires [index.mode=time_series]" } + +--- +end time not allowed in logs mode: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logs_index_mode ] + reason: "Support for 'logs' index mode capability required" + + - do: + catch: bad_request + indices.create: + index: test + body: + settings: + index: + mode: logs + number_of_replicas: 0 + number_of_shards: 2 + time_series: + end_time: 2023-01-30T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + hostname: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "[index.time_series.end_time] requires [index.mode=time_series]" } diff --git a/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java b/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java index 8e370158d166a..2cac6ddb159bc 100644 --- a/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java +++ b/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java @@ -72,6 +72,7 @@ public enum ReferenceDocs { CONTACT_SUPPORT, UNASSIGNED_SHARDS, EXECUTABLE_JNA_TMPDIR, + NETWORK_THREADING_MODEL, // this comment keeps the ';' on the next line so every entry above has a trailing ',' which makes the diff for adding new links cleaner ; diff --git a/server/src/main/java/org/elasticsearch/index/IndexMode.java b/server/src/main/java/org/elasticsearch/index/IndexMode.java index 05169836d6617..3df5b3fe288a2 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexMode.java +++ b/server/src/main/java/org/elasticsearch/index/IndexMode.java @@ -53,15 +53,7 @@ public enum IndexMode { STANDARD("standard") { @Override void validateWithOtherSettings(Map, Object> settings) { - settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH); - settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME); - settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME); - } - - private static void settingRequiresTimeSeries(Map, Object> settings, Setting setting) { - if (false == Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) { - throw new IllegalArgumentException("[" + setting.getKey() + "] requires " + tsdbMode()); - } + IndexMode.validateTimeSeriesSettings(settings); } @Override @@ -225,12 +217,95 @@ public void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper) { } } + @Override + public boolean isSyntheticSourceEnabled() { + return true; + } + }, + LOGS("logs") { + @Override + void validateWithOtherSettings(Map, Object> settings) { + IndexMode.validateTimeSeriesSettings(settings); + } + + @Override + public void validateMapping(MappingLookup lookup) {} + + @Override + public void validateAlias(String indexRouting, String searchRouting) { + + } + + @Override + public void validateTimestampFieldMapping(boolean isDataStream, MappingLookup mappingLookup) throws IOException { + if (isDataStream) { + MetadataCreateDataStreamService.validateTimestampFieldMapping(mappingLookup); + } + } + + @Override + public CompressedXContent getDefaultMapping() { + return DEFAULT_LOGS_TIMESTAMP_MAPPING; + } + + @Override + public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) { + return new ProvidedIdFieldMapper(fieldDataEnabled); + } + + @Override + public IdFieldMapper idFieldMapperWithoutFieldData() { + return ProvidedIdFieldMapper.NO_FIELD_DATA; + } + + @Override + public TimestampBounds getTimestampBound(IndexMetadata indexMetadata) { + return null; + } + + @Override + public MetadataFieldMapper timeSeriesIdFieldMapper() { + // non time-series indices must not have a TimeSeriesIdFieldMapper + return null; + } + + @Override + public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() { + // non time-series indices must not have a TimeSeriesRoutingIdFieldMapper + return null; + } + + @Override + public DocumentDimensions buildDocumentDimensions(IndexSettings settings) { + return new DocumentDimensions.OnlySingleValueAllowed(); + } + + @Override + public boolean shouldValidateTimestamp() { + return false; + } + + @Override + public void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper) {} + @Override public boolean isSyntheticSourceEnabled() { return true; } }; + private static void validateTimeSeriesSettings(Map, Object> settings) { + settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH); + settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME); + settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME); + } + + private static void settingRequiresTimeSeries(Map, Object> settings, Setting setting) { + if (false == Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) { + throw new IllegalArgumentException("[" + setting.getKey() + "] requires " + tsdbMode()); + } + } + protected static String tsdbMode() { return "[" + IndexSettings.MODE.getKey() + "=time_series]"; } @@ -257,6 +332,27 @@ protected static String tsdbMode() { } } + public static final CompressedXContent DEFAULT_LOGS_TIMESTAMP_MAPPING; + + static { + try { + DEFAULT_LOGS_TIMESTAMP_MAPPING = new CompressedXContent( + ((builder, params) -> builder.startObject(MapperService.SINGLE_MAPPING_NAME) + .startObject(DataStreamTimestampFieldMapper.NAME) + .field("enabled", true) + .endObject() + .startObject("properties") + .startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH) + .field("type", DateFieldMapper.CONTENT_TYPE) + .endObject() + .endObject() + .endObject()) + ); + } catch (IOException e) { + throw new AssertionError(e); + } + } + private static final List> TIME_SERIES_UNSUPPORTED = List.of( IndexSortConfig.INDEX_SORT_FIELD_SETTING, IndexSortConfig.INDEX_SORT_ORDER_SETTING, @@ -368,6 +464,7 @@ public static IndexMode fromString(String value) { return switch (value) { case "standard" -> IndexMode.STANDARD; case "time_series" -> IndexMode.TIME_SERIES; + case "logs" -> IndexMode.LOGS; default -> throw new IllegalArgumentException( "[" + value diff --git a/server/src/main/java/org/elasticsearch/index/IndexSortConfig.java b/server/src/main/java/org/elasticsearch/index/IndexSortConfig.java index 98c2e31838379..74c2c57594e72 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSortConfig.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSortConfig.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Setting; @@ -152,10 +153,16 @@ public IndexSortConfig(IndexSettings indexSettings) { } List fields = INDEX_SORT_FIELD_SETTING.get(settings); - this.sortSpecs = fields.stream().map((name) -> new FieldSortSpec(name)).toArray(FieldSortSpec[]::new); + if (this.indexMode == IndexMode.LOGS && fields.isEmpty()) { + fields = List.of("hostname", DataStream.TIMESTAMP_FIELD_NAME); + } + this.sortSpecs = fields.stream().map(FieldSortSpec::new).toArray(FieldSortSpec[]::new); if (INDEX_SORT_ORDER_SETTING.exists(settings)) { List orders = INDEX_SORT_ORDER_SETTING.get(settings); + if (this.indexMode == IndexMode.LOGS && orders.isEmpty()) { + orders = List.of(SortOrder.DESC, SortOrder.DESC); + } if (orders.size() != sortSpecs.length) { throw new IllegalArgumentException( "index.sort.field:" + fields + " index.sort.order:" + orders.toString() + ", size mismatch" @@ -168,6 +175,9 @@ public IndexSortConfig(IndexSettings indexSettings) { if (INDEX_SORT_MODE_SETTING.exists(settings)) { List modes = INDEX_SORT_MODE_SETTING.get(settings); + if (this.indexMode == IndexMode.LOGS && modes.isEmpty()) { + modes = List.of(MultiValueMode.MIN, MultiValueMode.MIN); + } if (modes.size() != sortSpecs.length) { throw new IllegalArgumentException("index.sort.field:" + fields + " index.sort.mode:" + modes + ", size mismatch"); } @@ -178,6 +188,9 @@ public IndexSortConfig(IndexSettings indexSettings) { if (INDEX_SORT_MISSING_SETTING.exists(settings)) { List missingValues = INDEX_SORT_MISSING_SETTING.get(settings); + if (this.indexMode == IndexMode.LOGS && missingValues.isEmpty()) { + missingValues = List.of("_first", "_first"); + } if (missingValues.size() != sortSpecs.length) { throw new IllegalArgumentException( "index.sort.field:" + fields + " index.sort.missing:" + missingValues + ", size mismatch" diff --git a/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java b/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java index 81fc2c0b4a065..0b4bb9dfc10ae 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java +++ b/server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java @@ -107,7 +107,9 @@ boolean useTSDBDocValuesFormat(final String field) { return false; } - return mapperService != null && isTimeSeriesModeIndex() && mapperService.getIndexSettings().isES87TSDBCodecEnabled(); + return mapperService != null + && (isTimeSeriesModeIndex() || isLogsModeIndex()) + && mapperService.getIndexSettings().isES87TSDBCodecEnabled(); } private boolean excludeFields(String fieldName) { @@ -120,4 +122,8 @@ private boolean isTimeSeriesModeIndex() { return mapperService != null && IndexMode.TIME_SERIES == mapperService.getIndexSettings().getMode(); } + private boolean isLogsModeIndex() { + return mapperService != null && IndexMode.LOGS == mapperService.getIndexSettings().getMode(); + } + } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/CreateIndexCapabilities.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/CreateIndexCapabilities.java new file mode 100644 index 0000000000000..700baac09865e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/CreateIndexCapabilities.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.admin.indices; + +import java.util.Set; + +/** + * A {@link Set} of "capabilities" supported by the {@link RestCreateIndexAction}. + */ +public class CreateIndexCapabilities { + + /** + * Support for using the 'logs' index mode. + */ + private static final String LOGS_INDEX_MODE_CAPABILITY = "logs_index_mode"; + + public static Set CAPABILITIES = Set.of(LOGS_INDEX_MODE_CAPABILITY); +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateIndexAction.java index 3ef96bd44a597..4f9fd9d03521d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateIndexAction.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static java.util.Collections.singletonMap; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -145,4 +146,9 @@ static Map prepareMappings(Map source) { newSource.put("mappings", singletonMap(MapperService.SINGLE_MAPPING_NAME, mappings)); return newSource; } + + @Override + public Set supportedCapabilities() { + return CreateIndexCapabilities.CAPABILITIES; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 79b6daef671af..37cb8931d8cb0 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -169,15 +170,13 @@ private TransportResponseHandler findResponseHandler(Header header) { private static void logSlowMessage(InboundMessage message, long took, long logThreshold, TransportResponseHandler responseHandler) { if (message.getHeader().isRequest()) { - logger.warn("handling request [{}] took [{}ms] which is above the warn threshold of [{}ms]", message, took, logThreshold); + logger.warn(""" + handling request [{}] took [{}ms] which is above the warn threshold of [{}ms]; \ + for more information, see {}""", message, took, logThreshold, ReferenceDocs.NETWORK_THREADING_MODEL); } else { - logger.warn( - "handling response [{}] on handler [{}] took [{}ms] which is above the warn threshold of [{}ms]", - message, - responseHandler, - took, - logThreshold - ); + logger.warn(""" + handling response [{}] on handler [{}] took [{}ms] which is above the warn threshold of [{}ms]; \ + for more information, see {}""", message, responseHandler, took, logThreshold, ReferenceDocs.NETWORK_THREADING_MODEL); } } diff --git a/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json b/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json index 503f02b25eb8d..f3e5bd7a375f1 100644 --- a/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json +++ b/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json @@ -32,5 +32,6 @@ "BOOTSTRAP_CHECK_SECURITY_MINIMAL_SETUP": "security-minimal-setup.html", "CONTACT_SUPPORT": "troubleshooting.html#troubleshooting-contact-support", "UNASSIGNED_SHARDS": "red-yellow-cluster-status.html", - "EXECUTABLE_JNA_TMPDIR": "executable-jna-tmpdir.html" + "EXECUTABLE_JNA_TMPDIR": "executable-jna-tmpdir.html", + "NETWORK_THREADING_MODEL": "modules-network.html#modules-network-threading-model" } diff --git a/server/src/test/java/org/elasticsearch/index/LogsIndexModeTests.java b/server/src/test/java/org/elasticsearch/index/LogsIndexModeTests.java new file mode 100644 index 0000000000000..fd73a8c9f8f52 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/LogsIndexModeTests.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.hamcrest.Matchers; + +public class LogsIndexModeTests extends MapperServiceTestCase { + public void testLogsIndexModeSetting() { + assertThat(IndexSettings.MODE.get(buildSettings()), Matchers.equalTo(IndexMode.LOGS)); + } + + public void testSortField() { + final Settings sortSettings = Settings.builder() + .put(buildSettings()) + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "agent_id") + .build(); + final IndexMetadata metadata = IndexSettingsTests.newIndexMeta("test", sortSettings); + final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertThat("agent_id", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()))); + } + + public void testSortMode() { + final Settings sortSettings = Settings.builder() + .put(buildSettings()) + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "agent_id") + .put(IndexSortConfig.INDEX_SORT_MODE_SETTING.getKey(), "max") + .build(); + final IndexMetadata metadata = IndexSettingsTests.newIndexMeta("test", sortSettings); + final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertThat("agent_id", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()))); + assertThat("max", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_MODE_SETTING.getKey()))); + } + + public void testSortOrder() { + final Settings sortSettings = Settings.builder() + .put(buildSettings()) + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "agent_id") + .put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), "desc") + .build(); + final IndexMetadata metadata = IndexSettingsTests.newIndexMeta("test", sortSettings); + final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertThat("agent_id", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()))); + assertThat("desc", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey()))); + } + + public void testSortMissing() { + final Settings sortSettings = Settings.builder() + .put(buildSettings()) + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "agent_id") + .put(IndexSortConfig.INDEX_SORT_MISSING_SETTING.getKey(), "_last") + .build(); + final IndexMetadata metadata = IndexSettingsTests.newIndexMeta("test", sortSettings); + final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertThat("agent_id", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()))); + assertThat("_last", Matchers.equalTo(getIndexSetting(settings, IndexSortConfig.INDEX_SORT_MISSING_SETTING.getKey()))); + } + + private Settings buildSettings() { + return Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOGS.getName()).build(); + } + + private String getIndexSetting(final IndexSettings settings, final String name) { + return settings.getIndexMetadata().getSettings().get(name); + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 23f9a7367298f..c9e1a7dbc0cfd 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -272,7 +272,12 @@ public void testLogsSlowInboundProcessing() throws Exception { final TransportVersion remoteVersion = TransportVersion.current(); mockLog.addExpectation( - new MockLog.SeenEventExpectation("expected slow request", EXPECTED_LOGGER_NAME, Level.WARN, "handling request ") + new MockLog.SeenEventExpectation( + "expected slow request", + EXPECTED_LOGGER_NAME, + Level.WARN, + "handling request*modules-network.html#modules-network-threading-model" + ) ); final long requestId = randomNonNegativeLong(); @@ -285,13 +290,11 @@ public void testLogsSlowInboundProcessing() throws Exception { BytesStreamOutput byteData = new BytesStreamOutput(); TaskId.EMPTY_TASK_ID.writeTo(byteData); TransportVersion.writeVersion(remoteVersion, byteData); - final InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(byteData.bytes()), () -> { - try { - TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - }); + final InboundMessage requestMessage = new InboundMessage( + requestHeader, + ReleasableBytesReference.wrap(byteData.bytes()), + () -> safeSleep(TimeValue.timeValueSeconds(1)) + ); requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME; requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); handler.inboundMessage(channel, requestMessage); @@ -299,7 +302,12 @@ public void testLogsSlowInboundProcessing() throws Exception { mockLog.assertAllExpectationsMatched(); mockLog.addExpectation( - new MockLog.SeenEventExpectation("expected slow response", EXPECTED_LOGGER_NAME, Level.WARN, "handling response ") + new MockLog.SeenEventExpectation( + "expected slow response", + EXPECTED_LOGGER_NAME, + Level.WARN, + "handling response*modules-network.html#modules-network-threading-model" + ) ); final long responseId = randomNonNegativeLong(); @@ -310,11 +318,7 @@ public void testLogsSlowInboundProcessing() throws Exception { @SuppressWarnings("rawtypes") public void onResponseReceived(long requestId, Transport.ResponseContext context) { assertEquals(responseId, requestId); - try { - TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException e) { - throw new AssertionError(e); - } + safeSleep(TimeValue.timeValueSeconds(1)); } }); handler.inboundMessage(channel, new InboundMessage(responseHeader, ReleasableBytesReference.empty(), () -> {})); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 14269a8835f57..02be27b92a1eb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -2187,6 +2187,10 @@ public static T safeGet(Future future) { } } + public static void safeSleep(TimeValue timeValue) { + safeSleep(timeValue.millis()); + } + public static void safeSleep(long millis) { try { Thread.sleep(millis); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index d966a21a56b5f..6ced86156d008 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -152,6 +153,8 @@ protected Set> getSupportedSettings() { return ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; } + protected static final NetworkService networkService = new NetworkService(List.of()); + @Override @Before public void setUp() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 0abfc6e911d2d..15e358b68e648 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -696,6 +696,9 @@ public void testProcessOnceOnPrimary() throws Exception { case TIME_SERIES: settingsBuilder.put("index.mode", "time_series").put("index.routing_path", "foo"); break; + case LOGS: + settingsBuilder.put("index.mode", "logs"); + break; default: throw new UnsupportedOperationException("Unknown index mode [" + indexMode + "]"); } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index d1749ac3bf3e8..1dacc394c8d21 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -7,12 +7,16 @@ package org.elasticsearch.xpack.esql.core.type; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.sort.ScriptSortBuilder; +import java.io.IOException; import java.util.Locale; import java.util.Objects; -public class DataType { +public class DataType implements Writeable { private final String typeName; @@ -118,4 +122,21 @@ public boolean equals(Object obj) { public String toString() { return name; } + + public static DataType readFrom(StreamInput in) throws IOException { + String name = in.readString(); + if (name.equalsIgnoreCase(DataTypes.DOC_DATA_TYPE.name())) { + return DataTypes.DOC_DATA_TYPE; + } + DataType dataType = DataTypes.fromTypeName(name); + if (dataType == null) { + throw new IOException("Unknown DataType for type name: " + name); + } + return dataType; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(typeName); + } } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataTypes.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataTypes.java index c28bf96728ad0..8d4129d1abbcc 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataTypes.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataTypes.java @@ -78,6 +78,9 @@ public final class DataTypes { public static final DataType COUNTER_INTEGER = new DataType("counter_integer", Integer.BYTES, false, false, true); public static final DataType COUNTER_DOUBLE = new DataType("counter_double", Double.BYTES, false, false, true); + public static final DataType DOC_DATA_TYPE = new DataType("_doc", Integer.BYTES * 3, false, false, false); + public static final DataType TSID_DATA_TYPE = new DataType("_tsid", Integer.MAX_VALUE, false, false, true); + private static final Collection TYPES = Stream.of( UNSUPPORTED, NULL, diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index 7d1c168bd203f..d78905cc2cffa 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -354,7 +354,8 @@ public static ExpectedResults loadCsvSpecValues(String csv) { for (int i = 0; i < row.size(); i++) { String value = row.get(i); if (value == null) { - rowValues.add(null); + // Empty cells are converted to null by SuperCSV. We convert them back to empty strings. + rowValues.add(""); continue; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec index 0de64d3e2d9d4..61f529d60bf90 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec @@ -531,7 +531,7 @@ required_capability: fn_ip_prefix from hosts | where host == "alpha" | sort card -| eval prefix = ip_prefix(ip0, 24, 128) +| eval prefix = ip_prefix(ip0, 24, 120) | keep card, ip0, prefix; card:keyword | ip0:ip | prefix:ip diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec index 6322746318230..13616e5146949 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec @@ -195,6 +195,14 @@ emp_no:integer | last_name:keyword | x:keyword | z:keyword 10010 | Piveteau | P | a ; +substring empty string +required_capability: fn_substring_empty_null +row sub = substring("", 1, 3); + +sub:keyword +"" +; + substring Emoji#[skip:-8.13.99,reason:bug fix in 8.14] row a = "🐱Meow!🐶Woof!" | eval sub1 = substring(a, 2) | eval sub2 = substring(a, 2, 100); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index e8f136c297ce0..675b99c61bfbe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -32,6 +32,11 @@ public class EsqlCapabilities { */ private static final String FN_IP_PREFIX = "fn_ip_prefix"; + /** + * Fix on function {@code SUBSTRING} that makes it not return null on empty strings. + */ + private static final String FN_SUBSTRING_EMPTY_NULL = "fn_substring_empty_null"; + /** * Optimization for ST_CENTROID changed some results in cartesian data. #108713 */ @@ -53,6 +58,7 @@ private static Set capabilities() { List caps = new ArrayList<>(); caps.add(FN_CBRT); caps.add(FN_IP_PREFIX); + caps.add(FN_SUBSTRING_EMPTY_NULL); caps.add(ST_CENTROID_AGG_OPTIMIZED); caps.add(METADATA_IGNORED_FIELD); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/IpPrefix.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/IpPrefix.java index 5c576101668b3..9d429a620ed6e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/IpPrefix.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/IpPrefix.java @@ -136,7 +136,14 @@ static BytesRef process( throw new IllegalArgumentException("Prefix length v6 must be in range [0, 128], found " + prefixLengthV6); } - boolean isIpv4 = Arrays.compareUnsigned(ip.bytes, 0, IPV4_PREFIX.length, IPV4_PREFIX, 0, IPV4_PREFIX.length) == 0; + boolean isIpv4 = Arrays.compareUnsigned( + ip.bytes, + ip.offset, + ip.offset + IPV4_PREFIX.length, + IPV4_PREFIX, + 0, + IPV4_PREFIX.length + ) == 0; if (isIpv4) { makePrefix(ip, scratch, 12 + prefixLengthV4 / 8, prefixLengthV4 % 8); @@ -154,7 +161,7 @@ private static void makePrefix(BytesRef ip, BytesRef scratch, int fullBytes, int // Copy the last byte ignoring the trailing bits if (remainingBits > 0) { byte lastByteMask = (byte) (0xFF << (8 - remainingBits)); - scratch.bytes[fullBytes] = (byte) (ip.bytes[fullBytes] & lastByteMask); + scratch.bytes[fullBytes] = (byte) (ip.bytes[ip.offset + fullBytes] & lastByteMask); } // Copy the last empty bytes diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Substring.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Substring.java index 5c84d5bcee853..4f3d0e15eef72 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Substring.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Substring.java @@ -111,12 +111,12 @@ static BytesRef process(BytesRef str, int start) { @Evaluator static BytesRef process(BytesRef str, int start, int length) { - if (str.length == 0) { - return null; - } if (length < 0) { throw new IllegalArgumentException("Length parameter cannot be negative, found [" + length + "]"); } + if (str.length == 0) { + return str; + } int codePointCount = UnicodeUtil.codePointCount(str); int indexStart = indexStart(codePointCount, start); int indexEnd = Math.min(codePointCount, indexStart + length); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java index 33c5937c0fd0a..7a7cbb55f5629 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java @@ -1070,7 +1070,7 @@ static FieldAttribute readFieldAttribute(PlanStreamInput in) throws IOException Source.readFrom(in), in.readOptionalWithReader(PlanNamedTypes::readFieldAttribute), in.readString(), - in.dataTypeFromTypeName(in.readString()), + DataType.readFrom(in), in.readEsFieldNamed(), in.readOptionalString(), in.readEnum(Nullability.class), @@ -1095,7 +1095,7 @@ static ReferenceAttribute readReferenceAttr(PlanStreamInput in) throws IOExcepti return new ReferenceAttribute( Source.readFrom(in), in.readString(), - in.dataTypeFromTypeName(in.readString()), + DataType.readFrom(in), in.readOptionalString(), in.readEnum(Nullability.class), NameId.readFrom(in), @@ -1117,7 +1117,7 @@ static MetadataAttribute readMetadataAttr(PlanStreamInput in) throws IOException return new MetadataAttribute( Source.readFrom(in), in.readString(), - in.dataTypeFromTypeName(in.readString()), + DataType.readFrom(in), in.readOptionalString(), in.readEnum(Nullability.class), NameId.readFrom(in), @@ -1160,7 +1160,7 @@ static void writeUnsupportedAttr(PlanStreamOutput out, UnsupportedAttribute unsu static EsField readEsField(PlanStreamInput in) throws IOException { return new EsField( in.readString(), - in.dataTypeFromTypeName(in.readString()), + DataType.readFrom(in), in.readImmutableMap(StreamInput::readString, readerFromPlanReader(PlanStreamInput::readEsFieldNamed)), in.readBoolean(), in.readBoolean() @@ -1875,7 +1875,7 @@ static void writeAlias(PlanStreamOutput out, Alias alias) throws IOException { static Literal readLiteral(PlanStreamInput in) throws IOException { Source source = Source.readFrom(in); Object value = in.readGenericValue(); - DataType dataType = in.dataTypeFromTypeName(in.readString()); + DataType dataType = DataType.readFrom(in); return new Literal(source, mapToLiteralValue(in, dataType, value), dataType); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java index d5b1990e69c5f..6c1b7db24fe46 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java @@ -30,14 +30,11 @@ import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader; import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.EsqlConfiguration; -import org.elasticsearch.xpack.esql.type.EsqlDataTypes; import java.io.IOException; import java.util.Collection; @@ -90,19 +87,6 @@ public PlanStreamInput( this.nameIdFunction = new NameIdMapper(); } - DataType dataTypeFromTypeName(String typeName) throws IOException { - DataType dataType; - if (typeName.equalsIgnoreCase(EsQueryExec.DOC_DATA_TYPE.name())) { - dataType = EsQueryExec.DOC_DATA_TYPE; - } else { - dataType = EsqlDataTypes.fromTypeName(typeName); - } - if (dataType == null) { - throw new IOException("Unknown DataType for type name: " + typeName); - } - return dataType; - } - public LogicalPlan readLogicalPlanNode() throws IOException { return readNamed(LogicalPlan.class); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index b354e519c0b99..30986f9c626da 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -20,7 +20,6 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.NodeUtils; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DataTypes; import org.elasticsearch.xpack.esql.core.type.EsField; @@ -29,11 +28,8 @@ import java.util.Objects; public class EsQueryExec extends LeafExec implements EstimatesRowSize { - public static final DataType DOC_DATA_TYPE = new DataType("_doc", Integer.BYTES * 3, false, false, false); - public static final DataType TSID_DATA_TYPE = new DataType("_tsid", Integer.MAX_VALUE, false, false, true); - - static final EsField DOC_ID_FIELD = new EsField("_doc", DOC_DATA_TYPE, Map.of(), false); - static final EsField TSID_FIELD = new EsField("_tsid", TSID_DATA_TYPE, Map.of(), true); + static final EsField DOC_ID_FIELD = new EsField("_doc", DataTypes.DOC_DATA_TYPE, Map.of(), false); + static final EsField TSID_FIELD = new EsField("_tsid", DataTypes.TSID_DATA_TYPE, Map.of(), true); static final EsField TIMESTAMP_FIELD = new EsField("@timestamp", DataTypes.DATETIME, Map.of(), true); static final EsField INTERVAL_FIELD = new EsField("@timestamp_interval", DataTypes.DATETIME, Map.of(), true); @@ -86,7 +82,7 @@ public EsQueryExec( private static List sourceAttributes(Source source, IndexMode indexMode) { return switch (indexMode) { - case STANDARD -> List.of(new FieldAttribute(source, DOC_ID_FIELD.getName(), DOC_ID_FIELD)); + case STANDARD, LOGS -> List.of(new FieldAttribute(source, DOC_ID_FIELD.getName(), DOC_ID_FIELD)); case TIME_SERIES -> List.of( new FieldAttribute(source, DOC_ID_FIELD.getName(), DOC_ID_FIELD), new FieldAttribute(source, TSID_FIELD.getName(), TSID_FIELD), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 0b4ee46af69ee..39b641a872c58 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -39,7 +39,6 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; @@ -268,10 +267,10 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field if (dataType == DataTypes.BOOLEAN) { return ElementType.BOOLEAN; } - if (dataType == EsQueryExec.DOC_DATA_TYPE) { + if (dataType == DataTypes.DOC_DATA_TYPE) { return ElementType.DOC; } - if (dataType == EsQueryExec.TSID_DATA_TYPE) { + if (dataType == DataTypes.TSID_DATA_TYPE) { return ElementType.BYTES_REF; } if (EsqlDataTypes.isSpatialPoint(dataType)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/SubstringTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/SubstringTests.java index 36ace66290fc1..c2df6f61c1d25 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/SubstringTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/SubstringTests.java @@ -78,6 +78,24 @@ public static Iterable parameters() { equalTo(new BytesRef(text.substring(start - 1, start + length - 1))) ); } + ), + new TestCaseSupplier( + "Substring empty string", + List.of(DataTypes.TEXT, DataTypes.INTEGER, DataTypes.INTEGER), + () -> { + int start = between(1, 8); + int length = between(1, 10 - start); + return new TestCaseSupplier.TestCase( + List.of( + new TestCaseSupplier.TypedData(new BytesRef(""), DataTypes.TEXT, "str"), + new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"), + new TestCaseSupplier.TypedData(length, DataTypes.INTEGER, "end") + ), + "SubstringEvaluator[str=Attribute[channel=0], start=Attribute[channel=1], length=Attribute[channel=2]]", + DataTypes.KEYWORD, + equalTo(new BytesRef("")) + ); + } ) ) ) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java index a9d8c1dfc8d9e..0663172fa2e9c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java @@ -323,19 +323,20 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } ) ) { - final ChannelHandler handler = transport.configureServerChannelHandler(); - final EmbeddedChannel ch = new EmbeddedChannel(handler); - // remove these pipeline handlers as they interfere in the test scenario - for (String pipelineHandlerName : ch.pipeline().names()) { - if (pipelineHandlerName.equals("decoder") - || pipelineHandlerName.equals("encoder") - || pipelineHandlerName.equals("encoder_compress") - || pipelineHandlerName.equals("chunked_writer")) { - ch.pipeline().remove(pipelineHandlerName); + safeGet(testThreadPool.generic().submit(() -> { + final ChannelHandler handler = transport.configureServerChannelHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); + // remove these pipeline handlers as they interfere in the test scenario + for (String pipelineHandlerName : ch.pipeline().names()) { + if (pipelineHandlerName.equals("decoder") + || pipelineHandlerName.equals("encoder") + || pipelineHandlerName.equals("encoder_compress") + || pipelineHandlerName.equals("chunked_writer")) { + ch.pipeline().remove(pipelineHandlerName); + } } - } - // STEP 0: send a "wrapped" request - var writeFuture = testThreadPool.generic().submit(() -> { + + // STEP 0: send a "wrapped" request ch.writeInbound( HttpHeadersAuthenticatorUtils.wrapAsMessageWithAuthenticationContext( new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/wrapped_request") @@ -343,8 +344,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th ); ch.writeInbound(new DefaultLastHttpContent()); ch.flushInbound(); - }); - writeFuture.get(); + })); + // STEP 3: assert the wrapped context var storedAuthnContext = HttpHeadersAuthenticatorUtils.extractAuthenticationContext(dispatchedHttpRequestReference.get()); assertThat(storedAuthnContext, notNullValue()); @@ -378,35 +379,33 @@ public void testHttpHeaderAuthnBypassHeaderValidator() throws Exception { (httpPreRequest, channel, listener) -> listener.onResponse(null) ) ) { - final ChannelHandler handler = transport.configureServerChannelHandler(); - final EmbeddedChannel ch = new EmbeddedChannel(handler); - for (String pipelineHandlerName : ch.pipeline().names()) { - // remove the decoder AND the header_validator - if (pipelineHandlerName.equals("decoder") || pipelineHandlerName.equals("header_validator") - // remove these pipeline handlers as they interfere in the test scenario - || pipelineHandlerName.equals("encoder") - || pipelineHandlerName.equals("encoder_compress") - || pipelineHandlerName.equals("chunked_writer")) { - ch.pipeline().remove(pipelineHandlerName); + safeGet(testThreadPool.generic().submit(() -> { + + final ChannelHandler handler = transport.configureServerChannelHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); + for (String pipelineHandlerName : ch.pipeline().names()) { + // remove the decoder AND the header_validator + if (pipelineHandlerName.equals("decoder") || pipelineHandlerName.equals("header_validator") + // remove these pipeline handlers as they interfere in the test scenario + || pipelineHandlerName.equals("encoder") + || pipelineHandlerName.equals("encoder_compress") + || pipelineHandlerName.equals("chunked_writer")) { + ch.pipeline().remove(pipelineHandlerName); + } } - } - // this tests a request that cannot be authenticated, but somehow passed authentication - // this is the case of an erroneous internal state - var writeFuture = testThreadPool.generic().submit(() -> { + // this tests a request that cannot be authenticated, but somehow passed authentication + // this is the case of an erroneous internal state ch.writeInbound(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/unauthenticable_request")); ch.flushInbound(); - }); - writeFuture.get(); - ch.flushOutbound(); - Netty4FullHttpResponse response = ch.readOutbound(); - assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); - String responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat( - responseContentString, - containsString("\"type\":\"security_exception\",\"reason\":\"Request is not authenticated\"") - ); - // this tests a request that CAN be authenticated, but that, somehow, has not been - writeFuture = testThreadPool.generic().submit(() -> { + ch.flushOutbound(); + Netty4FullHttpResponse response = ch.readOutbound(); + assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + String responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); + assertThat( + responseContentString, + containsString("\"type\":\"security_exception\",\"reason\":\"Request is not authenticated\"") + ); + // this tests a request that CAN be authenticated, but that, somehow, has not been ch.writeInbound( HttpHeadersAuthenticatorUtils.wrapAsMessageWithAuthenticationContext( new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/_request") @@ -414,19 +413,16 @@ public void testHttpHeaderAuthnBypassHeaderValidator() throws Exception { ); ch.writeInbound(new DefaultLastHttpContent()); ch.flushInbound(); - }); - writeFuture.get(); - ch.flushOutbound(); - response = ch.readOutbound(); - assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); - responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat( - responseContentString, - containsString("\"type\":\"security_exception\",\"reason\":\"Request is not authenticated\"") - ); - // this tests the case where authentication passed and the request is to be dispatched, BUT that the authentication context - // cannot be instated before dispatching the request - writeFuture = testThreadPool.generic().submit(() -> { + ch.flushOutbound(); + response = ch.readOutbound(); + assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); + assertThat( + responseContentString, + containsString("\"type\":\"security_exception\",\"reason\":\"Request is not authenticated\"") + ); + // this tests the case where authentication passed and the request is to be dispatched, BUT that the authentication context + // cannot be instated before dispatching the request HttpMessage authenticableMessage = HttpHeadersAuthenticatorUtils.wrapAsMessageWithAuthenticationContext( new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/unauthenticated_request") ); @@ -436,13 +432,12 @@ public void testHttpHeaderAuthnBypassHeaderValidator() throws Exception { ch.writeInbound(authenticableMessage); ch.writeInbound(new DefaultLastHttpContent()); ch.flushInbound(); - }); - writeFuture.get(); - ch.flushOutbound(); - response = ch.readOutbound(); - assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); - responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat(responseContentString, containsString("\"type\":\"exception\",\"reason\":\"Boom\"")); + ch.flushOutbound(); + response = ch.readOutbound(); + assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); + assertThat(responseContentString, containsString("\"type\":\"exception\",\"reason\":\"Boom\"")); + })); } finally { testThreadPool.shutdownNow(); } @@ -483,43 +478,41 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th (httpPreRequest, channel, listener) -> listener.onResponse(null) ) ) { - final ChannelHandler handler = transport.configureServerChannelHandler(); - final EmbeddedChannel ch = new EmbeddedChannel(handler); - // replace the decoder with the vanilla one that does no wrapping and will trip the header validator - ch.pipeline().replace("decoder", "decoder", new HttpRequestDecoder()); - // remove these pipeline handlers as they interfere in the test scenario - for (String pipelineHandlerName : ch.pipeline().names()) { - if (pipelineHandlerName.equals("encoder") - || pipelineHandlerName.equals("encoder_compress") - || pipelineHandlerName.equals("chunked_writer")) { - ch.pipeline().remove(pipelineHandlerName); + safeGet(testThreadPool.generic().submit(() -> { + final ChannelHandler handler = transport.configureServerChannelHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); + // replace the decoder with the vanilla one that does no wrapping and will trip the header validator + ch.pipeline().replace("decoder", "decoder", new HttpRequestDecoder()); + // remove these pipeline handlers as they interfere in the test scenario + for (String pipelineHandlerName : ch.pipeline().names()) { + if (pipelineHandlerName.equals("encoder") + || pipelineHandlerName.equals("encoder_compress") + || pipelineHandlerName.equals("chunked_writer")) { + ch.pipeline().remove(pipelineHandlerName); + } } - } - // tests requests that are not wrapped by the "decoder" and so cannot be authenticated - testThreadPool.generic().submit(() -> { + // tests requests that are not wrapped by the "decoder" and so cannot be authenticated ch.writeInbound(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/unwrapped_full_request")); ch.flushInbound(); - }).get(); - ch.flushOutbound(); - Netty4FullHttpResponse response = ch.readOutbound(); - assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); - var responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat( - responseContentString, - containsString("\"type\":\"illegal_state_exception\",\"reason\":\"Cannot authenticate unwrapped requests\"") - ); - testThreadPool.generic().submit(() -> { + ch.flushOutbound(); + Netty4FullHttpResponse response = ch.readOutbound(); + assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + var responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); + assertThat( + responseContentString, + containsString("\"type\":\"illegal_state_exception\",\"reason\":\"Cannot authenticate unwrapped requests\"") + ); ch.writeInbound(new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/unwrapped_request")); ch.flushInbound(); - }).get(); - ch.flushOutbound(); - response = ch.readOutbound(); - assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); - responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat( - responseContentString, - containsString("\"type\":\"illegal_state_exception\",\"reason\":\"Cannot authenticate unwrapped requests\"") - ); + ch.flushOutbound(); + response = ch.readOutbound(); + assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); + assertThat( + responseContentString, + containsString("\"type\":\"illegal_state_exception\",\"reason\":\"Cannot authenticate unwrapped requests\"") + ); + })); } finally { testThreadPool.shutdownNow(); } @@ -571,111 +564,97 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } ) ) { - final ChannelHandler handler = transport.configureServerChannelHandler(); - assertThat(authnInvocationCount.get(), is(0)); - assertThat(badDispatchInvocationCount.get(), is(0)); - // case 1: invalid initial line - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("This is not a valid HTTP line"), buf); - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - var writeFuture = testThreadPool.generic().submit(() -> { + safeGet(testThreadPool.generic().submit(() -> { + + final ChannelHandler handler = transport.configureServerChannelHandler(); + assertThat(authnInvocationCount.get(), is(0)); + assertThat(badDispatchInvocationCount.get(), is(0)); + // case 1: invalid initial line + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("This is not a valid HTTP line"), buf); + buf.writeByte(HttpConstants.LF); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }); - writeFuture.get(); - assertThat(dispatchThrowableReference.get().toString(), containsString("NOT A VALID HTTP LINE")); - assertThat(badDispatchInvocationCount.get(), is(1)); - assertThat(authnInvocationCount.get(), is(0)); - } - // case 2: too long initial line - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("GET /this/is/a/valid/but/too/long/initial/line HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - var writeFuture = testThreadPool.generic().submit(() -> { + assertThat(dispatchThrowableReference.get().toString(), containsString("NOT A VALID HTTP LINE")); + assertThat(badDispatchInvocationCount.get(), is(1)); + assertThat(authnInvocationCount.get(), is(0)); + } + // case 2: too long initial line + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("GET /this/is/a/valid/but/too/long/initial/line HTTP/1.1"), buf); + buf.writeByte(HttpConstants.LF); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }); - writeFuture.get(); - assertThat(dispatchThrowableReference.get().toString(), containsString("HTTP line is larger than")); - assertThat(badDispatchInvocationCount.get(), is(2)); - assertThat(authnInvocationCount.get(), is(0)); - } - // case 3: invalid header with no colon - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of("Host"), buf); - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - var writeFuture = testThreadPool.generic().submit(() -> { + assertThat(dispatchThrowableReference.get().toString(), containsString("HTTP line is larger than")); + assertThat(badDispatchInvocationCount.get(), is(2)); + assertThat(authnInvocationCount.get(), is(0)); + } + // case 3: invalid header with no colon + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); + buf.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of("Host"), buf); + buf.writeByte(HttpConstants.LF); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }); - writeFuture.get(); - assertThat(dispatchThrowableReference.get().toString(), containsString("No colon found")); - assertThat(badDispatchInvocationCount.get(), is(3)); - assertThat(authnInvocationCount.get(), is(0)); - } - // case 4: invalid header longer than max allowed - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of("Host: this.looks.like.a.good.url.but.is.longer.than.permitted"), buf); - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - var writeFuture = testThreadPool.generic().submit(() -> { + assertThat(dispatchThrowableReference.get().toString(), containsString("No colon found")); + assertThat(badDispatchInvocationCount.get(), is(3)); + assertThat(authnInvocationCount.get(), is(0)); + } + // case 4: invalid header longer than max allowed + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); + buf.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of("Host: this.looks.like.a.good.url.but.is.longer.than.permitted"), buf); + buf.writeByte(HttpConstants.LF); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }); - writeFuture.get(); - assertThat(dispatchThrowableReference.get().toString(), containsString("HTTP header is larger than")); - assertThat(badDispatchInvocationCount.get(), is(4)); - assertThat(authnInvocationCount.get(), is(0)); - } - // case 5: invalid header format - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of("Host: invalid header value"), buf); - buf.writeByte(0x01); - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - var writeFuture = testThreadPool.generic().submit(() -> { + assertThat(dispatchThrowableReference.get().toString(), containsString("HTTP header is larger than")); + assertThat(badDispatchInvocationCount.get(), is(4)); + assertThat(authnInvocationCount.get(), is(0)); + } + // case 5: invalid header format + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); + buf.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of("Host: invalid header value"), buf); + buf.writeByte(0x01); + buf.writeByte(HttpConstants.LF); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }); - writeFuture.get(); - assertThat(dispatchThrowableReference.get().toString(), containsString("Validation failed for header 'Host'")); - assertThat(badDispatchInvocationCount.get(), is(5)); - assertThat(authnInvocationCount.get(), is(0)); - } - // case 6: connection closed before all headers are sent - { - EmbeddedChannel ch = new EmbeddedChannel(handler); - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); - buf.writeByte(HttpConstants.LF); - testThreadPool.generic().submit(() -> { + assertThat(dispatchThrowableReference.get().toString(), containsString("Validation failed for header 'Host'")); + assertThat(badDispatchInvocationCount.get(), is(5)); + assertThat(authnInvocationCount.get(), is(0)); + } + // case 6: connection closed before all headers are sent + { + EmbeddedChannel ch = new EmbeddedChannel(handler); + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("GET /url HTTP/1.1"), buf); + buf.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); + buf.writeByte(HttpConstants.LF); ch.writeInbound(buf); ch.flushInbound(); - }).get(); - testThreadPool.generic().submit(() -> ch.close().get()).get(); - assertThat(authnInvocationCount.get(), is(0)); - } + safeGet(ch.close()); + assertThat(authnInvocationCount.get(), is(0)); + } + })); } finally { testThreadPool.shutdownNow(); } @@ -717,130 +696,125 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } ) ) { - final ChannelHandler handler = transport.configureServerChannelHandler(); - final EmbeddedChannel ch = new EmbeddedChannel(handler); - // OPTIONS request with fixed length content written in one chunk - { - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("OPTIONS /url/whatever/fixed-length-single-chunk HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); + safeGet(testThreadPool.generic().submit(() -> { + final ChannelHandler handler = transport.configureServerChannelHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); + // OPTIONS request with fixed length content written in one chunk + { + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("OPTIONS /url/whatever/fixed-length-single-chunk HTTP/1.1"), buf); buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Accept: */*"), buf); + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Accept: */*"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Content-Encoding: gzip"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy( + AsciiString.of("Content-Type: " + randomFrom("text/plain; charset=utf-8", "application/json; charset=utf-8")), + buf + ); + buf.writeByte(HttpConstants.LF); + } + String content = randomAlphaOfLengthBetween(4, 1024); + // having a "Content-Length" request header is what makes it "fixed length" + ByteBufUtil.copy(AsciiString.of("Content-Length: " + content.length()), buf); buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Content-Encoding: gzip"), buf); + // end of headers buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy( - AsciiString.of("Content-Type: " + randomFrom("text/plain; charset=utf-8", "application/json; charset=utf-8")), - buf - ); - buf.writeByte(HttpConstants.LF); - } - String content = randomAlphaOfLengthBetween(4, 1024); - // having a "Content-Length" request header is what makes it "fixed length" - ByteBufUtil.copy(AsciiString.of("Content-Length: " + content.length()), buf); - buf.writeByte(HttpConstants.LF); - // end of headers - buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of(content), buf); - // write everything in one single chunk - testThreadPool.generic().submit(() -> { + ByteBufUtil.copy(AsciiString.of(content), buf); + // write everything in one single chunk ch.writeInbound(buf); ch.flushInbound(); - }).get(); - ch.runPendingTasks(); - Throwable badRequestCause = badRequestCauseReference.get(); - assertThat(badRequestCause, instanceOf(HttpHeadersValidationException.class)); - assertThat(badRequestCause.getCause(), instanceOf(ElasticsearchException.class)); - assertThat(((ElasticsearchException) badRequestCause.getCause()).status(), is(RestStatus.BAD_REQUEST)); - assertThat( - ((ElasticsearchException) badRequestCause.getCause()).getDetailedMessage(), - containsString("OPTIONS requests with a payload body are not supported") - ); - } - { - ByteBuf buf = ch.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("OPTIONS /url/whatever/chunked-transfer?encoding HTTP/1.1"), buf); - buf.writeByte(HttpConstants.LF); - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); - buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Accept: */*"), buf); - buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Content-Encoding: gzip"), buf); - buf.writeByte(HttpConstants.LF); - } - if (randomBoolean()) { - ByteBufUtil.copy( - AsciiString.of("Content-Type: " + randomFrom("text/plain; charset=utf-8", "application/json; charset=utf-8")), - buf + ch.runPendingTasks(); + Throwable badRequestCause = badRequestCauseReference.get(); + assertThat(badRequestCause, instanceOf(HttpHeadersValidationException.class)); + assertThat(badRequestCause.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(((ElasticsearchException) badRequestCause.getCause()).status(), is(RestStatus.BAD_REQUEST)); + assertThat( + ((ElasticsearchException) badRequestCause.getCause()).getDetailedMessage(), + containsString("OPTIONS requests with a payload body are not supported") ); - buf.writeByte(HttpConstants.LF); } - // do not write a "Content-Length" header to make the request "variable length" - if (randomBoolean()) { - ByteBufUtil.copy(AsciiString.of("Transfer-Encoding: " + randomFrom("chunked", "gzip, chunked")), buf); - } else { - ByteBufUtil.copy(AsciiString.of("Transfer-Encoding: chunked"), buf); - } - buf.writeByte(HttpConstants.LF); - buf.writeByte(HttpConstants.LF); - // maybe append some chunks as well - String[] contentParts = randomArray(0, 4, String[]::new, () -> randomAlphaOfLengthBetween(1, 64)); - for (String content : contentParts) { - ByteBufUtil.copy(AsciiString.of(Integer.toHexString(content.length())), buf); - buf.writeByte(HttpConstants.CR); + { + ByteBuf buf = ch.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("OPTIONS /url/whatever/chunked-transfer?encoding HTTP/1.1"), buf); buf.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of(content), buf); - buf.writeByte(HttpConstants.CR); + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Host: localhost"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Accept: */*"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Content-Encoding: gzip"), buf); + buf.writeByte(HttpConstants.LF); + } + if (randomBoolean()) { + ByteBufUtil.copy( + AsciiString.of("Content-Type: " + randomFrom("text/plain; charset=utf-8", "application/json; charset=utf-8")), + buf + ); + buf.writeByte(HttpConstants.LF); + } + // do not write a "Content-Length" header to make the request "variable length" + if (randomBoolean()) { + ByteBufUtil.copy(AsciiString.of("Transfer-Encoding: " + randomFrom("chunked", "gzip, chunked")), buf); + } else { + ByteBufUtil.copy(AsciiString.of("Transfer-Encoding: chunked"), buf); + } buf.writeByte(HttpConstants.LF); - } - testThreadPool.generic().submit(() -> { + buf.writeByte(HttpConstants.LF); + // maybe append some chunks as well + String[] contentParts = randomArray(0, 4, String[]::new, () -> randomAlphaOfLengthBetween(1, 64)); + for (String content : contentParts) { + ByteBufUtil.copy(AsciiString.of(Integer.toHexString(content.length())), buf); + buf.writeByte(HttpConstants.CR); + buf.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of(content), buf); + buf.writeByte(HttpConstants.CR); + buf.writeByte(HttpConstants.LF); + } ch.writeInbound(buf); ch.flushInbound(); - }).get(); - // append some more chunks as well - ByteBuf buf2 = ch.alloc().buffer(); - contentParts = randomArray(1, 4, String[]::new, () -> randomAlphaOfLengthBetween(1, 64)); - for (String content : contentParts) { - ByteBufUtil.copy(AsciiString.of(Integer.toHexString(content.length())), buf2); + ByteBuf buf2 = ch.alloc().buffer(); + contentParts = randomArray(1, 4, String[]::new, () -> randomAlphaOfLengthBetween(1, 64)); + for (String content : contentParts) { + ByteBufUtil.copy(AsciiString.of(Integer.toHexString(content.length())), buf2); + buf2.writeByte(HttpConstants.CR); + buf2.writeByte(HttpConstants.LF); + ByteBufUtil.copy(AsciiString.of(content), buf2); + buf2.writeByte(HttpConstants.CR); + buf2.writeByte(HttpConstants.LF); + } + // finish chunked request + ByteBufUtil.copy(AsciiString.of("0"), buf2); buf2.writeByte(HttpConstants.CR); buf2.writeByte(HttpConstants.LF); - ByteBufUtil.copy(AsciiString.of(content), buf2); buf2.writeByte(HttpConstants.CR); buf2.writeByte(HttpConstants.LF); - } - // finish chunked request - ByteBufUtil.copy(AsciiString.of("0"), buf2); - buf2.writeByte(HttpConstants.CR); - buf2.writeByte(HttpConstants.LF); - buf2.writeByte(HttpConstants.CR); - buf2.writeByte(HttpConstants.LF); - testThreadPool.generic().submit(() -> { ch.writeInbound(buf2); ch.flushInbound(); - }).get(); - ch.runPendingTasks(); - Throwable badRequestCause = badRequestCauseReference.get(); - assertThat(badRequestCause, instanceOf(HttpHeadersValidationException.class)); - assertThat(badRequestCause.getCause(), instanceOf(ElasticsearchException.class)); - assertThat(((ElasticsearchException) badRequestCause.getCause()).status(), is(RestStatus.BAD_REQUEST)); - assertThat( - ((ElasticsearchException) badRequestCause.getCause()).getDetailedMessage(), - containsString("OPTIONS requests with a payload body are not supported") - ); - } + ch.runPendingTasks(); + Throwable badRequestCause = badRequestCauseReference.get(); + assertThat(badRequestCause, instanceOf(HttpHeadersValidationException.class)); + assertThat(badRequestCause.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(((ElasticsearchException) badRequestCause.getCause()).status(), is(RestStatus.BAD_REQUEST)); + assertThat( + ((ElasticsearchException) badRequestCause.getCause()).getDetailedMessage(), + containsString("OPTIONS requests with a payload body are not supported") + ); + } + })); } finally { testThreadPool.shutdownNow(); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index 9ff23e5e7b9d8..74b02c1d63bbf 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -115,7 +115,6 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleTran @Override protected Transport build(Settings settings, TransportVersion version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); - NetworkService networkService = new NetworkService(Collections.emptyList()); Settings settings1 = Settings.builder().put(settings).put("xpack.security.transport.ssl.enabled", true).build(); return new TestSecurityNetty4ServerTransport( settings1,