Skip to content

Commit

Permalink
chore: fix sourcer to read all responses (numaproj#168)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Nov 6, 2024
1 parent 2f78f37 commit 332006c
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,32 +146,35 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour

// processReadData processes the read data and sends it to the client.
func (fs *Service) processReadData(ctx context.Context, stream sourcepb.Source_ReadFnServer, messageCh <-chan Message) error {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-messageCh:
if !ok {
break
}
offset := &sourcepb.Offset{
Offset: msg.Offset().Value(),
PartitionId: msg.Offset().PartitionId(),
}
element := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: msg.Value(),
Offset: offset,
EventTime: timestamppb.New(msg.EventTime()),
Keys: msg.Keys(),
Headers: msg.Headers(),
},
Status: &sourcepb.ReadResponse_Status{
Eot: false,
Code: 0,
},
}
if err := stream.Send(element); err != nil {
return err
readLoop:
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-messageCh:
if !ok {
break readLoop
}
offset := &sourcepb.Offset{
Offset: msg.Offset().Value(),
PartitionId: msg.Offset().PartitionId(),
}
element := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: msg.Value(),
Offset: offset,
EventTime: timestamppb.New(msg.EventTime()),
Keys: msg.Keys(),
Headers: msg.Headers(),
},
Status: &sourcepb.ReadResponse_Status{
Eot: false,
Code: 0,
},
}
if err := stream.Send(element); err != nil {
return err
}
}
}
err := stream.Send(&sourcepb.ReadResponse{
Expand Down

0 comments on commit 332006c

Please sign in to comment.