Skip to content

Commit

Permalink
fix: handle empty metadata->resource in job spec & added grpc logger … (
Browse files Browse the repository at this point in the history
#196)

* fix: handle empty metadata->resource in job spec & added grpc logger for stream interceptor

* fix: added same situation in test case
  • Loading branch information
Mryashbhardwaj authored Feb 29, 2024
1 parent 9be10b5 commit a568ca3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
10 changes: 8 additions & 2 deletions core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,14 @@ func toMetadata(jobMetadata *pb.JobMetadata) (*job.Metadata, error) {

if jobMetadata.Resource != nil {
metadataResourceProto := jobMetadata.Resource
request := job.NewMetadataResourceConfig(metadataResourceProto.Request.Cpu, metadataResourceProto.Request.Memory)
limit := job.NewMetadataResourceConfig(metadataResourceProto.Limit.Cpu, metadataResourceProto.Limit.Memory)
var request *job.MetadataResourceConfig
if metadataResourceProto.Request != nil {
request = job.NewMetadataResourceConfig(metadataResourceProto.Request.Cpu, metadataResourceProto.Request.Memory)
}
var limit *job.MetadataResourceConfig
if metadataResourceProto.Limit != nil {
limit = job.NewMetadataResourceConfig(metadataResourceProto.Limit.Cpu, metadataResourceProto.Limit.Memory)
}
resourceMetadata := job.NewResourceMetadata(request, limit)
metadataBuilder = metadataBuilder.WithResource(resourceMetadata)
}
Expand Down
17 changes: 17 additions & 0 deletions core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,23 @@ func TestNewJobHandler(t *testing.T) {
WindowSize: jobWindow.GetSize(),
WindowOffset: jobWindow.GetOffset(),
WindowTruncateTo: jobWindow.GetTruncateTo(),
Config: []*pb.JobConfigItem{},
Window: &pb.JobSpecification_Window{},
Dependencies: []*pb.JobDependency{},
Assets: map[string]string{},
Hooks: []*pb.JobSpecHook{},
Description: "",
Labels: map[string]string{},
Behavior: &pb.JobSpecification_Behavior{Notify: []*pb.JobSpecification_Behavior_Notifiers{}},
Metadata: &pb.JobMetadata{
Resource: &pb.JobSpecMetadataResource{
Request: nil,
Limit: nil,
},
Airflow: &pb.JobSpecMetadataAirflow{},
},
Destination: "",
Sources: []string{},
},
{
Version: int32(jobVersion),
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func setupGRPCServer(l log.Logger) (*grpc.Server, error) {
),
grpc_middleware.WithStreamServerChain(
otelgrpc.StreamServerInterceptor(),
grpc_logrus.StreamServerInterceptor(grpcLogrusEntry, opts...),
grpc_prometheus.StreamServerInterceptor,
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(recoverPanic)),
),
Expand Down

0 comments on commit a568ca3

Please sign in to comment.