Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for 1.18 generics #107

Merged
merged 4 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore

This file was deleted.

4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ language: go
# You don't need to test on very old version of the Go compiler. It's the user's
# responsibility to keep their compilers up to date.
go:
- 1.12.x
- 1.18

# Only clone the most recent commit.
git:
Expand All @@ -29,4 +29,4 @@ before_script:
# .golangci.yml file at the top level of your repo.
script:
- golangci-lint run # run a bunch of code checkers/linters in parallel
- go test -v -race ./... # Run all the tests with the race detector enabled
- go test -v -race ./... # Run all the tests with the race detector enabled
94 changes: 47 additions & 47 deletions concurrent_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,29 @@ var SHARD_COUNT = 32

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap []*ConcurrentMapShared
type ConcurrentMap[V any] []*ConcurrentMapShared[V]

// A "thread" safe string to anything map.
type ConcurrentMapShared struct {
items map[string]interface{}
type ConcurrentMapShared[V any] struct {
items map[string]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// Creates a new concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
func New[V any]() ConcurrentMap[V] {
m := make(ConcurrentMap[V], SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
m[i] = &ConcurrentMapShared[V]{items: make(map[string]V)}
}
return m
}

// GetShard returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
func (m ConcurrentMap[V]) GetShard(key string) *ConcurrentMapShared[V] {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

func (m ConcurrentMap) MSet(data map[string]interface{}) {
func (m ConcurrentMap[V]) MSet(data map[string]V) {
for key, value := range data {
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -41,7 +41,7 @@ func (m ConcurrentMap) MSet(data map[string]interface{}) {
}

// Sets the given value under the specified key.
func (m ConcurrentMap) Set(key string, value interface{}) {
func (m ConcurrentMap[V]) Set(key string, value V) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -53,10 +53,10 @@ func (m ConcurrentMap) Set(key string, value interface{}) {
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
Expand All @@ -67,7 +67,7 @@ func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res i
}

// Sets the given value under the specified key if no value was associated with it.
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -80,7 +80,7 @@ func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
}

// Get retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
func (m ConcurrentMap[V]) Get(key string) (V, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
Expand All @@ -91,7 +91,7 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
}

// Count returns the number of elements within the map.
func (m ConcurrentMap) Count() int {
func (m ConcurrentMap[V]) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
Expand All @@ -103,7 +103,7 @@ func (m ConcurrentMap) Count() int {
}

// Looks up an item under specified key
func (m ConcurrentMap) Has(key string) bool {
func (m ConcurrentMap[V]) Has(key string) bool {
// Get shard
shard := m.GetShard(key)
shard.RLock()
Expand All @@ -114,7 +114,7 @@ func (m ConcurrentMap) Has(key string) bool {
}

// Remove removes an element from the map.
func (m ConcurrentMap) Remove(key string) {
func (m ConcurrentMap[V]) Remove(key string) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -124,12 +124,12 @@ func (m ConcurrentMap) Remove(key string) {

// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb func(key string, v interface{}, exists bool) bool
type RemoveCb[V any] func(key string, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -143,7 +143,7 @@ func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
}

// Pop removes an element from the map and returns it
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -154,40 +154,40 @@ func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
}

// IsEmpty checks if map is empty.
func (m ConcurrentMap) IsEmpty() bool {
func (m ConcurrentMap[V]) IsEmpty() bool {
return m.Count() == 0
}

// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple struct {
type Tuple[V any] struct {
Key string
Val interface{}
Val V
}

// Iter returns an iterator which could be used in a for range loop.
//
// Deprecated: using IterBuffered() will get a better performence
func (m ConcurrentMap) Iter() <-chan Tuple {
func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] {
chans := snapshot(m)
ch := make(chan Tuple)
ch := make(chan Tuple[V])
go fanIn(chans, ch)
return ch
}

// IterBuffered returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] {
chans := snapshot(m)
total := 0
for _, c := range chans {
total += cap(c)
}
ch := make(chan Tuple, total)
ch := make(chan Tuple[V], total)
go fanIn(chans, ch)
return ch
}

// Clear removes all items from map.
func (m ConcurrentMap) Clear() {
func (m ConcurrentMap[V]) Clear() {
for item := range m.IterBuffered() {
m.Remove(item.Key)
}
Expand All @@ -197,23 +197,23 @@ func (m ConcurrentMap) Clear() {
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
//When you access map items before initializing.
if len(m) == 0{
if len(m) == 0 {
panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
}
chans = make([]chan Tuple, SHARD_COUNT)
chans = make([]chan Tuple[V], SHARD_COUNT)
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
go func(index int, shard *ConcurrentMapShared[V]) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
chans[index] = make(chan Tuple[V], len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
chans[index] <- Tuple[V]{key, val}
}
shard.RUnlock()
close(chans[index])
Expand All @@ -224,11 +224,11 @@ func snapshot(m ConcurrentMap) (chans []chan Tuple) {
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn(chans []chan Tuple, out chan Tuple) {
func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
go func(ch chan Tuple[V]) {
for t := range ch {
out <- t
}
Expand All @@ -239,9 +239,9 @@ func fanIn(chans []chan Tuple, out chan Tuple) {
close(out)
}

// Items returns all items as map[string]interface{}
func (m ConcurrentMap) Items() map[string]interface{} {
tmp := make(map[string]interface{})
// Items returns all items as map[string]V
func (m ConcurrentMap[V]) Items() map[string]V {
tmp := make(map[string]V)

// Insert items to temporary map.
for item := range m.IterBuffered() {
Expand All @@ -251,15 +251,15 @@ func (m ConcurrentMap) Items() map[string]interface{} {
return tmp
}

// Iterator callback,called for every key,value found in
// Iterator callbacalled for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb func(key string, v interface{})
type IterCb[V any] func(key string, v V)

// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap) IterCb(fn IterCb) {
func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) {
for idx := range m {
shard := (m)[idx]
shard.RLock()
Expand All @@ -271,15 +271,15 @@ func (m ConcurrentMap) IterCb(fn IterCb) {
}

// Keys returns all keys as []string
func (m ConcurrentMap) Keys() []string {
func (m ConcurrentMap[V]) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
go func(shard *ConcurrentMapShared[V]) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
Expand All @@ -302,9 +302,9 @@ func (m ConcurrentMap) Keys() []string {
}

//Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
func (m ConcurrentMap[V]) MarshalJSON() ([]byte, error) {
// Create a temporary map, which will hold all item spread across shards.
tmp := make(map[string]interface{})
tmp := make(map[string]V)

// Insert items to temporary map.
for item := range m.IterBuffered() {
Expand All @@ -326,13 +326,13 @@ func fnv32(key string) uint32 {

// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
// will probably won't know which to type to unmarshal into, in such case
// we'll end up with a value of type map[string]interface{}, In most cases this isn't
// we'll end up with a value of type map[string]V, In most cases this isn't
// out value type, this is why we've decided to remove this functionality.

// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
// // Reverse process of Marshal.

// tmp := make(map[string]interface{})
// tmp := make(map[string]V)

// // Unmarshal into a single map.
// if err := json.Unmarshal(b, &tmp); err != nil {
Expand Down
Loading