
Remote Vector Index Build Component -- Repository Integration Low Level Design #2465

Open
jed326 opened this issue Jan 29, 2025 · 8 comments
Labels
Features Introduces a new unit of functionality that satisfies a requirement Roadmap:Vector Database/GenAI Project-wide roadmap label

Comments


jed326 commented Jan 29, 2025

Overview

This is the low level design follow-up to #2392. Specifically, the following are covered:

  1. Control knobs for configuring and enabling the feature
  2. Metrics
  3. Detailed analysis of repository upload/download

User Experience

First, we define how a user can configure and control the remote vector build feature.

Vector Repository Configuration

We will expose a cluster setting for users to indicate the name of the (registered) repository they would like to use as the vector repository. A user must register the repository on their own.

knn.remote_build.vector_repo (String)

It is difficult to get a reference to the RepositoriesService from KNNSettings, so for now we will not validate that the repository specified here is registered. We will leave this for #2464.

In #2392 we outlined that we will use the vectors path outside of the indices path to avoid collision with any snapshots, so a user will still be able to use this repository for snapshots if they wish (although we will recommend against it).

There are many knobs exposed by the specific repository implementations that can be used for performance tuning. For example, repository-s3 allows configuring the chunk size, buffer size, etc (https://opensearch.org/docs/latest/api-reference/snapshots/create-repository/#s3-repository). We will also perform benchmarks and provide performance tuning suggestions in #2464.

Feature Controls

The various settings with which a user can enable/disable this feature are listed below.

  1. Feature flag in KNNFeatureFlag. This will be internal only and removed before GA.
     knn.feature.remote_build.enabled (boolean)
  2. Index setting to enable/disable the feature. We will explore a field level setting as a part of Remote Vector Index Build Component -- Future Work #2464.
     index.knn.remote_build.enabled (boolean)
  3. Index threshold setting to control the minimum vector data size required to use the remote vector build feature. The threshold is based on data size rather than document count because, for example, a segment with 1k 5-dimension vectors is roughly the same size as a segment with 5 1k-dimension vectors.
     index.knn.remote_build.min_segment_size (int)
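
As a rough sketch, these settings could be declared with the OpenSearch Setting API as shown below. The class name, defaults, minimum values, and property flags here are assumptions for illustration, not final choices.

```java
// Sketch only -- the class name, defaults, and property flags are assumptions.
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

public class RemoteBuildSettings {
    // Cluster-level name of the (already registered) repository to use as the vector repository.
    public static final Setting<String> KNN_REMOTE_BUILD_VECTOR_REPO =
        Setting.simpleString("knn.remote_build.vector_repo", Property.NodeScope, Property.Dynamic);

    // Internal feature flag, to be removed before GA.
    public static final Setting<Boolean> KNN_FEATURE_REMOTE_BUILD_ENABLED =
        Setting.boolSetting("knn.feature.remote_build.enabled", false, Property.NodeScope, Property.Dynamic);

    // Per-index opt-in for the remote vector build feature.
    public static final Setting<Boolean> INDEX_REMOTE_BUILD_ENABLED =
        Setting.boolSetting("index.knn.remote_build.enabled", false, Property.IndexScope, Property.Dynamic);

    // Minimum vector data size threshold before the remote build path is used for a segment.
    public static final Setting<Integer> INDEX_REMOTE_BUILD_MIN_SEGMENT_SIZE =
        Setting.intSetting("index.knn.remote_build.min_segment_size", 0, 0, Property.IndexScope, Property.Dynamic);
}
```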

Visibility

Because we are giving field level controls over this feature, ideally we should also provide field level visibility/metrics. However, since k-NN stats today only support node level stats, in the first version we will provide only node level stats, and users will need to reference them to understand whether the remote vector index build feature is being used successfully. Shard/index level metrics will be explored as a part of #2464.

Previously outlined metrics:

  1. Data Upload/Download Volume
  2. Data Upload/Download Time
  3. Upload/Download Success Count
  4. Upload/Download Failure Count
  5. Upload/Download Success Rate, computed from [3], [4]
  6. Specific repository implementations also provide their own metrics, for example for repository-s3

Proposed Metric Names

  1. Repository_Write_Volume / Repository_Read_Volume
     - This can be measured from the vectors writer, as we know the length of the blob being read/written each time
  2. Repository_Write_Time / Repository_Read_Time
     - This can be measured from the vectors writer as well
  3. Repository_Write_Success_Count / Repository_Read_Success_Count
     - This can be measured from the vectors writer as well
  4. Repository_Write_Failure_Count / Repository_Read_Failure_Count
     - This can be measured from the vectors writer as well
  5. Success rate does not need to be a separate metric, since it can be computed from [3] and [4]
  6. We will explore adding additional repository metrics as a part of Remote Vector Index Build Component -- Future Work #2464

Additionally, we will add corresponding "remote" versions of the existing merge/refresh metrics:

REFRESH("refresh"),
MERGE("merge"),

Implementation

This section covers the low level abstractions that we will implement. We will create a new RemoteVectorIndexBuilder component in the KNN Plugin that will manage both the RepositoriesService as well as the new remote vector service client. Below is an overview of how the new components will fit into the existing k-NN plugin:

[Image: overview of how the new RemoteVectorIndexBuilder component fits into the existing k-NN plugin]

A reference to this new RemoteVectorIndexBuilder component will be passed down to the vectors writers, and the RemoteVectorIndexBuilder will be responsible for:

  1. Uploading the required files to the remote object store
  2. Triggering the vector build
  3. Awaiting on the build to complete
  4. Downloading the constructed graph file
  5. Writing the downloaded file to disk in the expected Lucene codec format
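
As a sketch of this component's surface, the interface could look roughly like the following. The method names, signatures, and the grouping of responsibilities are illustrative assumptions, not the final API; KNNVectorValues is the existing k-NN plugin iterator.

```java
// Sketch of the new component's surface; names and signatures are illustrative assumptions.
import java.io.IOException;
import org.apache.lucene.store.IndexOutput;

public interface RemoteVectorIndexBuilder {

    // [1] Upload the vectors (and doc ids) for a field's segment to the configured vector repository.
    void writeToRepository(KNNVectorValues<?> knnVectorValues, String blobName) throws IOException;

    // [2] + [3] Trigger the remote build and await its completion, returning the name of the
    // blob containing the constructed graph.
    String buildAndAwait(String blobName) throws IOException;

    // [4] + [5] Download the constructed graph and write it to disk through the Lucene IndexOutput,
    // in the expected codec format.
    void readFromRepository(String graphBlobName, IndexOutput indexOutput) throws IOException;
}
```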

Repositories Service

This section contains all of the specific low level design decisions related to the repository service and reading/writing vectors to a repository.

Converting Vector Values to InputStream

The OpenSearch repository interface uses the Java InputStream interface to read and write from a given repository, which exposes methods that read an arbitrary number of bytes in a sequential manner (see javadoc). In the KNN plugin we maintain a generic KNNVectorValues class that acts as an abstraction over the various Lucene DocIdSetIterators (for example FloatVectorValues), and this KNNVectorValues is what we use in the native engines index writer to iterate over the vectors during merge or flush. This means that ultimately we need to convert the KNNVectorValues into an InputStream in order to write the vector values to the repository; we will refer to this stream for now as the VectorValuesInputStream. The rest of this section discusses how we can do so while keeping memory consumption in check.

A naive solution would be to iterate through all of the vectors, copy them into an array, and then use that array as the backing data structure for the VectorValuesInputStream; however, this array would run into memory limitations for larger segments. For example, a 1k-dimension fp32 vector takes 4KB to store, so with 10m documents the vectors would take up 40GB, exceeding the heap space of many typical setups.

If we look at the repository-s3 implementation as an example, when uploading large blobs it splits the blob into parts of a configurable buffer size and performs a multipart upload (ref). By default the repository-s3 buffer size is between 5mb and 5% of JVM heap size (ref), which means the VectorValuesInputStream will never need to read more than this buffer size from KNNVectorValues at a time.

In the POC, we back the VectorValuesInputStream with a byte buffer sized to a single vector, refilled one vector at a time. From the analysis above, we can instead maintain a buffer between 5mb and 1% of JVM heap size, similar to what is done for the vector transfer to JNI today (see: #1506).
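
A minimal sketch of this buffered approach is below. The nextDoc()/getVector() accessors and the float[] parameterization of KNNVectorValues are assumptions; the real iterator API may differ, and the buffer size would be chosen between 5mb and the heap-based cap discussed above.

```java
// Minimal sketch of a buffered VectorValuesInputStream. The iterator accessors used here
// (nextDoc(), getVector()) are assumptions standing in for the real KNNVectorValues API.
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.lucene.search.DocIdSetIterator;

public class VectorValuesInputStream extends InputStream {
    private final KNNVectorValues<float[]> vectorValues; // k-NN plugin iterator (assumed API)
    private final int bytesPerVector;
    private final ByteBuffer buffer;

    public VectorValuesInputStream(KNNVectorValues<float[]> vectorValues, int dimension, int bufferSizeBytes) {
        this.vectorValues = vectorValues;
        this.bytesPerVector = dimension * Float.BYTES;
        this.buffer = ByteBuffer.allocate(Math.max(bufferSizeBytes, bytesPerVector));
        this.buffer.limit(0); // start empty so the first read triggers a refill
    }

    @Override
    public int read() throws IOException {
        if (!buffer.hasRemaining() && !refill()) {
            return -1;
        }
        return buffer.get() & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (!buffer.hasRemaining() && !refill()) {
            return -1;
        }
        int toRead = Math.min(len, buffer.remaining());
        buffer.get(b, off, toRead);
        return toRead;
    }

    // Load as many whole vectors as fit into the buffer; returns false once the iterator is exhausted.
    private boolean refill() throws IOException {
        buffer.clear();
        while (buffer.remaining() >= bytesPerVector && vectorValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
            for (float value : vectorValues.getVector()) {
                buffer.putFloat(value);
            }
        }
        buffer.flip();
        return buffer.hasRemaining();
    }
}
```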

Parallel Blob Upload

Today there are 2 write methods in the BlobContainer interface, asyncBlobUpload and writeBlob, with the differences outlined below:

asyncBlobUpload

  • Supports uploading multiple InputStreams in parallel via a queueing mechanism
  • Implemented by AsyncMultiStreamBlobContainer, which only repository-s3 implements today
  • Used asynchronously by remote store to upload segments on refresh
  • Creates N InputStreams of part size S from the same file

writeBlob

  • “Legacy” blob upload path which uploads a blob serially, must be implemented by all existing repository implementations.
  • Supports multi-part upload, however parts are still uploaded serially from a single InputStream
  • Used by remote store as the fallback when the repository does not support asyncBlobUpload

Remote Store Reference
Original Parallel Upload Design: opensearch-project/OpenSearch#6632

Based on the performance analysis in opensearch-project/OpenSearch#6632, we expect to need the parallel upload feature for performance. That said, in the POC where vectors are buffered one by one, the transfer of ~1.6m 768-dimension vectors takes only ~1 minute to complete, so we can revisit the performance aspect here as needed. For comparison, the single-threaded POC in opensearch-project/OpenSearch#6632 took 110s to upload 4.5GB.

First, writeBlob must be used as a fallback regardless; otherwise (at least for now) repository-s3 would be the only repository implementation supported by the feature. Unfortunately, the asyncBlobUpload implementation will not be as straightforward, for the following reasons:

  1. Talking specifically about S3 here (as currently this is only relevant to S3): at a high level, the JVM utilization of a single blob upload when using writeBlob is just the part buffer size (which is controllable via a repository setting), as parts are uploaded sequentially. However, when using asyncBlobUpload the JVM utilization becomes num_threads * partSize, and neither of these is directly controllable by repository settings today, so based on benchmarking results we may need to implement additional knobs here.
  2. Today the parallel multipart upload is designed with a file on the filesystem in mind, as it needs to open multiple InputStreams on the same file in order to perform the upload in parallel (ref). For remote store this is somewhat trivial because the upload is performed post refresh, after the segment has been written to disk; however, in our case we only have the KNNVectorValues, which is itself only a sequential iterator.
  3. Another key problem, following from [2], is that all of the parallel upload components require an InputStreamContainer object which contains the length of the input stream, and this length is required in the S3 upload request. However, given a KNNVectorValues iterator with M live docs, if we want to take a subset of K doc ids (1 < K < M), it is impossible to know how many docs are present between doc ids 1 through K without iterating through them, due to the potential presence of deleted docs. This is in fact how the native engines vector writer gets the live docs count today (ref). This means that if we want to split a given KNNVectorValues into N InputStreams, we have to iterate through each part in order to get the content length to be used in the S3 upload.

The specific implementation then of the parallel upload would look like this:

  1. We use a buffering input stream implementation that loads the vector values into memory in small chunks, similar to the existing POC, to ensure we keep JVM utilization in check.
  2. Each InputStream is responsible for a subset of K doc ids, and the corresponding InputStreamContainer knows the number of live docs within each part by iterating through the vector values (see the sketch after this list).
  3. To upload N InputStreams in parallel, we will use N corresponding KNNVectorValues iterators.
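
As a rough illustration of [2], the per-part content lengths could be computed with a single cheap pass over the doc ids. The class and method names below are hypothetical, and the KNNVectorValues nextDoc() accessor is an assumption.

```java
// Sketch: partition the doc id space into N equal ranges and count the live docs in each
// range by iterating, since deleted docs make the per-range count unknown otherwise. The
// resulting byte lengths feed the InputStreamContainer for each part. Names are illustrative.
import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;

public class PartLengthCalculator {
    public static long[] computePartLengths(KNNVectorValues<?> vectorValues, int maxDoc, int numParts, int bytesPerVector)
        throws IOException {
        long[] partLengths = new long[numParts];
        int docsPerRange = (maxDoc + numParts - 1) / numParts;
        int doc = vectorValues.nextDoc(); // advancing is cheap; no vector data is loaded
        for (int part = 0; part < numParts; part++) {
            // Exclusive upper doc id bound for this part.
            int rangeEnd = (int) Math.min((long) (part + 1) * docsPerRange, maxDoc);
            long liveDocsInPart = 0;
            while (doc != DocIdSetIterator.NO_MORE_DOCS && doc < rangeEnd) {
                liveDocsInPart++;
                doc = vectorValues.nextDoc();
            }
            partLengths[part] = liveDocsInPart * (long) bytesPerVector; // exact content length for this part
        }
        return partLengths;
    }
}
```

The actual upload would then open one KNNVectorValues iterator per part, each wrapped in its own buffered VectorValuesInputStream reporting the corresponding length.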

Writing Doc Ids

In addition to the vector values, we also need to write the doc ids to the remote repository. However, unlike the vector values, doc ids have a much smaller bounded size, so for doc ids we do not need to buffer in smaller parts; instead we can take the naive approach described above and write all the doc ids in one go. Below is copied from the same JNI memory improvement issue as above:

> Apart from vectors should we also stream the docIds?
> I think we should not stream docids. Here are some stats: in Lucene we can have at most 2^(31)-1 docIds and an Integer takes 4 bytes, so the total required memory to hold all docids is 0.5GB ((2^(31)-1)/2^30). If we do this we will be over-engineering our solution.
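
A minimal sketch of this naive approach for doc ids is below, using the BlobContainer writeBlob entry point. The class and method names are hypothetical; the real call site would live in the remote index builder.

```java
// Sketch: doc ids are bounded in size, so serialize them all at once and upload a single blob.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.opensearch.common.blobstore.BlobContainer;

public class DocIdBlobWriter {
    public static void writeDocIds(BlobContainer blobContainer, int[] docIds, String blobName) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(docIds.length * Integer.BYTES);
        for (int docId : docIds) {
            buffer.putInt(docId);
        }
        byte[] bytes = buffer.array();
        // Serial upload is fine here; doc ids stay small relative to the vector data.
        blobContainer.writeBlob(blobName, new ByteArrayInputStream(bytes), bytes.length, true);
    }
}
```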

Read InputStream to IndexOutput

Similar to writeBlob, the repository readBlob method also returns an InputStream, and we must use this InputStream to write the graph file to disk via an IndexOutput. Similarly, we need to do so in a way that keeps JVM utilization under control, so we can take the same approach described above in Converting Vector Values to InputStream and buffer the bytes to disk in chunks. Specific IndexOutput implementations already buffer chunks to disk; however, in order to keep our memory management solution generic, we should implement this chunking logic within the remote index builder as well, so that whether or not chunking happens does not depend on the Directory or Repository implementation.
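
A minimal sketch of this chunked copy, assuming the BlobContainer readBlob entry point and a Lucene IndexOutput; the chunk size and class name are illustrative assumptions:

```java
// Sketch: copy the graph blob to disk in fixed-size chunks so memory stays bounded
// regardless of the Directory or Repository implementation.
import java.io.IOException;
import java.io.InputStream;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.blobstore.BlobContainer;

public class GraphBlobDownloader {
    private static final int CHUNK_SIZE_BYTES = 1 << 20; // 1MB, illustrative only

    public static void copyToIndexOutput(BlobContainer blobContainer, String blobName, IndexOutput output)
        throws IOException {
        byte[] chunk = new byte[CHUNK_SIZE_BYTES];
        try (InputStream input = blobContainer.readBlob(blobName)) {
            int read;
            while ((read = input.read(chunk)) != -1) {
                output.writeBytes(chunk, 0, read);
            }
        }
    }
}
```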

Similar to asyncBlobUpload, there is also a corresponding readBlobAsync to be used for parallel multipart download of a blob. However, this API is still marked as experimental and is unused due to limitations related to encryption. For now we will only use the readBlob implementation with a buffered approach similar to writeBlob to keep memory utilization in check. For #2464 there are 2 tracks we can explore:

  1. Explore making parallel multipart download support encryption. If this is implemented as a part of other efforts then we could get it "for free" as well.
  2. Explore compression techniques to reduce the amount of data that needs to be downloaded, for example downloading only the structure of the graph without the vector values within it, since the vector values are already present locally.
@navneet1v (Collaborator)

@jed326 thanks for putting up the details. For index.knn.remote_build.min_segment_size, we should see whether there is a more user-friendly name; I would disagree with exposing segment details in the setting.

On the download part, we should see how we can parallelize; otherwise, for larger graphs it will reduce the performance of the system.


jed326 commented Feb 4, 2025

Thanks @navneet1v, how about instead something like index.knn.remote_build.size_threshold? This seems in line with the existing (and functionally similar) index.knn.advanced.approximate_threshold.

As for the download part, I've mentioned a few potential improvements in #2464; we will revisit this in the near future based on perf benchmarking.


jed326 commented Feb 6, 2025

The first step here is to build an initial POC to validate the following:

  1. Validate that we can wire up the native index writer with the repository service
  2. Validate that we can read from the repository and write to the repository
  3. Validate that we can write VectorValues into binary data in the repository
  4. Validate that we can read a faiss graph from the repository and write it to the IndexOutput
  5. Validate that we can use the existing parallel upload API used by remote store

For this I have created 2 POCs:
POC 1: jed326@3f4a65d -- Covering [1-4]
POC 2: jed326@380d6d9 -- Covering [5]

With POC 2 I have also done some preliminary benchmarking to validate the parallel upload performance, using a 1 shard, 0 replica setup on an AWS r6g.4xlarge instance type.

| Parallel vs Sequential | Vectors | Dimensions | Size | Latency (ms) |
|---|---|---|---|---|
| Parallel | 100k | 768 | 293mb | 1642 |
| Sequential | 100k | 768 | 293mb | 2561 |
| Parallel | 1m | 768 | 2.9gb | 37789 |
| Parallel | 10m | 768 | 28.6gb | 3718525 |

Note that the skip implementation for VectorValuesInputStream is currently extremely inefficient, which is why we are seeing slowdowns on the larger datasets; this will be resolved in the next revision.


Tangential to the POC and benchmarking, I also want to highlight some nuances specific to the S3 parallel upload implementation and list out the various performance tuning knobs.

Chunk Sizing

Today, by default, a 16mb chunk size is used for parallel uploads (ref), so in the benchmarking above the 2.9GB file was split into ~180 parts. There is no way to directly control the number of parts; however, the part size is adjustable via the parallel_multipart_upload.minimum_part_size repository setting, with the caveat that the minimum part size for a multipart upload is 5mb.

Memory Usage

The number of parts that are uploaded in parallel is determined by the size of the threadpool processing these uploads. This also depends on the priority of the write context we use, but for example the NORMAL priority that remote store uses for segment upload is by default configured with 2x processors, so in the benchmarking above there were 32 threads available to do the upload. This is also configurable via the s3_transfer_queue_consumers setting.

If the repository is configured to allow upload retries, configurable via the s3_upload_retry_enabled repository setting, then in order to support retries each chunk is entirely loaded into memory (ref) in a buffered input stream to support retrying from a specific location in the stream. This means the maximum memory utilization per node is going to be s3_transfer_queue_consumers * parallel_multipart_upload.minimum_part_size, which from our POC above would be 32 * 16mb = 512mb.

VectorValuesInputStream

In the POC I shared above, the VectorValuesInputStream is also implemented (informally) as a buffered input stream where there is a byte buffer that vectors are loaded into as needed. In the final implementation we need to change this, primarily because:

  1. A buffered input stream is going to be needed by any vendor implementation in order to properly support retries, so keeping a buffer in this input stream is duplicated work.
  2. Iterating through the KNNVectorValues is relatively cheap, however loading the vector value is expensive as that will ultimately read the data from an IndexInput. We should not load vectors unless absolutely necessary.
  3. As mentioned above, the existing skip implementation is extremely inefficient as it buffers through all of the vector values to get to the specified location. We need to support skipping without loading the vectors to keep performance in check (see the sketch after this list).
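
Continuing the buffered VectorValuesInputStream sketch from the issue description (same assumed fields and iterator accessors), a skip() that never materializes vector data could look roughly like this:

```java
// Sketch: skip over whole vectors by only advancing the iterator; getVector() is what
// actually reads from the underlying IndexInput, so it is never called here.
@Override
public long skip(long n) throws IOException {
    long skipped = 0;
    // 1. Consume whatever is already buffered in memory.
    int fromBuffer = (int) Math.min(n, buffer.remaining());
    buffer.position(buffer.position() + fromBuffer);
    skipped += fromBuffer;
    // 2. Step over whole vectors without loading them.
    while (n - skipped >= bytesPerVector) {
        if (vectorValues.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
            return skipped; // stream exhausted before n bytes were skipped
        }
        skipped += bytesPerVector;
    }
    // 3. A trailing partial vector is handled by the default read()-based skip,
    //    which buffers at most one refill's worth of data.
    if (n - skipped > 0) {
        skipped += super.skip(n - skipped);
    }
    return skipped;
}
```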

The next step here is to raise a PR with the above considerations in place and do more comprehensive benchmarks on various chunk sizes and potentially various thread counts.


jed326 commented Feb 7, 2025

On an updated POC that does not load vectors on InputStream.skip I am seeing just 7019ms to upload the 2.9GB for the 1m vector dataset. Will publish full details and POC code in the near future.

@navneet1v (Collaborator)

> On an updated POC that does not load vectors on InputStream.skip I am seeing just 7019ms to upload the 2.9GB for the 1m vector dataset. Will publish full details and POC code in the near future.

@jed326 is this good or bad as compared to your older runs?


jed326 commented Feb 7, 2025

@navneet1v It's much better; the "bad" version took 37789ms on the same dataset. ~7000ms for 2.9GB is approximately 400MB/s of upload throughput (including all of the disk reads), which is a good speed. We will see if we can push this even higher with tuning in the future.

@navneet1v (Collaborator)

> @navneet1v It's much better; the "bad" version took 37789ms on the same dataset. ~7000ms for 2.9GB is approximately 400MB/s of upload throughput (including all of the disk reads), which is a good speed. We will see if we can push this even higher with tuning in the future.

Thanks for the update. The benchmark numbers look awesome then.


jed326 commented Feb 8, 2025

Updated POC: jed326@29a8230

Updated numbers:

| Parallel vs Sequential | Vectors | Dimensions | Size | Latency (ms) |
|---|---|---|---|---|
| Parallel | 1m | 768 | ~2.9GB | 7019 |
| Parallel | 10m | 768 | ~29GB | 42008 |
