diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/go-concurrency-exercises.iml b/.idea/go-concurrency-exercises.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/go-concurrency-exercises.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..2c0c266 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/0-limit-crawler/main.go b/0-limit-crawler/main.go index ddadd14..37b7c93 100644 --- a/0-limit-crawler/main.go +++ b/0-limit-crawler/main.go @@ -12,17 +12,20 @@ package main import ( "fmt" "sync" + "time" ) // Crawl uses `fetcher` from the `mockfetcher.go` file to imitate a // real crawler. It crawls until the maximum depth has reached. -func Crawl(url string, depth int, wg *sync.WaitGroup) { +func Crawl(url string, depth int, wg *sync.WaitGroup, ticker <-chan time.Time) { defer wg.Done() if depth <= 0 { return } + <-ticker + body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) @@ -35,14 +38,16 @@ func Crawl(url string, depth int, wg *sync.WaitGroup) { for _, u := range urls { // Do not remove the `go` keyword, as Crawl() must be // called concurrently - go Crawl(u, depth-1, wg) + go Crawl(u, depth-1, wg, ticker) } } func main() { var wg sync.WaitGroup + ticker := time.NewTicker(1 * time.Second) + wg.Add(1) - Crawl("http://golang.org/", 4, &wg) + Crawl("http://golang.org/", 4, &wg, ticker.C) wg.Wait() } diff --git a/1-producer-consumer/main.go b/1-producer-consumer/main.go index e508e93..e763e20 100644 --- a/1-producer-consumer/main.go +++ b/1-producer-consumer/main.go @@ -10,26 +10,30 @@ package main import ( "fmt" + "sync" "time" ) -func producer(stream Stream) (tweets []*Tweet) { +func producer(stream Stream, tweetsChan chan<- *Tweet) { for { tweet, err := stream.Next() if err == ErrEOF { - return tweets + close(tweetsChan) + return } - tweets = append(tweets, tweet) + tweetsChan <- tweet } } -func consumer(tweets []*Tweet) { - for _, t := range tweets { - if t.IsTalkingAboutGo() { - fmt.Println(t.Username, "\ttweets about golang") +func consumer(tweetsChan <-chan *Tweet, wg *sync.WaitGroup) { + defer wg.Done() + + for tweet := range tweetsChan { + if tweet.IsTalkingAboutGo() { + fmt.Println(tweet.Username, "\ttweets about golang") } else { - fmt.Println(t.Username, "\tdoes not tweet about golang") + fmt.Println(tweet.Username, "\tdoes not tweet about golang") } } } @@ -37,12 +41,17 @@ func consumer(tweets []*Tweet) { func main() { start := time.Now() stream := GetMockStream() + var wg sync.WaitGroup + + tweetsChan := make(chan *Tweet) // Producer - tweets := producer(stream) + go producer(stream, tweetsChan) + wg.Add(1) // Consumer - consumer(tweets) + go consumer(tweetsChan, &wg) + wg.Wait() fmt.Printf("Process took %s\n", time.Since(start)) } diff --git a/2-race-in-cache/main.go b/2-race-in-cache/main.go index 7618dd1..5e10e9c 100644 --- a/2-race-in-cache/main.go +++ b/2-race-in-cache/main.go @@ -10,6 +10,7 @@ package main import ( "container/list" + "sync" "testing" ) @@ -32,6 +33,7 @@ type KeyStoreCache struct { cache map[string]*list.Element pages list.List load func(string) string + mutex *sync.Mutex } // New creates a new KeyStoreCache @@ -39,27 +41,41 @@ func New(load KeyStoreCacheLoader) *KeyStoreCache { return &KeyStoreCache{ load: load.Load, cache: make(map[string]*list.Element), + mutex: &sync.Mutex{}, } } // Get gets the key from cache, loads it from the source if needed func (k *KeyStoreCache) Get(key string) string { + + k.mutex.Lock() + defer k.mutex.Unlock() + if e, ok := k.cache[key]; ok { k.pages.MoveToFront(e) return e.Value.(page).Value } + // Miss - load from database and save it in cache p := page{key, k.load(key)} + // if cache is full remove the least used item if len(k.cache) >= CacheSize { + end := k.pages.Back() + // remove from map delete(k.cache, end.Value.(page).Key) + // remove from list k.pages.Remove(end) + } + k.pages.PushFront(p) + k.cache[key] = k.pages.Front() + return p.Value } diff --git a/2-race-in-cache/mockserver.go b/2-race-in-cache/mockserver.go index a60fab2..b84d376 100644 --- a/2-race-in-cache/mockserver.go +++ b/2-race-in-cache/mockserver.go @@ -31,7 +31,7 @@ func RunMockServer(cache *KeyStoreCache, t *testing.T) { go func(i int) { value := cache.Get("Test" + strconv.Itoa(i)) if t != nil { - if value != "Test" + strconv.Itoa(i) { + if value != "Test"+strconv.Itoa(i) { t.Errorf("Incorrect db response %v", value) } } diff --git a/3-limit-service-time/main.go b/3-limit-service-time/main.go index 6a0ebb3..1865d38 100644 --- a/3-limit-service-time/main.go +++ b/3-limit-service-time/main.go @@ -10,6 +10,8 @@ package main +import "time" + // User defines the UserModel. Use this to check whether a User is a // Premium user or not type User struct { @@ -21,8 +23,36 @@ type User struct { // HandleRequest runs the processes requested by users. Returns false // if process had to be killed func HandleRequest(process func(), u *User) bool { - process() - return true + var ( + startReqProc, endReqProc time.Time + ) + + done := make(chan struct{}) + + go func() { + startReqProc = time.Now() + process() + endReqProc = time.Now() + done <- struct{}{} + }() + + if u.IsPremium { + <-done + return true + } + + select { + case <-time.After(10 * time.Second): + return false + case <-done: + diff := endReqProc.Sub(startReqProc).Seconds() + u.TimeUsed += int64(diff) + if u.TimeUsed <= 10 { + return true + } + return false + } + } func main() { diff --git a/4-graceful-sigint/main.go b/4-graceful-sigint/main.go index 38c7d3d..bdd2e60 100644 --- a/4-graceful-sigint/main.go +++ b/4-graceful-sigint/main.go @@ -13,10 +13,38 @@ package main +import ( + "fmt" + "os" + "os/signal" + "syscall" + "time" +) + func main() { + sigsStop := make(chan os.Signal) + + signal.Notify(sigsStop, syscall.SIGINT, syscall.SIGTERM) + // Create a process proc := MockProcess{} + fmt.Println("The program is running. Press Ctrl+C to complete.") + // Run the process (blocking) - proc.Run() + go proc.Run() + sigStop := <-sigsStop + fmt.Printf("\nA signal has been received: %v.\n", sigStop) + + go proc.Stop() + fmt.Println("We are completing the work correctly...") + + select { + case <-time.After(5 * time.Second): + fmt.Println("\nThe correct shutdown has been completed.") + case sigStop = <-sigsStop: + fmt.Printf("\nA signal has been received: %v. Early termination of work.", sigStop) + } + + os.Exit(0) } diff --git a/5-session-cleaner/main.go b/5-session-cleaner/main.go index 352ecc3..d678f4d 100644 --- a/5-session-cleaner/main.go +++ b/5-session-cleaner/main.go @@ -20,23 +20,28 @@ package main import ( "errors" "log" + "sync" + "time" ) // SessionManager keeps track of all sessions from creation, updating // to destroying. type SessionManager struct { sessions map[string]Session + mu *sync.RWMutex } // Session stores the session's data type Session struct { - Data map[string]interface{} + Data map[string]interface{} + ActiveStatusCh chan struct{} } // NewSessionManager creates a new sessionManager func NewSessionManager() *SessionManager { m := &SessionManager{ sessions: make(map[string]Session), + mu: &sync.RWMutex{}, } return m @@ -50,9 +55,26 @@ func (m *SessionManager) CreateSession() (string, error) { } m.sessions[sessionID] = Session{ - Data: make(map[string]interface{}), + Data: make(map[string]interface{}), + ActiveStatusCh: make(chan struct{}), } + go func() { + + for { + select { + case <-time.After(5 * time.Second): + m.mu.Lock() + delete(m.sessions, sessionID) + m.mu.Unlock() + return + case <-m.sessions[sessionID].ActiveStatusCh: + continue + } + } + + }() + return sessionID, nil } @@ -63,7 +85,18 @@ var ErrSessionNotFound = errors.New("SessionID does not exists") // GetSessionData returns data related to session if sessionID is // found, errors otherwise func (m *SessionManager) GetSessionData(sessionID string) (map[string]interface{}, error) { - session, ok := m.sessions[sessionID] + session, ok := Session{}, false + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + m.mu.RLock() + defer m.mu.RUnlock() + session, ok = m.sessions[sessionID] + }() + + wg.Wait() + if !ok { return nil, ErrSessionNotFound } @@ -72,15 +105,37 @@ func (m *SessionManager) GetSessionData(sessionID string) (map[string]interface{ // UpdateSessionData overwrites the old session data with the new one func (m *SessionManager) UpdateSessionData(sessionID string, data map[string]interface{}) error { - _, ok := m.sessions[sessionID] + ok := false + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + m.mu.RLock() + defer m.mu.RUnlock() + _, ok = m.sessions[sessionID] + }() + + wg.Wait() + if !ok { return ErrSessionNotFound } // Hint: you should renew expiry of the session here - m.sessions[sessionID] = Session{ - Data: data, - } + wg.Add(1) + go func() { + defer wg.Done() + m.mu.Lock() + defer m.mu.Unlock() + m.sessions[sessionID].ActiveStatusCh <- struct{}{} + m.sessions[sessionID] = Session{ + Data: data, + } + + }() + + wg.Wait() return nil } @@ -99,6 +154,8 @@ func main() { data := make(map[string]interface{}) data["website"] = "longhoang.de" + //time.Sleep(5 * time.Second) + err = m.UpdateSessionData(sID, data) if err != nil { log.Fatal(err)