Skip to content

Commit

Permalink
custom codecs upgrade to lucene99 codec
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jan 17, 2024
1 parent 804fe11 commit 678be65
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public CustomCodecService(MapperService mapperService, IndexSettings indexSettin
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
codecs.put(ZSTD_CODEC, new Zstd99Codec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(compressionLevel));
} else {
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_CODEC, new Zstd99Codec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(mapperService, logger, compressionLevel));
}
this.codecs = codecs.immutableMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@

package org.opensearch.index.codec.customcodecs;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;
import org.apache.lucene.backward_codecs.lucene95.Lucene95Codec;

import java.util.Collections;
import java.util.Set;
Expand All @@ -22,6 +19,7 @@
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
* Supports two modes zstd and zstd_no_dict.
* Uses Lucene95 as the delegate codec
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -73,40 +71,14 @@ public Set<String> getAliases() {
private final StoredFieldsFormat storedFieldsFormat;

/**
* Creates a new compression codec with the default compression level.
* Creates a new compression codec.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
*/
public Lucene95CustomCodec(Mode mode) {
this(mode, DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel) {
public Lucene95CustomCodec(Mode mode) {
super(mode.getCodec(), new Lucene95Codec());
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
* @param mapperService The mapper service.
* @param logger The logger.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
this.storedFieldsFormat = new Lucene99CustomStoredFieldsFormat();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Set;

/**
 * Base class for the ZSTD-backed custom codecs that delegate to the Lucene99 codec.
 *
 * <p>Everything except stored fields is handled by the {@link FilterCodec} delegate; stored fields
 * are served by a {@link Lucene99CustomStoredFieldsFormat} configured with the selected
 * {@link Mode} and compression level.
 *
 * @opensearch.internal
 */
public abstract class Lucene99CustomCodec extends FilterCodec {

    /** Compression level applied when the caller does not supply one. */
    public static final int DEFAULT_COMPRESSION_LEVEL = 3;

    /** The compression algorithms supported by this codec family. */
    public enum Mode {
        /** ZStandard with dictionary. */
        ZSTD("ZSTD99", Set.of("zstd")),
        /** ZStandard without dictionary. */
        ZSTD_NO_DICT("ZSTDNODICT99", Set.of("zstd_no_dict"));

        private final String codecName;
        private final Set<String> codecAliases;

        Mode(String codec, Set<String> aliases) {
            this.codecName = codec;
            this.codecAliases = aliases;
        }

        /**
         * @return the codec name that is passed to the {@link FilterCodec} constructor
         */
        public String getCodec() {
            return codecName;
        }

        /**
         * @return the alias names associated with this mode
         */
        public Set<String> getAliases() {
            return codecAliases;
        }
    }

    private final StoredFieldsFormat customStoredFieldsFormat;

    /**
     * Creates a codec for the given mode using {@link #DEFAULT_COMPRESSION_LEVEL}.
     *
     * @param mode The compression codec (ZSTD or ZSTDNODICT).
     */
    public Lucene99CustomCodec(Mode mode) {
        this(mode, DEFAULT_COMPRESSION_LEVEL);
    }

    /**
     * Creates a codec named after {@code mode.getCodec()} that delegates to a plain
     * {@link Lucene99Codec}.
     *
     * @param mode The compression codec (ZSTD or ZSTDNODICT).
     * @param compressionLevel The compression level.
     */
    public Lucene99CustomCodec(Mode mode, int compressionLevel) {
        super(mode.getCodec(), new Lucene99Codec());
        this.customStoredFieldsFormat = createStoredFieldsFormat(mode, compressionLevel);
    }

    /**
     * Creates a codec named after {@code mode.getCodec()} that delegates to a
     * {@link PerFieldMappingPostingFormatCodec} built with {@code BEST_SPEED}, the mapper service
     * and the logger.
     *
     * @param mode The compression codec (ZSTD or ZSTDNODICT).
     * @param compressionLevel The compression level.
     * @param mapperService The mapper service.
     * @param logger The logger.
     */
    public Lucene99CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
        super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger));
        this.customStoredFieldsFormat = createStoredFieldsFormat(mode, compressionLevel);
    }

    /** Builds the stored fields format shared by the two full constructors. */
    private static StoredFieldsFormat createStoredFieldsFormat(Mode mode, int compressionLevel) {
        return new Lucene99CustomStoredFieldsFormat(mode, compressionLevel);
    }

    /**
     * @return the custom stored fields format created at construction time, rather than the
     *         delegate codec's format
     */
    @Override
    public StoredFieldsFormat storedFieldsFormat() {
        return customStoredFieldsFormat;
    }

    /**
     * @return the simple name of the concrete codec class
     */
    @Override
    public String toString() {
        return this.getClass().getSimpleName();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import java.util.Objects;

/** Stored field format used by pluggable codec */
public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
public class Lucene99CustomStoredFieldsFormat extends StoredFieldsFormat {

/** A key that we use to map to a mode */
public static final String MODE_KEY = Lucene95CustomStoredFieldsFormat.class.getSimpleName() + ".mode";
public static final String LUCENE95_MODE_KEY = "Lucene95CustomStoredFieldsFormat.mode";
public static final String MODE_KEY = Lucene99CustomStoredFieldsFormat.class.getSimpleName() + ".mode";

protected static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024;
protected static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096;
Expand All @@ -34,21 +35,21 @@ public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
private final CompressionMode zstdCompressionMode;
private final CompressionMode zstdNoDictCompressionMode;

private final Lucene95CustomCodec.Mode mode;
private final Lucene99CustomCodec.Mode mode;
private final int compressionLevel;

/** default constructor */
public Lucene95CustomStoredFieldsFormat() {
this(Lucene95CustomCodec.Mode.ZSTD, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL);
public Lucene99CustomStoredFieldsFormat() {
this(Lucene99CustomCodec.Mode.ZSTD, Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new instance.
*
* @param mode The mode represents ZSTD or ZSTDNODICT
*/
public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
this(mode, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL);
public Lucene99CustomStoredFieldsFormat(Lucene99CustomCodec.Mode mode) {
this(mode, Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL);
}

/**
Expand All @@ -57,7 +58,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
* @param mode The mode represents ZSTD or ZSTDNODICT
* @param compressionLevel The compression level for the mode.
*/
public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) {
public Lucene99CustomStoredFieldsFormat(Lucene99CustomCodec.Mode mode, int compressionLevel) {
this.mode = Objects.requireNonNull(mode);
this.compressionLevel = compressionLevel;
zstdCompressionMode = new ZstdCompressionMode(compressionLevel);
Expand All @@ -73,12 +74,17 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compr
*/
@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
String value = si.getAttribute(MODE_KEY);
if (value == null) {
if (si.getAttribute(LUCENE95_MODE_KEY) != null) {
String value = si.getAttribute(LUCENE95_MODE_KEY);
Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value);
return impl(mode).fieldsReader(directory, si, fn, context);
} else if (si.getAttribute(MODE_KEY) !=null){
String value = si.getAttribute(MODE_KEY);
Lucene99CustomCodec.Mode mode = Lucene99CustomCodec.Mode.valueOf(value);
return impl(mode).fieldsReader(directory, si, fn, context);
} else {
throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name);
}
Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value);
return impl(mode).fieldsReader(directory, si, fn, context);
}

/**
Expand All @@ -98,31 +104,40 @@ public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOCo
return impl(mode).fieldsWriter(directory, si, context);
}

/**
 * Selects the compressing stored fields format that matches a Lucene99 codec mode.
 *
 * @param mode the mode to resolve (ZSTD or ZSTD_NO_DICT)
 * @return a stored fields format using the format name and compression mode tied to {@code mode}
 * @throws AssertionError if {@code mode} is not one of the handled constants
 */
StoredFieldsFormat impl(Lucene99CustomCodec.Mode mode) {
    final String formatName;
    final CompressionMode compression;
    switch (mode) {
        case ZSTD:
            formatName = "CustomStoredFieldsZstd";
            compression = this.zstdCompressionMode;
            break;
        case ZSTD_NO_DICT:
            formatName = "CustomStoredFieldsZstdNoDict";
            compression = this.zstdNoDictCompressionMode;
            break;
        default:
            throw new AssertionError();
    }
    return getCustomCompressingStoredFieldsFormat(formatName, compression);
}

StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) {
switch (mode) {
case ZSTD:
case ZSTD_DEPRECATED:
return new Lucene90CompressingStoredFieldsFormat(
"CustomStoredFieldsZstd",
zstdCompressionMode,
ZSTD_BLOCK_LENGTH,
ZSTD_MAX_DOCS_PER_BLOCK,
ZSTD_BLOCK_SHIFT
);
return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstd", this.zstdCompressionMode);
case ZSTD_NO_DICT:
return new Lucene90CompressingStoredFieldsFormat(
"CustomStoredFieldsZstdNoDict",
zstdNoDictCompressionMode,
ZSTD_BLOCK_LENGTH,
ZSTD_MAX_DOCS_PER_BLOCK,
ZSTD_BLOCK_SHIFT
);
return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode);
default:
throw new AssertionError();
}
}

public Lucene95CustomCodec.Mode getMode() {
/**
 * Creates a {@link Lucene90CompressingStoredFieldsFormat} with the shared ZSTD block settings.
 *
 * @param formatName the format name handed to the compressing stored fields format
 * @param compressionMode the compression mode to use
 * @return the newly created stored fields format
 */
private StoredFieldsFormat getCustomCompressingStoredFieldsFormat(String formatName, CompressionMode compressionMode) {
    // Block length, max docs per block and block shift are the same for every mode.
    final StoredFieldsFormat compressingFormat = new Lucene90CompressingStoredFieldsFormat(
        formatName,
        compressionMode,
        ZSTD_BLOCK_LENGTH,
        ZSTD_MAX_DOCS_PER_BLOCK,
        ZSTD_BLOCK_SHIFT
    );
    return compressingFormat;
}

/**
 * Returns the mode this format was constructed with.
 *
 * @return the configured {@link Lucene99CustomCodec.Mode}
 */
public Lucene99CustomCodec.Mode getMode() {
return mode;
}

Expand All @@ -134,7 +149,7 @@ public int getCompressionLevel() {
}

public CompressionMode getCompressionMode() {
return mode == Lucene95CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
return mode == Lucene99CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.common.settings.Setting;
import org.opensearch.index.codec.CodecAliases;
import org.opensearch.index.codec.CodecSettings;
import org.opensearch.index.engine.EngineConfig;

import java.util.Set;

/**
 * Zstd95Codec is the {@link Lucene95CustomCodec} variant for the ZSTD mode. It provides the ZSTD
 * compressor using the <a href="https://github.com/luben/zstd-jni">zstd-jni</a> library.
 */
public class Zstd95Codec extends Lucene95CustomCodec implements CodecSettings, CodecAliases {

    /**
     * Creates a new Zstd95Codec instance in {@link Mode#ZSTD} mode.
     */
    public Zstd95Codec() {
        super(Mode.ZSTD);
    }

    /**
     * @return the simple class name of this codec
     */
    @Override
    public String toString() {
        return this.getClass().getSimpleName();
    }

    /**
     * Reports whether this codec supports the given index setting.
     *
     * @param setting the setting to check
     * @return {@code true} only for the index codec compression level setting
     */
    @Override
    public boolean supports(Setting<?> setting) {
        final boolean isCompressionLevelSetting = setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
        return isCompressionLevelSetting;
    }

    /**
     * @return the aliases declared by {@link Mode#ZSTD}
     */
    @Override
    public Set<String> aliases() {
        final Mode zstdMode = Mode.ZSTD;
        return zstdMode.getAliases();
    }
}
Loading

0 comments on commit 678be65

Please sign in to comment.