Skip to content

Commit

Permalink
✨ Add StreamInsert
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed Jan 15, 2021
1 parent 5246446 commit 0c0bf15
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Current Status

- Using Vald v1 APIs.
- [X] Insert
- [ ] StreamInsert
- [X] StreamInsert
- [X] Search
- [ ] StreamSearch
- [ ] SearchByID
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() {
env_logger::init();

let addr = "0.0.0.0:8080".parse().unwrap();
let mut vald = ValdImpl::default();
let vald = ValdImpl::default();

vald.initialize().unwrap();

Expand Down
7 changes: 0 additions & 7 deletions src/ngt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ impl NGT {
}

pub fn insert(&mut self, id: &str, vec: Vec<f32>) -> Result<(), io::Error> {
let index = match &mut self.index {
Some(index) => index,
None => {
panic!("NGT index is not opened");
}
};

self.insert_vecs
.lock()
.unwrap()
Expand Down
72 changes: 54 additions & 18 deletions src/vald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,12 @@ impl ValdImpl {

Ok(())
}
}

impl Clone for ValdImpl {
fn clone(&self) -> ValdImpl {
ValdImpl {
ngt: self.ngt.clone(),
}
}
}

#[tonic::async_trait]
impl Insert for ValdImpl {
async fn insert(
fn insert_impl(
&self,
request: Request<payload::v1::insert::Request>,
) -> Result<Response<payload::v1::object::Location>, Status> {
let msg = request.get_ref();
let obj = match &msg.vector {
request: &payload::v1::insert::Request,
) -> Result<payload::v1::object::Location, Status> {
let obj = match &request.vector {
Some(o) => o,
None => return Err(Status::invalid_argument("vector is required.")),
};
Expand All @@ -98,19 +86,67 @@ impl Insert for ValdImpl {
ips: vec!["192.168.1.1".to_string()],
};

Ok(Response::new(reply))
Ok(reply)
},
Err(err) => Err(Status::internal(err.to_string())),
}
}
}

impl Clone for ValdImpl {
fn clone(&self) -> ValdImpl {
ValdImpl {
ngt: self.ngt.clone(),
}
}
}

#[tonic::async_trait]
impl Insert for ValdImpl {
async fn insert(
&self,
request: Request<payload::v1::insert::Request>,
) -> Result<Response<payload::v1::object::Location>, Status> {
match self.insert_impl(request.get_ref()) {
Ok(res) => Ok(Response::new(res)),
Err(err) => Err(err),
}
}

type StreamInsertStream = mpsc::Receiver<Result<payload::v1::object::StreamLocation, Status>>;

async fn stream_insert(
&self,
request: Request<Streaming<payload::v1::insert::Request>>,
) -> Result<Response<Self::StreamInsertStream>, Status> {
unimplemented!()
let mut stream = request.into_inner();
let (mut tx, rx) = mpsc::channel(4);
let vald = self.clone();

tokio::spawn(async move {
while let Some(req) = stream.message().await.unwrap() {
let reply = match vald.insert_impl(&req) {
Ok(loc) => payload::v1::object::StreamLocation{
payload: Some(payload::v1::object::stream_location::Payload::Location(loc)),
},
Err(st) => payload::v1::object::StreamLocation{
payload: Some(payload::v1::object::stream_location::Payload::Error(errors::v1::errors::Rpc{
r#type: "".to_string(),
msg: "".to_string(),
details: Vec::new(),
error: st.to_string(),
instance: "".to_string(),
status: 0,
roots: Vec::new(),
})),
},
};

tx.send(Ok(reply)).await.unwrap();
}
});

Ok(Response::new(rx))
}

async fn multi_insert(
Expand Down

0 comments on commit 0c0bf15

Please sign in to comment.