diff --git a/src/aggregator/aggregator/handler/config.go b/src/aggregator/aggregator/handler/config.go index aff25d1838..bcd40092cc 100644 --- a/src/aggregator/aggregator/handler/config.go +++ b/src/aggregator/aggregator/handler/config.go @@ -216,7 +216,7 @@ type storagePolicyFilterConfiguration struct { } func (c storagePolicyFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) { - return c.ServiceID.NewServiceID(), writer.NewStoragePolicyFilter(c.StoragePolicies) + return c.ServiceID.NewServiceID(), writer.NewStoragePolicyFilter(c.StoragePolicies, producer.StaticConfig) } type percentageFilterConfiguration struct { @@ -225,7 +225,7 @@ type percentageFilterConfiguration struct { } func (c percentageFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) { - return c.ServiceID.NewServiceID(), filter.NewPercentageFilter(c.Percentage) + return c.ServiceID.NewServiceID(), filter.NewPercentageFilter(c.Percentage, producer.StaticConfig) } // ConsumerServiceFilterConfiguration - exported to be able to write unit tests @@ -236,7 +236,7 @@ type ConsumerServiceFilterConfiguration struct { // NewConsumerServiceFilter - exported to be able to write unit tests func (c ConsumerServiceFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) { - return c.ServiceID.NewServiceID(), filter.NewShardSetFilter(c.ShardSet) + return c.ServiceID.NewServiceID(), filter.NewShardSetFilter(c.ShardSet, producer.StaticConfig) } // StaticBackendConfiguration configures a static backend as a flush handler. diff --git a/src/aggregator/aggregator/handler/filter/percentageFilter.go b/src/aggregator/aggregator/handler/filter/percentageFilter.go index dfac9b62ba..1e1d99b873 100644 --- a/src/aggregator/aggregator/handler/filter/percentageFilter.go +++ b/src/aggregator/aggregator/handler/filter/percentageFilter.go @@ -32,7 +32,7 @@ type percentageFilter struct { } // NewPercentageFilter creates a filter based on percentage. -func NewPercentageFilter(percentage float64) producer.FilterFunc { +func NewPercentageFilter(percentage float64, configSource producer.FilterFuncConfigSourceType) producer.FilterFunc { rate := uint32(_maxRate) if percentage <= 0 { rate = 0 @@ -41,7 +41,7 @@ func NewPercentageFilter(percentage float64) producer.FilterFunc { } f := percentageFilter{rate: rate} - return f.Filter + return producer.NewFilterFunc(f.Filter, producer.PercentageFilter, configSource) } func (f percentageFilter) Filter(_ producer.Message) bool { diff --git a/src/aggregator/aggregator/handler/filter/percentageFilter_test.go b/src/aggregator/aggregator/handler/filter/percentageFilter_test.go index 3f848643eb..3093ec210e 100644 --- a/src/aggregator/aggregator/handler/filter/percentageFilter_test.go +++ b/src/aggregator/aggregator/handler/filter/percentageFilter_test.go @@ -34,11 +34,11 @@ func TestPercentageFilter(t *testing.T) { defer ctrl.Finish() mm := producer.NewMockMessage(ctrl) - f0 := NewPercentageFilter(0) - require.False(t, f0(mm)) + f0 := NewPercentageFilter(0, producer.StaticConfig) + require.False(t, f0.Function((mm))) - f1 := NewPercentageFilter(1) - require.True(t, f1(mm)) + f1 := NewPercentageFilter(1, producer.DynamicConfig) + require.True(t, f1.Function(mm)) } var filterResult bool @@ -55,10 +55,10 @@ BenchmarkPercentageFilter-12 280085259 4.232 ns/op PASS */ func BenchmarkPercentageFilter(b *testing.B) { - f := NewPercentageFilter(0.5) + f := NewPercentageFilter(0.5, producer.StaticConfig) var r bool for i := 0; i < b.N; i++ { - r = f(nil) + r = f.Function(nil) } filterResult = r } diff --git a/src/aggregator/aggregator/handler/filter/shard.go b/src/aggregator/aggregator/handler/filter/shard.go index b81a94fec2..7b654eb017 100644 --- a/src/aggregator/aggregator/handler/filter/shard.go +++ b/src/aggregator/aggregator/handler/filter/shard.go @@ -30,9 +30,9 @@ type shardSetFilter struct { } // NewShardSetFilter creates a filter based on ShardSet. -func NewShardSetFilter(shardSet sharding.ShardSet) producer.FilterFunc { +func NewShardSetFilter(shardSet sharding.ShardSet, sourceType producer.FilterFuncConfigSourceType) producer.FilterFunc { f := shardSetFilter{shardSet: shardSet} - return f.Filter + return producer.NewFilterFunc(f.Filter, producer.ShardSetFilter, sourceType) } func (f shardSetFilter) Filter(m producer.Message) bool { diff --git a/src/aggregator/aggregator/handler/filter/shard_test.go b/src/aggregator/aggregator/handler/filter/shard_test.go index d78aa6c188..ddec749b55 100644 --- a/src/aggregator/aggregator/handler/filter/shard_test.go +++ b/src/aggregator/aggregator/handler/filter/shard_test.go @@ -35,15 +35,15 @@ func TestShardSetFilter(t *testing.T) { defer ctrl.Finish() ss := sharding.MustParseShardSet("0..511") - f := NewShardSetFilter(ss) + f := NewShardSetFilter(ss, producer.StaticConfig) mm := producer.NewMockMessage(ctrl) mm.EXPECT().Shard().Return(uint32(0)) - require.True(t, f(mm)) + require.True(t, f.Function(mm)) mm.EXPECT().Shard().Return(uint32(10)) - require.True(t, f(mm)) + require.True(t, f.Function(mm)) mm.EXPECT().Shard().Return(uint32(511)) - require.True(t, f(mm)) + require.True(t, f.Function(mm)) mm.EXPECT().Shard().Return(uint32(512)) - require.False(t, f(mm)) + require.False(t, f.Function(mm)) } diff --git a/src/aggregator/aggregator/handler/writer/protobuf.go b/src/aggregator/aggregator/handler/writer/protobuf.go index 11e767916a..429df31bfa 100644 --- a/src/aggregator/aggregator/handler/writer/protobuf.go +++ b/src/aggregator/aggregator/handler/writer/protobuf.go @@ -168,8 +168,13 @@ type storagePolicyFilter struct { } // NewStoragePolicyFilter creates a new storage policy based filter. -func NewStoragePolicyFilter(acceptedStoragePolicies []policy.StoragePolicy) producer.FilterFunc { - return storagePolicyFilter{acceptedStoragePolicies}.Filter +func NewStoragePolicyFilter( + acceptedStoragePolicies []policy.StoragePolicy, + configSource producer.FilterFuncConfigSourceType) producer.FilterFunc { + return producer.NewFilterFunc( + storagePolicyFilter{acceptedStoragePolicies}.Filter, + producer.StoragePolicyFilter, + configSource) } func (f storagePolicyFilter) Filter(m producer.Message) bool { diff --git a/src/aggregator/aggregator/handler/writer/protobuf_test.go b/src/aggregator/aggregator/handler/writer/protobuf_test.go index f0331d1f7c..dffda8e39d 100644 --- a/src/aggregator/aggregator/handler/writer/protobuf_test.go +++ b/src/aggregator/aggregator/handler/writer/protobuf_test.go @@ -90,11 +90,11 @@ func TestStoragePolicyFilter(t *testing.T) { m2 := producer.NewMockMessage(nil) - f := NewStoragePolicyFilter([]policy.StoragePolicy{sp2}) + f := NewStoragePolicyFilter([]policy.StoragePolicy{sp2}, producer.StaticConfig) - require.True(t, f(m2)) - require.False(t, f(newMessage(0, sp1, protobuf.Buffer{}))) - require.True(t, f(newMessage(0, sp2, protobuf.Buffer{}))) + require.True(t, f.Function(m2)) + require.False(t, f.Function(newMessage(0, sp1, protobuf.Buffer{}))) + require.True(t, f.Function(newMessage(0, sp2, protobuf.Buffer{}))) } func TestProtobufWriterWriteClosed(t *testing.T) { diff --git a/src/msg/generated/proto/msgpb/msg.pb.go b/src/msg/generated/proto/msgpb/msg.pb.go index 30395d7060..17fed520a0 100644 --- a/src/msg/generated/proto/msgpb/msg.pb.go +++ b/src/msg/generated/proto/msgpb/msg.pb.go @@ -1,26 +1,6 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: github.com/m3db/m3/src/msg/generated/proto/msgpb/msg.proto -// Copyright (c) 2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - /* Package msgpb is a generated protocol buffer package. diff --git a/src/msg/generated/proto/topicpb/topic.pb.go b/src/msg/generated/proto/topicpb/topic.pb.go index c749befdd4..a1c3c08ebe 100644 --- a/src/msg/generated/proto/topicpb/topic.pb.go +++ b/src/msg/generated/proto/topicpb/topic.pb.go @@ -1,26 +1,6 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: github.com/m3db/m3/src/msg/generated/proto/topicpb/topic.proto -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - /* Package topicpb is a generated protocol buffer package. @@ -30,6 +10,10 @@ It has these top-level messages: Topic ConsumerService + Filters + StoragePolicyFilter + PercentageFilter + ShardSetFilter ServiceID */ package topicpb @@ -38,6 +22,8 @@ import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" +import binary "encoding/binary" + import io "io" // Reference imports to suppress errors if they are not otherwise used. @@ -111,6 +97,7 @@ type ConsumerService struct { ServiceId *ServiceID `protobuf:"bytes,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"` ConsumptionType ConsumptionType `protobuf:"varint,2,opt,name=consumption_type,json=consumptionType,proto3,enum=topicpb.ConsumptionType" json:"consumption_type,omitempty"` MessageTtlNanos int64 `protobuf:"varint,3,opt,name=message_ttl_nanos,json=messageTtlNanos,proto3" json:"message_ttl_nanos,omitempty"` + Filters *Filters `protobuf:"bytes,4,opt,name=filters" json:"filters,omitempty"` } func (m *ConsumerService) Reset() { *m = ConsumerService{} } @@ -139,6 +126,93 @@ func (m *ConsumerService) GetMessageTtlNanos() int64 { return 0 } +func (m *ConsumerService) GetFilters() *Filters { + if m != nil { + return m.Filters + } + return nil +} + +type Filters struct { + StoragePolicyFilter *StoragePolicyFilter `protobuf:"bytes,1,opt,name=storage_policy_filter,json=storagePolicyFilter" json:"storage_policy_filter,omitempty"` + PercentageFilter *PercentageFilter `protobuf:"bytes,2,opt,name=percentage_filter,json=percentageFilter" json:"percentage_filter,omitempty"` + ShardSetFilter *ShardSetFilter `protobuf:"bytes,3,opt,name=shard_set_filter,json=shardSetFilter" json:"shard_set_filter,omitempty"` +} + +func (m *Filters) Reset() { *m = Filters{} } +func (m *Filters) String() string { return proto.CompactTextString(m) } +func (*Filters) ProtoMessage() {} +func (*Filters) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{2} } + +func (m *Filters) GetStoragePolicyFilter() *StoragePolicyFilter { + if m != nil { + return m.StoragePolicyFilter + } + return nil +} + +func (m *Filters) GetPercentageFilter() *PercentageFilter { + if m != nil { + return m.PercentageFilter + } + return nil +} + +func (m *Filters) GetShardSetFilter() *ShardSetFilter { + if m != nil { + return m.ShardSetFilter + } + return nil +} + +type StoragePolicyFilter struct { + StoragePolicies []string `protobuf:"bytes,1,rep,name=storage_policies,json=storagePolicies" json:"storage_policies,omitempty"` +} + +func (m *StoragePolicyFilter) Reset() { *m = StoragePolicyFilter{} } +func (m *StoragePolicyFilter) String() string { return proto.CompactTextString(m) } +func (*StoragePolicyFilter) ProtoMessage() {} +func (*StoragePolicyFilter) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{3} } + +func (m *StoragePolicyFilter) GetStoragePolicies() []string { + if m != nil { + return m.StoragePolicies + } + return nil +} + +type PercentageFilter struct { + Percentage float64 `protobuf:"fixed64,1,opt,name=percentage,proto3" json:"percentage,omitempty"` +} + +func (m *PercentageFilter) Reset() { *m = PercentageFilter{} } +func (m *PercentageFilter) String() string { return proto.CompactTextString(m) } +func (*PercentageFilter) ProtoMessage() {} +func (*PercentageFilter) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{4} } + +func (m *PercentageFilter) GetPercentage() float64 { + if m != nil { + return m.Percentage + } + return 0 +} + +type ShardSetFilter struct { + ShardSet string `protobuf:"bytes,1,opt,name=shard_set,json=shardSet,proto3" json:"shard_set,omitempty"` +} + +func (m *ShardSetFilter) Reset() { *m = ShardSetFilter{} } +func (m *ShardSetFilter) String() string { return proto.CompactTextString(m) } +func (*ShardSetFilter) ProtoMessage() {} +func (*ShardSetFilter) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{5} } + +func (m *ShardSetFilter) GetShardSet() string { + if m != nil { + return m.ShardSet + } + return "" +} + type ServiceID struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Environment string `protobuf:"bytes,2,opt,name=environment,proto3" json:"environment,omitempty"` @@ -148,7 +222,7 @@ type ServiceID struct { func (m *ServiceID) Reset() { *m = ServiceID{} } func (m *ServiceID) String() string { return proto.CompactTextString(m) } func (*ServiceID) ProtoMessage() {} -func (*ServiceID) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{2} } +func (*ServiceID) Descriptor() ([]byte, []int) { return fileDescriptorTopic, []int{6} } func (m *ServiceID) GetName() string { if m != nil { @@ -174,6 +248,10 @@ func (m *ServiceID) GetZone() string { func init() { proto.RegisterType((*Topic)(nil), "topicpb.Topic") proto.RegisterType((*ConsumerService)(nil), "topicpb.ConsumerService") + proto.RegisterType((*Filters)(nil), "topicpb.Filters") + proto.RegisterType((*StoragePolicyFilter)(nil), "topicpb.StoragePolicyFilter") + proto.RegisterType((*PercentageFilter)(nil), "topicpb.PercentageFilter") + proto.RegisterType((*ShardSetFilter)(nil), "topicpb.ShardSetFilter") proto.RegisterType((*ServiceID)(nil), "topicpb.ServiceID") proto.RegisterEnum("topicpb.ConsumptionType", ConsumptionType_name, ConsumptionType_value) } @@ -253,6 +331,145 @@ func (m *ConsumerService) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintTopic(dAtA, i, uint64(m.MessageTtlNanos)) } + if m.Filters != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintTopic(dAtA, i, uint64(m.Filters.Size())) + n2, err := m.Filters.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + +func (m *Filters) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Filters) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.StoragePolicyFilter != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintTopic(dAtA, i, uint64(m.StoragePolicyFilter.Size())) + n3, err := m.StoragePolicyFilter.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + } + if m.PercentageFilter != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintTopic(dAtA, i, uint64(m.PercentageFilter.Size())) + n4, err := m.PercentageFilter.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } + if m.ShardSetFilter != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintTopic(dAtA, i, uint64(m.ShardSetFilter.Size())) + n5, err := m.ShardSetFilter.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + } + return i, nil +} + +func (m *StoragePolicyFilter) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StoragePolicyFilter) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.StoragePolicies) > 0 { + for _, s := range m.StoragePolicies { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func (m *PercentageFilter) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PercentageFilter) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Percentage != 0 { + dAtA[i] = 0x9 + i++ + binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Percentage)))) + i += 8 + } + return i, nil +} + +func (m *ShardSetFilter) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ShardSetFilter) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ShardSet) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintTopic(dAtA, i, uint64(len(m.ShardSet))) + i += copy(dAtA[i:], m.ShardSet) + } return i, nil } @@ -333,6 +550,59 @@ func (m *ConsumerService) Size() (n int) { if m.MessageTtlNanos != 0 { n += 1 + sovTopic(uint64(m.MessageTtlNanos)) } + if m.Filters != nil { + l = m.Filters.Size() + n += 1 + l + sovTopic(uint64(l)) + } + return n +} + +func (m *Filters) Size() (n int) { + var l int + _ = l + if m.StoragePolicyFilter != nil { + l = m.StoragePolicyFilter.Size() + n += 1 + l + sovTopic(uint64(l)) + } + if m.PercentageFilter != nil { + l = m.PercentageFilter.Size() + n += 1 + l + sovTopic(uint64(l)) + } + if m.ShardSetFilter != nil { + l = m.ShardSetFilter.Size() + n += 1 + l + sovTopic(uint64(l)) + } + return n +} + +func (m *StoragePolicyFilter) Size() (n int) { + var l int + _ = l + if len(m.StoragePolicies) > 0 { + for _, s := range m.StoragePolicies { + l = len(s) + n += 1 + l + sovTopic(uint64(l)) + } + } + return n +} + +func (m *PercentageFilter) Size() (n int) { + var l int + _ = l + if m.Percentage != 0 { + n += 9 + } + return n +} + +func (m *ShardSetFilter) Size() (n int) { + var l int + _ = l + l = len(m.ShardSet) + if l > 0 { + n += 1 + l + sovTopic(uint64(l)) + } return n } @@ -596,6 +866,407 @@ func (m *ConsumerService) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Filters == nil { + m.Filters = &Filters{} + } + if err := m.Filters.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTopic(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTopic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Filters) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Filters: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Filters: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoragePolicyFilter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.StoragePolicyFilter == nil { + m.StoragePolicyFilter = &StoragePolicyFilter{} + } + if err := m.StoragePolicyFilter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PercentageFilter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.PercentageFilter == nil { + m.PercentageFilter = &PercentageFilter{} + } + if err := m.PercentageFilter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ShardSetFilter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ShardSetFilter == nil { + m.ShardSetFilter = &ShardSetFilter{} + } + if err := m.ShardSetFilter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTopic(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTopic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StoragePolicyFilter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StoragePolicyFilter: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StoragePolicyFilter: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoragePolicies", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StoragePolicies = append(m.StoragePolicies, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTopic(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTopic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PercentageFilter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PercentageFilter: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PercentageFilter: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Percentage", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.Percentage = float64(math.Float64frombits(v)) + default: + iNdEx = preIndex + skippy, err := skipTopic(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTopic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ShardSetFilter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ShardSetFilter: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ShardSetFilter: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ShardSet", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTopic + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTopic + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ShardSet = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTopic(dAtA[iNdEx:]) @@ -864,30 +1535,40 @@ func init() { } var fileDescriptorTopic = []byte{ - // 385 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0x4f, 0x8e, 0xd3, 0x30, - 0x14, 0x87, 0xc7, 0x53, 0x98, 0x51, 0x5e, 0x44, 0x9b, 0xf1, 0x2a, 0xab, 0x28, 0xea, 0x2a, 0x9a, - 0x45, 0x22, 0xa6, 0x3b, 0x16, 0x48, 0x43, 0x1b, 0x89, 0x0a, 0x94, 0x41, 0x6e, 0x46, 0x2c, 0xa3, - 0xfc, 0xf1, 0x64, 0x22, 0xd5, 0x76, 0x64, 0xbb, 0x95, 0xca, 0x19, 0x58, 0x70, 0x19, 0xee, 0xc0, - 0x92, 0x23, 0xa0, 0x72, 0x11, 0x14, 0xd7, 0x14, 0x0a, 0xb3, 0xca, 0xd3, 0x97, 0x9f, 0xdf, 0xfb, - 0xfc, 0x0c, 0xaf, 0xdb, 0x4e, 0x3f, 0x6e, 0xaa, 0xb8, 0x16, 0x2c, 0x61, 0xb3, 0xa6, 0x4a, 0xd8, - 0x2c, 0x51, 0xb2, 0x4e, 0x98, 0x6a, 0x93, 0x96, 0x72, 0x2a, 0x4b, 0x4d, 0x9b, 0xa4, 0x97, 0x42, - 0x8b, 0x44, 0x8b, 0xbe, 0xab, 0xfb, 0xea, 0xf0, 0x8d, 0x0d, 0xc3, 0x97, 0x16, 0x4e, 0x3f, 0x23, - 0x78, 0x9e, 0x0f, 0x35, 0xc6, 0xf0, 0x8c, 0x97, 0x8c, 0xfa, 0x28, 0x44, 0x91, 0x43, 0x4c, 0x8d, - 0x23, 0xf0, 0xf8, 0x86, 0x55, 0x54, 0x16, 0xe2, 0xa1, 0x50, 0x8f, 0xa5, 0x6c, 0x94, 0x7f, 0x1e, - 0xa2, 0xe8, 0x05, 0x19, 0x1f, 0xf8, 0xdd, 0xc3, 0xca, 0x50, 0x9c, 0xc2, 0x55, 0x2d, 0xb8, 0xda, - 0x30, 0x2a, 0x0b, 0x45, 0xe5, 0xb6, 0xab, 0xa9, 0xf2, 0x47, 0xe1, 0x28, 0x72, 0x6f, 0xfc, 0xd8, - 0x0e, 0x8b, 0xe7, 0x36, 0xb1, 0x3a, 0x04, 0x88, 0x57, 0x9f, 0x02, 0x35, 0xfd, 0x8a, 0x60, 0xf2, - 0x4f, 0x0a, 0xbf, 0x04, 0xb0, 0x1d, 0x8b, 0xae, 0x31, 0x7a, 0xee, 0x0d, 0x3e, 0xf6, 0xb4, 0xa9, - 0xe5, 0x82, 0x38, 0x36, 0xb5, 0x6c, 0xf0, 0x1c, 0x6c, 0xeb, 0x5e, 0x77, 0x82, 0x17, 0x7a, 0xd7, - 0x53, 0xe3, 0x3d, 0xfe, 0x4f, 0xc6, 0x04, 0xf2, 0x5d, 0x4f, 0xc9, 0xa4, 0x3e, 0x05, 0xf8, 0x1a, - 0xae, 0x18, 0x55, 0xaa, 0x6c, 0x69, 0xa1, 0xf5, 0xba, 0xe0, 0x25, 0x17, 0xc3, 0x95, 0x50, 0x34, - 0x22, 0x13, 0xfb, 0x23, 0xd7, 0xeb, 0x6c, 0xc0, 0xd3, 0x7b, 0x70, 0x8e, 0x22, 0x4f, 0x6e, 0x32, - 0x04, 0x97, 0xf2, 0x6d, 0x27, 0x05, 0x67, 0x94, 0x6b, 0x23, 0xe3, 0x90, 0xbf, 0xd1, 0x70, 0xea, - 0x93, 0xe0, 0xd4, 0x4c, 0x70, 0x88, 0xa9, 0xaf, 0x5f, 0xfd, 0xde, 0xc6, 0x1f, 0x2b, 0x17, 0x2e, - 0xef, 0xb3, 0x77, 0xd9, 0xdd, 0xc7, 0xcc, 0x3b, 0xc3, 0x00, 0x17, 0xab, 0xb7, 0xb7, 0x24, 0x5d, - 0x78, 0x08, 0x8f, 0x01, 0x48, 0xfa, 0xe1, 0xfd, 0x72, 0x7e, 0x9b, 0xa7, 0x0b, 0xef, 0xfc, 0x8d, - 0xf7, 0x6d, 0x1f, 0xa0, 0xef, 0xfb, 0x00, 0xfd, 0xd8, 0x07, 0xe8, 0xcb, 0xcf, 0xe0, 0xac, 0xba, - 0x30, 0x6f, 0x3f, 0xfb, 0x15, 0x00, 0x00, 0xff, 0xff, 0xd4, 0xd7, 0xf0, 0x71, 0x3d, 0x02, 0x00, - 0x00, + // 549 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x93, 0xcf, 0x8e, 0xda, 0x3c, + 0x14, 0xc5, 0xc7, 0xc3, 0x7c, 0xc3, 0x97, 0x8b, 0x0a, 0xc1, 0xa3, 0xaa, 0xa9, 0x5a, 0x21, 0x94, + 0x55, 0x8a, 0x54, 0xa2, 0xc2, 0xae, 0x8b, 0xaa, 0x14, 0x18, 0x15, 0xb5, 0x62, 0x90, 0x61, 0xd4, + 0x65, 0x04, 0xc1, 0x30, 0x91, 0x88, 0x1d, 0xd9, 0x66, 0x24, 0xfa, 0x0c, 0x5d, 0xf4, 0xb1, 0xba, + 0xec, 0x23, 0xb4, 0x54, 0xea, 0x73, 0x54, 0x71, 0x4c, 0xf8, 0x33, 0xb3, 0x8a, 0xf3, 0xcb, 0xf1, + 0xf1, 0xb9, 0x37, 0xd7, 0xf0, 0x6e, 0x19, 0xa9, 0xbb, 0xf5, 0xac, 0x19, 0xf2, 0xd8, 0x8f, 0xdb, + 0xf3, 0x99, 0x1f, 0xb7, 0x7d, 0x29, 0x42, 0x3f, 0x96, 0x4b, 0x7f, 0x49, 0x19, 0x15, 0x53, 0x45, + 0xe7, 0x7e, 0x22, 0xb8, 0xe2, 0xbe, 0xe2, 0x49, 0x14, 0x26, 0xb3, 0xec, 0xd9, 0xd4, 0x0c, 0x17, + 0x0d, 0x74, 0xbf, 0x21, 0xf8, 0x6f, 0x92, 0xae, 0x31, 0x86, 0x0b, 0x36, 0x8d, 0xa9, 0x83, 0xea, + 0xc8, 0xb3, 0x88, 0x5e, 0x63, 0x0f, 0x6c, 0xb6, 0x8e, 0x67, 0x54, 0x04, 0x7c, 0x11, 0xc8, 0xbb, + 0xa9, 0x98, 0x4b, 0xe7, 0xbc, 0x8e, 0xbc, 0x27, 0xa4, 0x9c, 0xf1, 0x9b, 0xc5, 0x58, 0x53, 0xdc, + 0x87, 0x6a, 0xc8, 0x99, 0x5c, 0xc7, 0x54, 0x04, 0x92, 0x8a, 0xfb, 0x28, 0xa4, 0xd2, 0x29, 0xd4, + 0x0b, 0x5e, 0xa9, 0xe5, 0x34, 0xcd, 0x61, 0xcd, 0xae, 0x51, 0x8c, 0x33, 0x01, 0xb1, 0xc3, 0x63, + 0x20, 0xdd, 0xdf, 0x08, 0x2a, 0x27, 0x2a, 0xfc, 0x06, 0xc0, 0x38, 0x06, 0xd1, 0x5c, 0xc7, 0x2b, + 0xb5, 0x70, 0xee, 0x69, 0x54, 0x83, 0x1e, 0xb1, 0x8c, 0x6a, 0x30, 0xc7, 0x5d, 0x30, 0xd6, 0x89, + 0x8a, 0x38, 0x0b, 0xd4, 0x26, 0xa1, 0x3a, 0x77, 0xf9, 0x41, 0x18, 0x2d, 0x98, 0x6c, 0x12, 0x4a, + 0x2a, 0xe1, 0x31, 0xc0, 0x0d, 0xa8, 0xc6, 0x54, 0xca, 0xe9, 0x92, 0x06, 0x4a, 0xad, 0x02, 0x36, + 0x65, 0x3c, 0x2d, 0x09, 0x79, 0x05, 0x52, 0x31, 0x1f, 0x26, 0x6a, 0x35, 0x4c, 0x31, 0x6e, 0x40, + 0x71, 0x11, 0xad, 0x14, 0x15, 0xd2, 0xb9, 0xd0, 0x01, 0xed, 0xfc, 0x9c, 0xeb, 0x8c, 0x93, 0x9d, + 0xc0, 0xfd, 0x8b, 0xa0, 0x68, 0x20, 0x1e, 0xc1, 0x53, 0xa9, 0xb8, 0x48, 0xcf, 0x48, 0xf8, 0x2a, + 0x0a, 0x37, 0x41, 0xa6, 0x32, 0x65, 0xbe, 0xdc, 0x97, 0x99, 0xa9, 0x46, 0x5a, 0x94, 0xed, 0x26, + 0x57, 0xf2, 0x21, 0xc4, 0xd7, 0x50, 0x4d, 0xa8, 0x08, 0x29, 0x53, 0xa9, 0xa9, 0x71, 0x3b, 0xd7, + 0x6e, 0xcf, 0x73, 0xb7, 0x51, 0xae, 0x30, 0x56, 0x76, 0x72, 0x42, 0x70, 0x07, 0x6c, 0xfd, 0xc3, + 0x03, 0x49, 0xd5, 0xce, 0xa6, 0xa0, 0x6d, 0x9e, 0xed, 0x43, 0xa5, 0x82, 0x31, 0x55, 0xc6, 0xa4, + 0x2c, 0x8f, 0xde, 0xdd, 0xf7, 0x70, 0xf5, 0x48, 0x6c, 0xfc, 0x0a, 0xec, 0xa3, 0x9a, 0x23, 0x2a, + 0x1d, 0x54, 0x2f, 0x78, 0x16, 0xa9, 0x1c, 0x16, 0x14, 0x51, 0xe9, 0xb6, 0xc0, 0x3e, 0x8d, 0x8a, + 0x6b, 0x00, 0xfb, 0xb0, 0xba, 0x4f, 0x88, 0x1c, 0x10, 0xf7, 0x35, 0x94, 0x8f, 0x73, 0xe1, 0x17, + 0x60, 0xe5, 0xa5, 0x98, 0xf1, 0xfe, 0x7f, 0x17, 0xd5, 0xbd, 0x05, 0x2b, 0x1f, 0xa1, 0x47, 0xef, + 0x40, 0x1d, 0x4a, 0x94, 0xdd, 0x47, 0x82, 0xb3, 0x98, 0x32, 0xa5, 0x5b, 0x69, 0x91, 0x43, 0x94, + 0xee, 0xfa, 0xca, 0x19, 0xd5, 0xed, 0xb1, 0x88, 0x5e, 0x37, 0xde, 0xee, 0xe6, 0x78, 0x3f, 0x4f, + 0x25, 0x28, 0xde, 0x0e, 0x3f, 0x0d, 0x6f, 0xbe, 0x0c, 0xed, 0x33, 0x0c, 0x70, 0x39, 0xfe, 0xd8, + 0x21, 0xfd, 0x9e, 0x8d, 0x70, 0x19, 0x80, 0xf4, 0x47, 0x9f, 0x07, 0xdd, 0xce, 0xa4, 0xdf, 0xb3, + 0xcf, 0x3f, 0xd8, 0x3f, 0xb6, 0x35, 0xf4, 0x73, 0x5b, 0x43, 0xbf, 0xb6, 0x35, 0xf4, 0xfd, 0x4f, + 0xed, 0x6c, 0x76, 0xa9, 0x6f, 0x6d, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6f, 0xa8, 0x59, + 0x77, 0xf7, 0x03, 0x00, 0x00, } diff --git a/src/msg/generated/proto/topicpb/topic.proto b/src/msg/generated/proto/topicpb/topic.proto index 193f146a04..d45ef9fe70 100644 --- a/src/msg/generated/proto/topicpb/topic.proto +++ b/src/msg/generated/proto/topicpb/topic.proto @@ -11,8 +11,21 @@ message ConsumerService { ServiceID service_id = 1; ConsumptionType consumption_type = 2; int64 message_ttl_nanos = 3; + Filters filters = 4; } +message Filters { + StoragePolicyFilter storage_policy_filter = 1; + PercentageFilter percentage_filter = 2; + ShardSetFilter shard_set_filter = 3; +} + +message StoragePolicyFilter { repeated string storage_policies = 1; } + +message PercentageFilter { double percentage = 1; } + +message ShardSetFilter { string shard_set = 1; } + message ServiceID { string name = 1; string environment = 2; diff --git a/src/msg/producer/ref_counted.go b/src/msg/producer/ref_counted.go index 444d47aee1..598b1a6619 100644 --- a/src/msg/producer/ref_counted.go +++ b/src/msg/producer/ref_counted.go @@ -23,6 +23,7 @@ package producer import ( "sync" + "github.com/uber-go/tally" "go.uber.org/atomic" ) @@ -50,16 +51,28 @@ func NewRefCountedMessage(m Message, fn OnFinalizeFn) *RefCountedMessage { } } +// Not a fan of passing function pointers here, +// but passing ConsumerServiceWriterMetrics +// to RefCountedMessage.Accept will cause a cyclical depdenency. ¯\_(ツ)_/¯ +type filterAcceptCounterFn func(metadata FilterFuncMetadata) tally.Counter +type filterDenyCounterFn func(metadata FilterFuncMetadata) tally.Counter + // Accept returns true if the message can be accepted by the filter. -func (rm *RefCountedMessage) Accept(fn []FilterFunc) bool { +func (rm *RefCountedMessage) Accept( + fn []FilterFunc, + acceptCounterFn filterAcceptCounterFn, + denyCounterFn filterDenyCounterFn) bool { if len(fn) == 0 { return false } for _, f := range fn { - if !f(rm.Message) { + if !f.Function(rm.Message) { + denyCounterFn(f.Metadata).Inc(1) return false } + acceptCounterFn(f.Metadata).Inc(1) } + return true } diff --git a/src/msg/producer/ref_counted_test.go b/src/msg/producer/ref_counted_test.go index 7d51584f34..f8462d8c72 100644 --- a/src/msg/producer/ref_counted_test.go +++ b/src/msg/producer/ref_counted_test.go @@ -27,8 +27,17 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) +var filterAcceptFn = func(metadata FilterFuncMetadata) tally.Counter { + return tally.NoopScope.Counter("accept") +} + +var filterDenyFn = func(metadata FilterFuncMetadata) tally.Counter { + return tally.NoopScope.Counter("deny") +} + func TestRefCountedMessageConsume(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -123,31 +132,35 @@ func TestRefCountedMessageFilter(t *testing.T) { defer ctrl.Finish() var called int - filter := func(m Message) bool { - called++ - return m.Shard() == 0 - } - - sizeFilter := func(m Message) bool { + filter := NewFilterFunc( + func(m Message) bool { + called++ + return m.Shard() == 0 + }, + UnspecifiedFilter, + StaticConfig, + ) + + sizeFilter := NewFilterFunc(func(m Message) bool { called++ return m.Size() == 0 - } + }, UnspecifiedFilter, StaticConfig) mm := NewMockMessage(ctrl) mm.EXPECT().Size().Return(0) rm := NewRefCountedMessage(mm, nil) mm.EXPECT().Shard().Return(uint32(0)) - require.True(t, rm.Accept([]FilterFunc{filter})) + require.True(t, rm.Accept([]FilterFunc{filter}, filterAcceptFn, filterDenyFn)) mm.EXPECT().Shard().Return(uint32(1)) - require.False(t, rm.Accept([]FilterFunc{filter})) + require.False(t, rm.Accept([]FilterFunc{filter}, filterAcceptFn, filterDenyFn)) mm.EXPECT().Shard().Return(uint32(0)) mm.EXPECT().Size().Return(0) - require.True(t, rm.Accept([]FilterFunc{filter, sizeFilter})) + require.True(t, rm.Accept([]FilterFunc{filter, sizeFilter}, filterAcceptFn, filterDenyFn)) - require.False(t, rm.Accept([]FilterFunc{})) + require.False(t, rm.Accept([]FilterFunc{}, filterAcceptFn, filterDenyFn)) } func TestRefCountedMessageOnDropFn(t *testing.T) { diff --git a/src/msg/producer/types.go b/src/msg/producer/types.go index a69e0ecf53..4771fd2809 100644 --- a/src/msg/producer/types.go +++ b/src/msg/producer/types.go @@ -85,8 +85,94 @@ type Producer interface { Close(ct CloseType) } +// FilterFuncType specifies the type of filter function. +type FilterFuncType uint8 + +const ( + // ShardSetFilter filters messages based on a shard set. + ShardSetFilter FilterFuncType = iota + // StoragePolicyFilter filters messages based on a storage policy. + StoragePolicyFilter + // PercentageFilter filters messages on a sampling percentage. + PercentageFilter + // AcceptAllFilter accepts all messages. + AcceptAllFilter + // UnspecifiedFilter is any filter that is not one of the well known types. + UnspecifiedFilter +) + +func (f FilterFuncType) String() string { + switch f { + + case ShardSetFilter: + return "ShardSetFilter" + case StoragePolicyFilter: + return "StoragePolicyFilter" + case PercentageFilter: + return "PercentageFilter" + case AcceptAllFilter: + return "AcceptAllFilter" + case UnspecifiedFilter: + return "UnspecifiedFilter" + } + + return "Unknown" +} + +// FilterFuncConfigSourceType specifies the configuration source of the filter function. +type FilterFuncConfigSourceType uint8 + +const ( + // StaticConfig is static configuration that is applied once at service startup. + StaticConfig FilterFuncConfigSourceType = iota + // DynamicConfig is dynamic configuration that can be updated at runtime. + DynamicConfig +) + +func (f FilterFuncConfigSourceType) String() string { + switch f { + + case StaticConfig: + return "StaticConfig" + case DynamicConfig: + return "DynamicConfig" + } + + return "Unknown" +} + +// FilterFuncMetadata contains metadata about a filter function. +type FilterFuncMetadata struct { + FilterType FilterFuncType + SourceType FilterFuncConfigSourceType +} + +// NewFilterFuncMetadata creates a new filter function metadata. +func NewFilterFuncMetadata( + filterType FilterFuncType, + sourceType FilterFuncConfigSourceType) FilterFuncMetadata { + return FilterFuncMetadata{ + FilterType: filterType, + SourceType: sourceType, + } +} + // FilterFunc can filter message. -type FilterFunc func(m Message) bool +type FilterFunc struct { + Function func(m Message) bool + Metadata FilterFuncMetadata +} + +// NewFilterFunc creates a new filter function. +func NewFilterFunc( + function func(m Message) bool, + filterType FilterFuncType, + sourceType FilterFuncConfigSourceType) FilterFunc { + return FilterFunc{ + Function: function, + Metadata: NewFilterFuncMetadata(filterType, sourceType), + } +} // Options configs a producer. type Options interface { diff --git a/src/msg/producer/writer/consumer_service_writer.go b/src/msg/producer/writer/consumer_service_writer.go index 70d1894884..77dc62b699 100644 --- a/src/msg/producer/writer/consumer_service_writer.go +++ b/src/msg/producer/writer/consumer_service_writer.go @@ -37,8 +37,14 @@ import ( var ( acceptAllFilter = producer.FilterFunc( - func(m producer.Message) bool { - return true + producer.FilterFunc{ + Function: func(m producer.Message) bool { + return true + }, + Metadata: producer.FilterFuncMetadata{ + FilterType: producer.AcceptAllFilter, + SourceType: producer.StaticConfig, + }, }, ) @@ -77,23 +83,84 @@ type consumerServiceWriter interface { // UnregisterFilters unregisters the filters for the consumer service. UnregisterFilters() + + // GetDataFilter returns the data filters on the consumer service writer. + GetDataFilters() []producer.FilterFunc } type consumerServiceWriterMetrics struct { - placementError tally.Counter - placementUpdate tally.Counter - filterAccepted tally.Counter - filterNotAccepted tally.Counter - queueSize tally.Gauge + placementError tally.Counter + placementUpdate tally.Counter + queueSize tally.Gauge + filterAccepted tally.Counter + filterNotAccepted tally.Counter + filterAcceptedGranular map[string]tally.Counter + filterAcceptedGranularLock sync.RWMutex + filterNotAcceptedGranular map[string]tally.Counter + filterNotAcceptedGranularLock sync.RWMutex + scope tally.Scope +} + +func (cswm *consumerServiceWriterMetrics) getGranularFilterCounterMapKey(metadata producer.FilterFuncMetadata) string { + return fmt.Sprintf("%s::%s", metadata.FilterType.String(), metadata.SourceType.String()) +} + +//nolint:dupl +func (cswm *consumerServiceWriterMetrics) getFilterAcceptedGranularCounter( + metadata producer.FilterFuncMetadata) tally.Counter { + key := cswm.getGranularFilterCounterMapKey(metadata) + + cswm.filterAcceptedGranularLock.RLock() + val, ok := cswm.filterAcceptedGranular[key] + cswm.filterAcceptedGranularLock.RUnlock() + + if !ok { + val = cswm.scope.Tagged(map[string]string{ + "config-source": metadata.SourceType.String(), + "filter-type": metadata.FilterType.String(), + }).Counter("filter-accepted-granular") + + cswm.filterAcceptedGranularLock.Lock() + cswm.filterAcceptedGranular[key] = val + cswm.filterAcceptedGranularLock.Unlock() + } + + return val +} + +//nolint:dupl +func (cswm *consumerServiceWriterMetrics) getFilterNotAcceptedGranularCounter( + metadata producer.FilterFuncMetadata) tally.Counter { + key := cswm.getGranularFilterCounterMapKey(metadata) + + cswm.filterNotAcceptedGranularLock.RLock() + val, ok := cswm.filterNotAcceptedGranular[key] + cswm.filterNotAcceptedGranularLock.RUnlock() + + if !ok { + val = cswm.scope.Tagged(map[string]string{ + "config-source": metadata.SourceType.String(), + "filter-type": metadata.FilterType.String(), + }).Counter("filter-not-accepted-granular") + + cswm.filterNotAcceptedGranularLock.Lock() + cswm.filterNotAcceptedGranular[key] = val + cswm.filterNotAcceptedGranularLock.Unlock() + } + + return val } func newConsumerServiceWriterMetrics(scope tally.Scope) consumerServiceWriterMetrics { return consumerServiceWriterMetrics{ - placementUpdate: scope.Counter("placement-update"), - placementError: scope.Counter("placement-error"), - filterAccepted: scope.Counter("filter-accepted"), - filterNotAccepted: scope.Counter("filter-not-accepted"), - queueSize: scope.Gauge("queue-size"), + placementUpdate: scope.Counter("placement-update"), + placementError: scope.Counter("placement-error"), + filterAccepted: scope.Counter("filter-accepted"), + filterNotAccepted: scope.Counter("filter-not-accepted"), + scope: scope, + filterAcceptedGranular: make(map[string]tally.Counter), + filterNotAcceptedGranular: make(map[string]tally.Counter), + queueSize: scope.Gauge("queue-size"), } } @@ -178,8 +245,12 @@ func initShardWriters( return sws } +func (w *consumerServiceWriterImpl) GetDataFilters() []producer.FilterFunc { + return w.dataFilters +} + func (w *consumerServiceWriterImpl) Write(rm *producer.RefCountedMessage) { - if rm.Accept(w.dataFilters) { + if rm.Accept(w.dataFilters, w.m.getFilterAcceptedGranularCounter, w.m.getFilterNotAcceptedGranularCounter) { w.shardWriters[rm.Shard()].Write(rm) w.m.filterAccepted.Inc(1) return diff --git a/src/msg/producer/writer/consumer_service_writer_mock.go b/src/msg/producer/writer/consumer_service_writer_mock.go index 4472d92b36..1a91d1a11e 100644 --- a/src/msg/producer/writer/consumer_service_writer_mock.go +++ b/src/msg/producer/writer/consumer_service_writer_mock.go @@ -105,6 +105,21 @@ func (mr *MockconsumerServiceWriterMockRecorder) SetMessageTTLNanos(value interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMessageTTLNanos", reflect.TypeOf((*MockconsumerServiceWriter)(nil).SetMessageTTLNanos), value) } +// GetDataFilter mocks base method. +func (m *MockconsumerServiceWriter) GetDataFilters() []producer.FilterFunc { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDataFilters") + + ret0, _ := ret[0].([]producer.FilterFunc) + return ret0 +} + +// GetDataFilters indicates an expected call of GetDataFilters +func (mr *MockconsumerServiceWriterMockRecorder) GetDataFilters() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDataFilters", reflect.TypeOf((*MockconsumerServiceWriter)(nil).GetDataFilters())) +} + // UnregisterFilters mocks base method. func (m *MockconsumerServiceWriter) UnregisterFilters() { m.ctrl.T.Helper() diff --git a/src/msg/producer/writer/consumer_service_writer_test.go b/src/msg/producer/writer/consumer_service_writer_test.go index b11f1611cf..e3a5c675f6 100644 --- a/src/msg/producer/writer/consumer_service_writer_test.go +++ b/src/msg/producer/writer/consumer_service_writer_test.go @@ -30,6 +30,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/kv/mem" @@ -521,7 +522,10 @@ func TestConsumerServiceWriterFilter(t *testing.T) { sw1.EXPECT().Write(gomock.Any()) csw.Write(producer.NewRefCountedMessage(mm1, nil)) - csw.RegisterFilter(func(m producer.Message) bool { return m.Shard() == uint32(0) }) + csw.RegisterFilter(producer.NewFilterFunc( + func(m producer.Message) bool { return m.Shard() == uint32(0) }, + producer.UnspecifiedFilter, + producer.StaticConfig)) // Write is not expected due to mm1 shard != 0 csw.Write(producer.NewRefCountedMessage(mm1, nil)) @@ -529,7 +533,10 @@ func TestConsumerServiceWriterFilter(t *testing.T) { // Write is expected due to mm0 shard == 0 csw.Write(producer.NewRefCountedMessage(mm0, nil)) - csw.RegisterFilter(func(m producer.Message) bool { return m.Size() == 3 }) + csw.RegisterFilter(producer.NewFilterFunc( + func(m producer.Message) bool { return m.Size() == 3 }, + producer.UnspecifiedFilter, + producer.StaticConfig)) sw0.EXPECT().Write(gomock.Any()) // Write is expected because to mm0 shard == 0 and mm0 size == 3 csw.Write(producer.NewRefCountedMessage(mm0, nil)) @@ -686,6 +693,54 @@ func TestConsumerServiceCloseShardWritersConcurrently(t *testing.T) { } } +func TestConsumerServiceWriterMetrics(t *testing.T) { + testScope := tally.NewTestScope("test", nil) + + acceptedMetadata := producer.FilterFuncMetadata{ + FilterType: producer.ShardSetFilter, + SourceType: producer.DynamicConfig} + + notAcceptedMetadata := producer.FilterFuncMetadata{ + FilterType: producer.PercentageFilter, + SourceType: producer.DynamicConfig} + + m := newConsumerServiceWriterMetrics(testScope) + m.getFilterAcceptedGranularCounter(acceptedMetadata).Inc(1) + m.getFilterNotAcceptedGranularCounter(notAcceptedMetadata).Inc(1) + + accepetKey := m.getGranularFilterCounterMapKey(acceptedMetadata) + notAcceptKey := m.getGranularFilterCounterMapKey(notAcceptedMetadata) + + _, ok := m.filterAcceptedGranular[accepetKey] + require.True(t, ok) + + _, ok = m.filterNotAcceptedGranular[notAcceptKey] + require.True(t, ok) + + gotAcceptedValue := false + gotNotAcceptedValue := false + + snap := testScope.Snapshot() + for _, c := range snap.Counters() { + if c.Name() == "test.filter-accepted-granular" { + require.Equal(t, "DynamicConfig", c.Tags()["config-source"]) + require.Equal(t, "ShardSetFilter", c.Tags()["filter-type"]) + require.Equal(t, int64(1), c.Value()) + gotAcceptedValue = true + } + + if c.Name() == "test.filter-not-accepted-granular" { + require.Equal(t, "DynamicConfig", c.Tags()["config-source"]) + require.Equal(t, "PercentageFilter", c.Tags()["filter-type"]) + require.Equal(t, int64(1), c.Value()) + gotNotAcceptedValue = true + } + } + + require.True(t, gotAcceptedValue) + require.True(t, gotNotAcceptedValue) +} + func testPlacementService(store kv.Store, sid services.ServiceID) placement.Service { return service.NewPlacementService( storage.NewPlacementStorage(store, sid.String(), placement.NewOptions()), diff --git a/src/msg/producer/writer/writer.go b/src/msg/producer/writer/writer.go index 080627f707..fbc82a1172 100644 --- a/src/msg/producer/writer/writer.go +++ b/src/msg/producer/writer/writer.go @@ -28,7 +28,11 @@ import ( "github.com/uber-go/tally" "go.uber.org/zap" + "github.com/m3db/m3/src/aggregator/aggregator/handler/filter" + handlerWriter "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" + "github.com/m3db/m3/src/aggregator/sharding" "github.com/m3db/m3/src/cluster/services" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/producer" "github.com/m3db/m3/src/msg/topic" xerrors "github.com/m3db/m3/src/x/errors" @@ -158,6 +162,7 @@ func (w *writer) process(update interface{}) error { if err := t.Validate(); err != nil { return err } + // We don't allow changing number of shards for topics, it will be // prevented on topic service side, but also being defensive here as well. numShards := w.NumShards() @@ -174,9 +179,47 @@ func (w *writer) process(update interface{}) error { for _, cs := range t.ConsumerServices() { key := cs.ServiceID().String() csw, ok := w.consumerServiceWriters[key] + if ok { + // update existing consumer service writer + csw.SetMessageTTLNanos(cs.MessageTTLNanos()) + + if cs.DynamicFilterConfigs() != nil { + dynamicFilters, err := ParseDynamicFilters(csw, cs.DynamicFilterConfigs()) + + if err != nil { + w.logger.Error("could not update dynamic filters on consumer service writer, error registering dynamic filters", + zap.String("writer", cs.String()), zap.Error(err)) + + multiErr = multiErr.Add(err) + } else { + // unregister all filters and register the new ones + w.Lock() + + csw.UnregisterFilters() + for _, dynamicFilter := range dynamicFilters { + csw.RegisterFilter(dynamicFilter) + } + + w.Unlock() + } + } else { + // sending no dynamic filters means we should remove all filters, unless there are static filters + // if there are no static filters, remove all filters + _, ok := w.filterRegistry[key] + + if !ok { + w.Lock() + csw.UnregisterFilters() + w.Unlock() + } + } + newConsumerServiceWriters[key] = csw + + w.logger.Info("Updated consumer service writer", zap.String("consumer-service", cs.String())) + continue } scope := iOpts.MetricsScope().Tagged(map[string]string{ @@ -185,13 +228,50 @@ func (w *writer) process(update interface{}) error { "consumer-service-env": cs.ServiceID().Environment(), "consumption-type": cs.ConsumptionType().String(), }) + + // create new consumer service writer csw, err := newConsumerServiceWriter(cs, t.NumberOfShards(), w.opts.SetInstrumentOptions(iOpts.SetMetricsScope(scope))) + if err != nil { w.logger.Error("could not create consumer service writer", zap.String("writer", cs.String()), zap.Error(err)) multiErr = multiErr.Add(err) continue } + + // if there are dynamicly configured filters, they are the source of truth + if cs.DynamicFilterConfigs() != nil { + dynamicFilters, err := ParseDynamicFilters(csw, cs.DynamicFilterConfigs()) + + if err != nil { + w.logger.Error("could not create consumer service writer, error registering dynamic filters", + zap.String("writer", cs.String()), zap.Error(err)) + + multiErr = multiErr.Add(err) + continue + } else { + w.Lock() + + for _, dynamicFilter := range dynamicFilters { + csw.RegisterFilter(dynamicFilter) + } + + w.Unlock() + } + + } else { + w.Lock() + + // if there are no dynamicly configured filters, static filters are the source of truth + if staticFilters, ok := w.filterRegistry[key]; ok { + for _, staticFilter := range staticFilters { + csw.RegisterFilter(staticFilter) + } + } + + w.Unlock() + } + if err = csw.Init(w.initType); err != nil { w.logger.Error("could not init consumer service writer", zap.String("writer", cs.String()), zap.Error(err)) @@ -220,13 +300,7 @@ func (w *writer) process(update interface{}) error { // Apply the new consumer service writers. w.Lock() - for key, csw := range newConsumerServiceWriters { - if filters, ok := w.filterRegistry[key]; ok { - for _, filter := range filters { - csw.RegisterFilter(filter) - } - } - } + w.consumerServiceWriters = newConsumerServiceWriters w.numShards = t.NumberOfShards() w.Unlock() @@ -285,3 +359,100 @@ func (w *writer) UnregisterFilters(sid services.ServiceID) { csw.UnregisterFilters() } } + +// ParseDynamicFilters parses the dynamic filters for a consumer service from a topic update. +func ParseDynamicFilters(csw consumerServiceWriter, filterConfig topic.FilterConfig) ([]producer.FilterFunc, error) { + filterFuncs := []producer.FilterFunc{} + + if filterConfig == nil { + return filterFuncs, errors.New("nil filter config") + } + + if filterConfig.ShardSetFilter() != nil { + shardSetFilterFunc, err := ParseShardSetFilterFromTopicUpdate(csw, filterConfig.ShardSetFilter()) + + if err != nil { + return filterFuncs, fmt.Errorf("Error registering shard set filter: %w", err) + } + + filterFuncs = append(filterFuncs, shardSetFilterFunc) + } + + if filterConfig.StoragePolicyFilter() != nil { + storagePolicyFilterFunc, err := ParseStoragePolicyFilterFromTopicUpdate(csw, filterConfig.StoragePolicyFilter()) + + if err != nil { + return filterFuncs, fmt.Errorf("Error registering storage policy filter: %w", err) + } + + filterFuncs = append(filterFuncs, storagePolicyFilterFunc) + } + + if filterConfig.PercentageFilter() != nil { + percentageFilterFunc, err := ParsePercentageFilterFromFromTopicUpdate(csw, filterConfig.PercentageFilter()) + + if err != nil { + return filterFuncs, fmt.Errorf("Error registering percentage filter: %w", err) + } + + filterFuncs = append(filterFuncs, percentageFilterFunc) + } + + return filterFuncs, nil +} + +// ParseShardSetFilterFromTopicUpdate parses a shard set filter from a topic update. +func ParseShardSetFilterFromTopicUpdate( + csw consumerServiceWriter, + ssf topic.ShardSetFilter) (producer.FilterFunc, error) { + var filterFunc producer.FilterFunc + + shardSetString := ssf.ShardSet() + + shardSet, err := sharding.ParseShardSet(shardSetString) + + if err != nil { + return filterFunc, errors.New("Error parsing shard set") + } + + filterFunc = filter.NewShardSetFilter(shardSet, producer.DynamicConfig) + + return filterFunc, nil +} + +// ParseStoragePolicyFilterFromTopicUpdate parses a storage policy filter from a topic update. +func ParseStoragePolicyFilterFromTopicUpdate( + csw consumerServiceWriter, + spf topic.StoragePolicyFilter) (producer.FilterFunc, error) { + var filterFunc producer.FilterFunc + + storagePolicies := spf.StoragePolicies() + + parsedPolicies := []policy.StoragePolicy{} + for _, storagePolicyString := range storagePolicies { + parsedPolicy, err := policy.ParseStoragePolicy(storagePolicyString) + + if err != nil { + return filterFunc, fmt.Errorf("Error parsing storage policy: %w", err) + } + + parsedPolicies = append(parsedPolicies, parsedPolicy) + + filterFunc = handlerWriter.NewStoragePolicyFilter(parsedPolicies, producer.DynamicConfig) + } + + return filterFunc, nil +} + +// ParsePercentageFilterFromFromTopicUpdate parses a percentage filter from a topic update. +func ParsePercentageFilterFromFromTopicUpdate( + csw consumerServiceWriter, + pf topic.PercentageFilter) (producer.FilterFunc, error) { + var filterFunc producer.FilterFunc + + percentage := pf.Percentage() + + filterFunc = filter.NewPercentageFilter(percentage, producer.DynamicConfig) + + return filterFunc, nil +} diff --git a/src/msg/producer/writer/writer_test.go b/src/msg/producer/writer/writer_test.go index e929455a54..737cb1b176 100644 --- a/src/msg/producer/writer/writer_test.go +++ b/src/msg/producer/writer/writer_test.go @@ -211,8 +211,15 @@ func TestWriterRegisterFilter(t *testing.T) { csw1 := NewMockconsumerServiceWriter(ctrl) sid2 := services.NewServiceID().SetName("s2") - filter := func(producer.Message) bool { return false } - filter2 := func(producer.Message) bool { return true } + filter := producer.NewFilterFunc( + func(producer.Message) bool { return false }, + producer.UnspecifiedFilter, + producer.StaticConfig) + + filter2 := producer.NewFilterFunc( + func(producer.Message) bool { return true }, + producer.UnspecifiedFilter, + producer.StaticConfig) w := NewWriter(opts).(*writer) w.consumerServiceWriters[cs1.ServiceID().String()] = csw1 @@ -236,7 +243,6 @@ func TestWriterRegisterFilter(t *testing.T) { csw1.EXPECT().RegisterFilter(gomock.Any()) w.RegisterFilter(sid1, filter) - csw1.EXPECT().RegisterFilter(gomock.Any()) csw1.EXPECT().SetMessageTTLNanos(int64(0)) testTopic := topic.NewTopic(). SetName(opts.TopicName()). @@ -454,6 +460,9 @@ func TestTopicUpdateWithSameConsumerServicesButDifferentOrder(t *testing.T) { w.consumerServiceWriters[cs2.ServiceID().String()] = cswMock2 defer csw.Close() + cswMock1.EXPECT().UnregisterFilters() + cswMock2.EXPECT().UnregisterFilters() + cswMock1.EXPECT().SetMessageTTLNanos(int64(0)) cswMock2.EXPECT().SetMessageTTLNanos(int64(500)) testTopic = testTopic. @@ -850,3 +859,382 @@ func TestWriterNumShards(t *testing.T) { require.NoError(t, w.Init()) require.Equal(t, 2, int(w.NumShards())) } + +func TestDynamicConsumerServiceWriterFilters(t *testing.T) { + testDynamicFilterConfig := topic.NewFilterConfig(). + SetPercentageFilter( + topic.NewPercentageFilter(50), + ). + SetShardSetFilter( + topic.NewShardSetFilter("1..5"), + ). + SetStoragePolicyFilter( + topic.NewStoragePolicyFilter([]string{"1m:40d"}), + ) + + type testTopicUpdate struct { + dynamicFilterConfig topic.FilterConfig + expectedDataFilters []producer.FilterFuncMetadata + expectTopicUpdateError bool + expectTopicValidationError bool + expectedCswCount int + } + + type testCase struct { + name string + staticFilters []producer.FilterFuncType + topicUpdate1 testTopicUpdate + topicUpdate2 *testTopicUpdate + } + + tests := []testCase{ + { + name: "No_Static_Filters_One_Topic_Update_With_Dynamic_Filters", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: testDynamicFilterConfig, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: nil, + }, + + { + name: "Has_Static_Filters_One_Topic_Update_With_Dynamic_Filters", + staticFilters: []producer.FilterFuncType{ + producer.PercentageFilter, + producer.ShardSetFilter, + producer.StoragePolicyFilter}, + + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: testDynamicFilterConfig, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: nil, + }, + + { + name: "Has_Static_Filters_Two_Topic_Updates_With_No_Dynamic_Filters", + staticFilters: []producer.FilterFuncType{ + producer.PercentageFilter, + producer.ShardSetFilter, + producer.StoragePolicyFilter}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: nil, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: &testTopicUpdate{ + dynamicFilterConfig: nil, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.StaticConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + }, + + { + name: "No_Static_Config_Two_Topic_Updates_With_Different_Dynamic_Filters", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: testDynamicFilterConfig, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: &testTopicUpdate{ + dynamicFilterConfig: topic.NewFilterConfig().SetPercentageFilter(topic.NewPercentageFilter(75)), + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + }, + + { + name: "No_Static_Config_One_Topic_Update_With_Invalid_Dynamic_Shard_Set_Filter", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: topic.NewFilterConfig(). + SetShardSetFilter(topic.NewShardSetFilter("randomstringstrinxyz123abc")), + expectedDataFilters: []producer.FilterFuncMetadata{}, + expectTopicUpdateError: true, + expectedCswCount: 0, + }, + topicUpdate2: nil, + }, + + { + name: "No_Static_Config_One_Topic_Update_With_Invalid_Dynamic_Percentage_Filter", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: topic.NewFilterConfig().SetPercentageFilter(topic.NewPercentageFilter(99999)), + expectedDataFilters: []producer.FilterFuncMetadata{}, + expectTopicValidationError: true, + expectedCswCount: 0, + }, + topicUpdate2: nil, + }, + + { + // nolint:lll + name: "No_Static_Config_First_Topic_Update_With_Dynamic_Storage_Policy_Filter_Second_Topic_Update_With_Invalid_Dynamic_Shard_Set_Filter", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: topic.NewFilterConfig(). + SetStoragePolicyFilter(topic.NewStoragePolicyFilter([]string{"1m:40d"})), + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: &testTopicUpdate{ + dynamicFilterConfig: topic.NewFilterConfig(). + SetShardSetFilter(topic.NewShardSetFilter("randomstringstrinxyz123abc")), + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + // second update should not be applied + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + }, + expectedCswCount: 1, + }, + }, + + { + // nolint:lll + name: "No_Static_Config_Two_Topic_Updates_First_Update_Adds_Dynamic_Filters_Second_Update_Removes_Dynamic_Filters", + staticFilters: []producer.FilterFuncType{}, + topicUpdate1: testTopicUpdate{ + dynamicFilterConfig: testDynamicFilterConfig, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.PercentageFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.ShardSetFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.StoragePolicyFilter, SourceType: producer.DynamicConfig}, + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + topicUpdate2: &testTopicUpdate{ + dynamicFilterConfig: nil, + expectedDataFilters: []producer.FilterFuncMetadata{ + {FilterType: producer.AcceptAllFilter, SourceType: producer.StaticConfig}, + }, + expectedCswCount: 1, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + defer leaktest.Check(t)() + + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + store := mem.NewStore() + cs := client.NewMockClient(ctrl) + cs.EXPECT().Store(gomock.Any()).Return(store, nil) + + ts, err := topic.NewService(topic.NewServiceOptions().SetConfigService(cs)) + require.NoError(t, err, "expect no error after reading topic service") + + opts := testOptions().SetTopicService(ts) + + sid1 := services.NewServiceID().SetName("s1") + + cs1 := topic.NewConsumerService().SetConsumptionType(topic.Replicated).SetServiceID(sid1) + + if test.topicUpdate1.dynamicFilterConfig != nil { + cs1 = cs1.SetDynamicFilterConfigs(test.topicUpdate1.dynamicFilterConfig) + } + + testTopic := topic.NewTopic(). + SetName(opts.TopicName()). + SetNumberOfShards(1). + SetConsumerServices([]topic.ConsumerService{cs1}) + _, err = ts.CheckAndSet(testTopic, kv.UninitializedVersion) + + if test.topicUpdate1.expectTopicValidationError { + require.Error(t, err, "expect error after setting topic") + return + } + require.NoError(t, err, "expect no error after setting topic") + + sd := services.NewMockServices(ctrl) + opts = opts.SetServiceDiscovery(sd) + ps1 := testPlacementService(store, sid1) + sd.EXPECT().PlacementService(sid1, gomock.Any()).Return(ps1, nil) + + p1 := placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewInstance(). + SetID("i1"). + SetEndpoint("i1"). + SetShards(shard.NewShards([]shard.Shard{ + shard.NewShard(0).SetState(shard.Available), + })), + }). + SetShards([]uint32{0}). + SetReplicaFactor(1). + SetIsSharded(true) + _, err = ps1.Set(p1) + require.NoError(t, err, "expect no error after setting placement") + + w := NewWriter(opts).(*writer) + + for _, filterType := range test.staticFilters { + w.RegisterFilter( + sid1, + producer.NewFilterFunc(func(producer.Message) bool { return true }, filterType, producer.StaticConfig)) + } + + called := atomic.NewInt32(0) + w.processFn = func(update interface{}) error { + called.Inc() + return w.process(update) + } + + err = w.Init() + + if test.topicUpdate1.expectTopicUpdateError { + require.Error(t, err, "expect error after writer init") + } else { + require.NoError(t, err, "expect no error after writer init") + } + + require.Equal(t, 1, int(called.Load()), "expect processFn to be called once") + require.Equal( + t, + test.topicUpdate1.expectedCswCount, + len(w.consumerServiceWriters), + "expect csw count to match after 1st topic update") + + if test.topicUpdate1.expectedCswCount > 0 { + csw, ok := w.consumerServiceWriters[cs1.ServiceID().String()] + require.True(t, ok, "expect csw to exist after 1st topic update") + + actualDataFilterFuncs := csw.GetDataFilters() + actualDataFilterMetadatas := []producer.FilterFuncMetadata{} + for _, filter := range actualDataFilterFuncs { + actualDataFilterMetadatas = append(actualDataFilterMetadatas, filter.Metadata) + } + + require.True(t, + testAreFilterFuncMetadataSlicesEqual( + actualDataFilterMetadatas, + test.topicUpdate1.expectedDataFilters), + "expect data filters to match after 1st topic update", + actualDataFilterMetadatas, + test.topicUpdate1.expectedDataFilters) + + if test.topicUpdate2 == nil { + csw.Close() + } + } + + if test.topicUpdate2 != nil { + cs1 = cs1.SetDynamicFilterConfigs(nil) + + if test.topicUpdate2.dynamicFilterConfig != nil { + cs1 = cs1.SetDynamicFilterConfigs(test.topicUpdate2.dynamicFilterConfig) + } + + testTopic = testTopic. + SetConsumerServices([]topic.ConsumerService{cs1}). + SetVersion(1) + _, err = ts.CheckAndSet(testTopic, 1) + require.NoError(t, err, "expect no error after 2nd topic update") + + for called.Load() != 2 { + time.Sleep(100 * time.Millisecond) + } + + w.RLock() + + require.Equal( + t, + test.topicUpdate1.expectedCswCount, + len(w.consumerServiceWriters), + "expect csw count to match after 2nd topic update") + + w.RUnlock() + + if test.topicUpdate2.expectedCswCount == 0 { + return + } + + csw, ok := w.consumerServiceWriters[cs1.ServiceID().String()] + require.True(t, ok, "expect csw to exist after 2nd topic update") + + actualDataFilterFuncs := csw.GetDataFilters() + actualDataFilterMetadatas := []producer.FilterFuncMetadata{} + for _, filter := range actualDataFilterFuncs { + actualDataFilterMetadatas = append(actualDataFilterMetadatas, filter.Metadata) + } + + require.True(t, + testAreFilterFuncMetadataSlicesEqual( + actualDataFilterMetadatas, + test.topicUpdate2.expectedDataFilters), + "expect data filters to match after 2nd topic update", + actualDataFilterMetadatas, + test.topicUpdate2.expectedDataFilters) + + csw.Close() + } + + w.Close() + }) + } +} + +func testAreFilterFuncMetadataSlicesEqual(slice1, slice2 []producer.FilterFuncMetadata) bool { + if len(slice1) != len(slice2) { + return false + } + + countMap := make(map[producer.FilterFuncMetadata]int) + for _, item := range slice1 { + countMap[item]++ + } + + for _, item := range slice2 { + if countMap[item] == 0 { + return false + } + countMap[item]-- + } + + for _, count := range countMap { + if count != 0 { + return false + } + } + + return true +} diff --git a/src/msg/topic/topic.go b/src/msg/topic/topic.go index b3a7885686..5a5b8d7345 100644 --- a/src/msg/topic/topic.go +++ b/src/msg/topic/topic.go @@ -186,7 +186,42 @@ func (t *topic) Validate() error { return fmt.Errorf("invalid topic: duplicated consumer %s", cs.ServiceID().String()) } uniqConsumers[cs.ServiceID().String()] = struct{}{} + + filterConfig := cs.DynamicFilterConfigs() + + if filterConfig != nil { + shardSetFilter := filterConfig.ShardSetFilter() + if shardSetFilter != nil { + shardSet := shardSetFilter.ShardSet() + if shardSet == "" { + return fmt.Errorf("invalid topic: empty shard set in filter for consumer %s", cs.ServiceID().String()) + } + } + + percentageFilter := filterConfig.PercentageFilter() + if percentageFilter != nil { + percentage := percentageFilter.Percentage() + if percentage <= 0 || percentage >= 100 { + return fmt.Errorf("invalid topic: invalid percentage in filter for consumer %s", cs.ServiceID().String()) + } + } + + storagePolicyFilter := filterConfig.StoragePolicyFilter() + if storagePolicyFilter != nil { + storagePolicies := storagePolicyFilter.StoragePolicies() + if len(storagePolicies) == 0 { + return fmt.Errorf("invalid topic: empty storage policy filter for consumer %s", cs.ServiceID().String()) + } + + for _, storagePolicy := range storagePolicies { + if storagePolicy == "" { + return fmt.Errorf("invalid topic: empty storage policy in filter for consumer %s", cs.ServiceID().String()) + } + } + } + } } + return nil } @@ -208,10 +243,109 @@ func ToProto(t Topic) (*topicpb.Topic, error) { }, nil } +type filterConfig struct { + shardSetFilterConfig *shardSetFilter + percentageFilterConfig *percentageFilter + storagePolicyFilterConfig *storagePolicyFilter +} + +// NewFilterConfig creates a new filter config. +func NewFilterConfig() FilterConfig { + return new(filterConfig) +} + +func (fc *filterConfig) ShardSetFilter() ShardSetFilter { + if fc.shardSetFilterConfig == nil { + return nil + } + + return fc.shardSetFilterConfig +} + +func (fc *filterConfig) SetShardSetFilter(value ShardSetFilter) FilterConfig { + newfc := *fc + if value != nil { + newfc.shardSetFilterConfig = value.(*shardSetFilter) + } + return &newfc +} + +func (fc *filterConfig) PercentageFilter() PercentageFilter { + if fc.percentageFilterConfig == nil { + return nil + } + + return fc.percentageFilterConfig +} + +func (fc *filterConfig) SetPercentageFilter(value PercentageFilter) FilterConfig { + newfc := *fc + if value != nil { + newfc.percentageFilterConfig = value.(*percentageFilter) + } + return &newfc +} + +func (fc *filterConfig) StoragePolicyFilter() StoragePolicyFilter { + if fc.storagePolicyFilterConfig == nil { + return nil + } + + return fc.storagePolicyFilterConfig +} + +func (fc *filterConfig) SetStoragePolicyFilter(value StoragePolicyFilter) FilterConfig { + newfc := *fc + if value != nil { + newfc.storagePolicyFilterConfig = value.(*storagePolicyFilter) + } + return &newfc +} + +type shardSetFilter struct { + shardSet string +} + +func (filter *shardSetFilter) ShardSet() string { + return filter.shardSet +} + +// NewShardSetFilter creates a new shard set filter. +func NewShardSetFilter(shardSet string) ShardSetFilter { + return &shardSetFilter{shardSet: shardSet} +} + +type percentageFilter struct { + percentage float64 +} + +// NewPercentageFilter creates a new percentage filter. +func NewPercentageFilter(percentage float64) PercentageFilter { + return &percentageFilter{percentage: percentage} +} + +func (filter *percentageFilter) Percentage() float64 { + return filter.percentage +} + +type storagePolicyFilter struct { + storagePolicies []string +} + +// NewStoragePolicyFilter creates a new storage policy filter. +func NewStoragePolicyFilter(storagePolicies []string) StoragePolicyFilter { + return &storagePolicyFilter{storagePolicies: storagePolicies} +} + +func (filter *storagePolicyFilter) StoragePolicies() []string { + return filter.storagePolicies +} + type consumerService struct { - sid services.ServiceID - ct ConsumptionType - ttlNanos int64 + sid services.ServiceID + ct ConsumptionType + ttlNanos int64 + filterConfigs *filterConfig } // NewConsumerService creates a ConsumerService. @@ -228,7 +362,8 @@ func NewConsumerServiceFromProto(cs *topicpb.ConsumerService) (ConsumerService, return NewConsumerService(). SetServiceID(NewServiceIDFromProto(cs.ServiceId)). SetConsumptionType(ct). - SetMessageTTLNanos(cs.MessageTtlNanos), nil + SetMessageTTLNanos(cs.MessageTtlNanos). + SetDynamicFilterConfigs(NewDynamicFilterConfigFromProto(cs.Filters)), nil } // ConsumerServiceToProto creates proto from a ConsumerService. @@ -241,6 +376,7 @@ func ConsumerServiceToProto(cs ConsumerService) (*topicpb.ConsumerService, error ConsumptionType: ct, ServiceId: ServiceIDToProto(cs.ServiceID()), MessageTtlNanos: cs.MessageTTLNanos(), + Filters: DynamicFilterConfigToProto(cs.DynamicFilterConfigs()), }, nil } @@ -274,6 +410,24 @@ func (cs *consumerService) SetMessageTTLNanos(value int64) ConsumerService { return &newcs } +func (cs *consumerService) DynamicFilterConfigs() FilterConfig { + if cs.filterConfigs == nil { + return nil + } + + return cs.filterConfigs +} + +func (cs *consumerService) SetDynamicFilterConfigs(value FilterConfig) ConsumerService { + newcs := *cs + if value != nil { + newcs.filterConfigs = value.(*filterConfig) + } else { + newcs.filterConfigs = nil + } + return &newcs +} + func (cs *consumerService) String() string { var buf bytes.Buffer buf.WriteString("{") @@ -281,6 +435,18 @@ func (cs *consumerService) String() string { if cs.ttlNanos != 0 { buf.WriteString(fmt.Sprintf(", ttl: %v", time.Duration(cs.ttlNanos))) } + if cs.filterConfigs != nil { + if cs.filterConfigs.shardSetFilterConfig != nil { + buf.WriteString(fmt.Sprintf(", shard set filter: %s", cs.filterConfigs.shardSetFilterConfig.shardSet)) + } + if cs.filterConfigs.percentageFilterConfig != nil { + buf.WriteString(fmt.Sprintf(", percentage filter: %v", cs.filterConfigs.percentageFilterConfig.percentage)) + } + if cs.filterConfigs.storagePolicyFilterConfig != nil { + buf.WriteString( + fmt.Sprintf(", storage policy filter: %v", cs.filterConfigs.storagePolicyFilterConfig.storagePolicies)) + } + } buf.WriteString("}") return buf.String() } @@ -298,3 +464,64 @@ func ServiceIDToProto(sid services.ServiceID) *topicpb.ServiceID { Zone: sid.Zone(), } } + +// NewDynamicFilterConfigFromProto creates filter config from a proto. +func NewDynamicFilterConfigFromProto(filterProto *topicpb.Filters) FilterConfig { + if filterProto == nil { + return nil + } + + filter := filterConfig{} + if filterProto.ShardSetFilter != nil { + filter.shardSetFilterConfig = &shardSetFilter{shardSet: filterProto.ShardSetFilter.ShardSet} + } + if filterProto.PercentageFilter != nil { + filter.percentageFilterConfig = &percentageFilter{percentage: filterProto.PercentageFilter.Percentage} + } + if filterProto.StoragePolicyFilter != nil { + filter.storagePolicyFilterConfig = &storagePolicyFilter{ + storagePolicies: filterProto.StoragePolicyFilter.StoragePolicies} + } + + return &filter +} + +// DynamicFilterConfigToProto creates proto from a filter config. +func DynamicFilterConfigToProto(filter FilterConfig) *topicpb.Filters { + if filter == nil { + return nil + } + + return &topicpb.Filters{ + ShardSetFilter: ShardSetFilterToProto(filter.ShardSetFilter()), + PercentageFilter: PercentageFilterToProto(filter.PercentageFilter()), + StoragePolicyFilter: StoragePolicyFilterToProto(filter.StoragePolicyFilter()), + } +} + +// ShardSetFilterToProto creates proto from a shard set filter. +func ShardSetFilterToProto(filter ShardSetFilter) *topicpb.ShardSetFilter { + if filter == nil { + return nil + } + + return &topicpb.ShardSetFilter{ShardSet: filter.ShardSet()} +} + +// PercentageFilterToProto creates proto from a percentage filter. +func PercentageFilterToProto(filter PercentageFilter) *topicpb.PercentageFilter { + if filter == nil { + return nil + } + + return &topicpb.PercentageFilter{Percentage: filter.Percentage()} +} + +// StoragePolicyFilterToProto creates proto from a storage policy filter. +func StoragePolicyFilterToProto(filter StoragePolicyFilter) *topicpb.StoragePolicyFilter { + if filter == nil { + return nil + } + + return &topicpb.StoragePolicyFilter{StoragePolicies: filter.StoragePolicies()} +} diff --git a/src/msg/topic/topic_test.go b/src/msg/topic/topic_test.go index 776e2d9b9f..e58777a3bb 100644 --- a/src/msg/topic/topic_test.go +++ b/src/msg/topic/topic_test.go @@ -145,6 +145,15 @@ func TestTopicUpdateConsumer(t *testing.T) { } func TestTopicString(t *testing.T) { + percentageFilter := NewPercentageFilter(0.5) + shardSetFilter := NewShardSetFilter("10..23") + storagePolicyFilter := NewStoragePolicyFilter([]string{"1m:40d"}) + + filterConfig := NewFilterConfig(). + SetPercentageFilter(percentageFilter). + SetShardSetFilter(shardSetFilter). + SetStoragePolicyFilter(storagePolicyFilter) + cs1 := NewConsumerService(). SetConsumptionType(Shared). SetServiceID(services.NewServiceID(). @@ -159,7 +168,8 @@ func TestTopicString(t *testing.T) { SetEnvironment("env2"). SetZone("zone2"), ). - SetMessageTTLNanos(int64(time.Minute)) + SetMessageTTLNanos(int64(time.Minute)). + SetDynamicFilterConfigs(filterConfig) tpc := NewTopic(). SetName("testName"). SetNumberOfShards(1024). @@ -167,6 +177,7 @@ func TestTopicString(t *testing.T) { SetConsumerServices( []ConsumerService{cs1, cs2}, ) + //nolint:lll str := ` { version: 5 @@ -174,7 +185,7 @@ func TestTopicString(t *testing.T) { numOfShards: 1024 consumerServices: { {service: [name: s1, env: env1, zone: zone1], consumption type: shared} - {service: [name: s2, env: env2, zone: zone2], consumption type: shared, ttl: 1m0s} + {service: [name: s2, env: env2, zone: zone2], consumption type: shared, ttl: 1m0s, shard set filter: 10..23, percentage filter: 0.5, storage policy filter: [1m:40d]} } } ` @@ -222,6 +233,40 @@ func TestTopicValidation(t *testing.T) { }) err = topic.Validate() require.NoError(t, err) + + topic = topic.SetConsumerServices([]ConsumerService{ + cs1.SetDynamicFilterConfigs( + NewFilterConfig(). + SetPercentageFilter(NewPercentageFilter(0.4)). + SetStoragePolicyFilter(NewStoragePolicyFilter([]string{"1m:40d"})). + SetShardSetFilter(NewShardSetFilter("[10..23]"))), + }) + err = topic.Validate() + require.NoError(t, err) + + topic = topic.SetConsumerServices([]ConsumerService{ + cs1.SetDynamicFilterConfigs(NewFilterConfig().SetPercentageFilter(NewPercentageFilter(9999))), + }) + err = topic.Validate() + require.Contains(t, err.Error(), "invalid percentage") + + topic = topic.SetConsumerServices([]ConsumerService{ + cs1.SetDynamicFilterConfigs(NewFilterConfig().SetShardSetFilter(NewShardSetFilter(""))), + }) + err = topic.Validate() + require.Contains(t, err.Error(), "empty shard set") + + topic = topic.SetConsumerServices([]ConsumerService{ + cs1.SetDynamicFilterConfigs(NewFilterConfig().SetStoragePolicyFilter(NewStoragePolicyFilter([]string{}))), + }) + err = topic.Validate() + require.Contains(t, err.Error(), "empty storage policy") + + topic = topic.SetConsumerServices([]ConsumerService{ + cs1.SetDynamicFilterConfigs(NewFilterConfig().SetStoragePolicyFilter(NewStoragePolicyFilter([]string{""}))), + }) + err = topic.Validate() + require.Contains(t, err.Error(), "empty storage policy") } func TestConsumerService(t *testing.T) { diff --git a/src/msg/topic/types.go b/src/msg/topic/types.go index d6ced4fcbe..3125cc54a8 100644 --- a/src/msg/topic/types.go +++ b/src/msg/topic/types.go @@ -88,10 +88,53 @@ type ConsumerService interface { // SetMessageTTLNanos sets ttl for each message in nanoseconds. SetMessageTTLNanos(value int64) ConsumerService + // DynamicFilterConfigs returns the dynamic filters for the consumer service. + DynamicFilterConfigs() FilterConfig + + // SetDynamicFilterConfigs sets the dynamic filters for the consumer service. + SetDynamicFilterConfigs(value FilterConfig) ConsumerService + // String returns the string representation of the consumer service. String() string } +// FilterConfig is the filter configuration for a consumer service. +type FilterConfig interface { + // StoragePolicyFilter returns the storage policy data filter of the consumer service. + StoragePolicyFilter() StoragePolicyFilter + + // SetStoragePolicyFilter sets the storage policy data filter of the consumer service. + SetStoragePolicyFilter(value StoragePolicyFilter) FilterConfig + + // PercentageFilter returns the percentage data filter of the consumer service. + PercentageFilter() PercentageFilter + + // SetPercentageFilter sets the percentage data filter of the consumer service. + SetPercentageFilter(value PercentageFilter) FilterConfig + + // ShardSetFilter returns the shard set data filter of the consumer service. + ShardSetFilter() ShardSetFilter + + // SetShardSetFilter sets the shard set data filter of the consumer service. + SetShardSetFilter(value ShardSetFilter) FilterConfig +} + +// PercentageFilter is sampling percentage filter for a consumer service. +type PercentageFilter interface { + Percentage() float64 +} + +// StoragePolicyFilter is the storage policy filter for a consumer service, +// filters metrics based on their storage policies. +type StoragePolicyFilter interface { + StoragePolicies() []string +} + +// ShardSetFilter is the shard set filter for a consumer service, filters metrics based on specified shard sets. +type ShardSetFilter interface { + ShardSet() string +} + // Watch watches the updates of a topic. type Watch interface { // C returns the notification channel.