Skip to content

Commit

Permalink
Replaced tabs with spaces
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Apr 15, 2024
1 parent 4ca0504 commit 0a1227b
Showing 1 changed file with 94 additions and 94 deletions.
188 changes: 94 additions & 94 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,71 +35,71 @@ all while maintaining simplicity and efficiency.

```go
func main() {
urls := rill.FromSlice([]string{
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
"https://example.com/file4.txt",
}, nil)

// Fetch keys from each URL and flatten them into a single stream
keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] {
return streamFileLines(url)
})

// Exclude any empty keys from the stream
keys = rill.Filter(keys, 3, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := rill.Batch(keys, 10, 1*time.Second)

// Fetch values from DB for each batch of keys
resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) {
values, err := kvMultiGet(keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back to a single items for final processing
results := rill.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = rill.Filter(results, 3, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := rill.ForEach(results, 1, func(kv KV) error {
fmt.Println(kv.Key, "=>", kv.Value)
cnt++
return nil
})
if err != nil {
fmt.Println("Error:", err)
}

fmt.Println("Total keys:", cnt)
urls := rill.FromSlice([]string{
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
"https://example.com/file4.txt",
}, nil)

// Fetch keys from each URL and flatten them into a single stream
keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] {
return streamFileLines(url)
})

// Exclude any empty keys from the stream
keys = rill.Filter(keys, 3, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := rill.Batch(keys, 10, 1*time.Second)

// Fetch values from DB for each batch of keys
resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) {
values, err := kvMultiGet(keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back to a single items for final processing
results := rill.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = rill.Filter(results, 3, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := rill.ForEach(results, 1, func(kv KV) error {
fmt.Println(kv.Key, "=>", kv.Value)
cnt++
return nil
})
if err != nil {
fmt.Println("Error:", err)
}

fmt.Println("Total keys:", cnt)
}

// streamFileLines does line-by-line streaming of a file from a URL,
func streamFileLines(url string) <-chan rill.Try[string] {
// ...
// ...
}

// kvMultiGet does a batch read from a key-value database,
func kvMultiGet(keys ...string) ([]string, error) {
// ...
// ...
}
```

Expand Down Expand Up @@ -182,7 +182,7 @@ func doWork(ctx context.Context) error {
ids := streamIDs(ctx)

// Define other pipeline stages...
// Final stage processing
for value := range results {
// Process value...
Expand All @@ -203,10 +203,10 @@ func doWork(ctx context.Context) error {
ids := streamIDs(ctx)

// Define other pipeline stages...
// Ensure pipeline is drained in case of failure
defer rill.DrainNB(results)
// Final stage processing
for value := range results {
// Process value...
Expand Down Expand Up @@ -285,47 +285,47 @@ accurate computation of temperature variations.

```go
type Measurement struct {
Date time.Time
Temp float64
Date time.Time
Temp float64
}

func main() {
city := "New York"
endDate := time.Now()
startDate := endDate.AddDate(0, 0, -30)

// Make a channel that emits all the days between startDate and endDate
days := make(chan rill.Try[time.Time])
go func() {
defer close(days)
for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
days <- rill.Wrap(date, nil)
}
}()

// Download the temperature for each day concurrently
measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(city, date)
return Measurement{Date: date, Temp: temp}, err
})

// Iterate over the measurements, calculate and print changes. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
err := rill.ForEach(measurements, 1, func(m Measurement) error {
change := m.Temp - prev.Temp
prev = m

fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change)
return nil
})
if err != nil {
fmt.Println("Error:", err)
}
city := "New York"
endDate := time.Now()
startDate := endDate.AddDate(0, 0, -30)

// Make a channel that emits all the days between startDate and endDate
days := make(chan rill.Try[time.Time])
go func() {
defer close(days)
for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
days <- rill.Wrap(date, nil)
}
}()

// Download the temperature for each day concurrently
measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(city, date)
return Measurement{Date: date, Temp: temp}, err
})

// Iterate over the measurements, calculate and print changes. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
err := rill.ForEach(measurements, 1, func(m Measurement) error {
change := m.Temp - prev.Temp
prev = m

fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change)
return nil
})
if err != nil {
fmt.Println("Error:", err)
}
}

// getTemperature fetches a temperature reading for a city and date,
func getTemperature(city string, date time.Time) (float64, error) {
// ...
// ...
}

```

0 comments on commit 0a1227b

Please sign in to comment.