Notes: Golang Concurrency - Common Patterns and Use Cases
This post walks through commonly used concurrency patterns in Golang, with use cases and code examples.
Table of contents
Four common concurrency patterns in Golang are:
- Generator Pattern
- Futures Pattern
- Fan-In Pattern
- Fan-Out Pattern
So, let's dive into these patterns one by one with some real-world examples.
Generator Pattern
In the generator pattern, a goroutine produces data and sends it over a channel while the consumer reads from that channel at the same time. The pattern fits cases where a large amount of data has to be read and processed: loading and processing everything at once can exhaust memory, spike processor usage, or cause transactions to fail.
An example implementation is reading a large file line by line to count the total number of characters in it (line terminators are not counted, because the scanner strips them). Here is the code:
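Before moving to a file-based example, the idea can be sketched with in-memory data. This is a minimal illustration, not a definitive implementation: `countTo` and `sum` are hypothetical names chosen for this sketch. The producer goroutine emits one value at a time, and the consumer pulls values as it is ready for them.

```go
package main

import "fmt"

// countTo is a hypothetical generator: it emits 1..n on a channel.
// The channel is unbuffered, so each value is produced only when the
// consumer is ready to receive it.
func countTo(n int) <-chan int {
	out := make(chan int)
	go func() {
		// closing the channel tells the consumer no more values will arrive
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// sum drains a channel and adds up everything it receives.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(countTo(5))) // 1+2+3+4+5
}
```

Returning a receive-only channel (`<-chan int`) keeps callers from sending on or closing the channel, which leaves the generator as its only owner.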
package main
import (
"bufio"
"fmt"
"log"
"os"
"unicode/utf8"
)
func charCount(fileName string) chan int {
// communication channel
charCountChannel := make(chan int)
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("failed opening file: %s", err)
}
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
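// goroutine that feeds per-line character counts into the channel
go func() {
// close the file and the channel once the goroutine finishes
defer file.Close()
defer close(charCountChannel)
for scanner.Scan() {
// send the character (rune) count of the current line to the channel
charCountChannel <- utf8.RuneCountInString(scanner.Text())
}
// Scan returns false on both EOF and error, so check which one it was
if err := scanner.Err(); err != nil {
log.Printf("failed reading file: %s", err)
}
}()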
return charCountChannel
}
func main() {
var numOfChars int
var fileName = "sample-file.txt"
// range through channel until it is closed
// (the loop variable is named count so it does not shadow the charCount function)
for count := range charCount(fileName) {
numOfChars += count
}
fmt.Printf("Number of characters in file %v is %v\n", fileName, numOfChars)
}
Futures Pattern
The futures pattern is analogous to async/await in JavaScript. We launch a goroutine to start a task ahead of time and collect its result later, at the point where the application actually needs it. This keeps code non-blocking when it depends on external infrastructure, e.g. external API calls or database queries.
A very simple example is calling an external API and using the result later in the application. Here is the demo code:
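The shape of a future can be sketched without any network access. In the minimal example below, `slowSquare` and `result` are hypothetical names, and the `time.Sleep` call is only an assumed stand-in for a slow external dependency.

```go
package main

import (
	"fmt"
	"time"
)

// result pairs a value with the error that may have occurred producing it.
type result struct {
	Value int
	Err   error
}

// slowSquare is a hypothetical stand-in for a slow call (API, database, ...).
// It returns immediately; the value arrives on the channel later.
func slowSquare(n int) <-chan result {
	// buffer of 1 lets the goroutine finish even if nobody ever reads the result
	c := make(chan result, 1)
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated latency
		c <- result{Value: n * n}
	}()
	return c
}

func main() {
	future := slowSquare(7) // the work starts now

	fmt.Println("doing other work while the result is being computed")

	r := <-future // block only at the point where the value is needed
	if r.Err != nil {
		fmt.Println("error:", r.Err)
		return
	}
	fmt.Println("result:", r.Value)
}
```

Carrying the error inside the same struct as the value means the caller performs a single receive and then decides how to proceed.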
package main
import (
"fmt"
"io/ioutil"
"net/http"
)
type data struct {
Body []byte
Error error
}
// returns a receive-only channel that will deliver the future data
func futureData(url string) <-chan data {
// buffer size 1 so the goroutine can send its result without waiting for a receiver
c := make(chan data, 1)
// goroutine that executes the GET call and puts the outcome into the channel
go func() {
resp, err := http.Get(url)
if err != nil {
c <- data{Body: []byte{}, Error: err}
// resp is nil on error, so stop here instead of touching resp.Body
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
c <- data{Body: body, Error: err}
}()
return c
}
func main() {
future := futureData("https://jsonplaceholder.typicode.com/posts")
/***************************
** Do all the other stuff **
***************************/
// block here until the future delivers its result
body := <-future
fmt.Printf("response: %v\n", string(body.Body))
fmt.Printf("error: %v\n", body.Error)
}
Fan-In/Fan-Out Pattern
The Fan-In and Fan-Out patterns are a way to multiplex and demultiplex data in Go. In the Fan-In pattern, multiple data channels are merged into a single channel (or a smaller number of channels). Fan-Out is the exact opposite: it splits the data across multiple channels, distributing the work among a group of workers to parallelize CPU usage and I/O.
One practical example of these patterns is a messaging queue. Assume a service running on a server emits an enormous stream of log data, and we want to push these logs to external locations via queues. We can send the log stream to another service whose multiple workers push the data into the relevant queues (info, warning, error, etc.). Here we are demultiplexing one data stream into multiple streams via workers. This is Fan-Out.
Now, let's say our consumer receives this log data and writes it into a single file. In that case we use a single stream for writing, since multiple writers cannot safely write to the same file simultaneously. This is the Fan-In pattern: we are multiplexing multiple data streams into one stream to write into a file.
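The two directions can be sketched as a pair of small helpers. This is an illustrative sketch only: `fanOut` and `fanIn` are hypothetical names, and squaring a number stands in for whatever work a real worker would do.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts n workers that all read from the same input channel.
// Each worker squares the values it receives and writes them to its own
// output channel, which it closes when the input is drained.
func fanOut(in <-chan int, n int) []<-chan int {
	outs := make([]<-chan int, 0, n)
	for i := 0; i < n; i++ {
		out := make(chan int)
		go func() {
			defer close(out)
			for v := range in {
				out <- v * v
			}
		}()
		outs = append(outs, out)
	}
	return outs
}

// fanIn merges several channels into one and closes the merged channel
// once every input channel has been drained.
func fanIn(ins ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	total := 0
	// values arrive in no particular order, but the sum is stable
	for v := range fanIn(fanOut(in, 3)...) {
		total += v
	}
	fmt.Println(total) // sum of the squares of 1..5
}
```

Only the goroutine that waits on the `sync.WaitGroup` closes the merged channel, so no worker can send on it after it has been closed.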
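The routing step in this scenario might look roughly like the sketch below. It is a simplification under stated assumptions: `logEntry` and `route` are hypothetical names, plain buffered channels stand in for real message queues, and a single router is used instead of a pool of workers.

```go
package main

import "fmt"

// logEntry is a hypothetical log record with a severity level.
type logEntry struct {
	Level   string
	Message string
}

// route reads entries from one stream and forwards each entry to the
// channel registered for its level. Entries with an unknown level are
// dropped in this sketch. All queues are closed once the input is drained.
func route(in <-chan logEntry, queues map[string]chan logEntry) {
	for e := range in {
		if q, ok := queues[e.Level]; ok {
			q <- e
		}
	}
	for _, q := range queues {
		close(q)
	}
}

func main() {
	in := make(chan logEntry)
	// buffered channels play the role of the external queues
	queues := map[string]chan logEntry{
		"info":    make(chan logEntry, 8),
		"warning": make(chan logEntry, 8),
		"error":   make(chan logEntry, 8),
	}

	go func() {
		defer close(in)
		in <- logEntry{"info", "service started"}
		in <- logEntry{"error", "disk full"}
		in <- logEntry{"info", "request served"}
	}()

	route(in, queues) // returns once the input stream is drained

	for _, level := range []string{"info", "warning", "error"} {
		fmt.Printf("%s: %d entries\n", level, len(queues[level]))
	}
}
```

In a real deployment the queues would have consumers of their own; the buffers here are only large enough for the three sample entries.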
Here is a diagram showing the same:
The example below is a different implementation, though. Here we create random strings, calculate their lengths, and publish the results. Job data is the input that needs to be processed, and Result data is the output of processing a Job. Job data is passed to the workers via the jobs channel (Fan-Out), and the workers pass their results on via the results channel (Fan-In).
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"unicode/utf8"
)
type Job struct {
id int
randomString string
}
type Result struct {
job Job
charLength int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
// creates multiple workers to Fan-Out jobs channel data processing across them
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
// workers waiting for data from jobs channel
func worker(wg *sync.WaitGroup) {
// mark this worker as finished once the jobs channel is drained
defer wg.Done()
for job := range jobs {
output := Result{job, charCount(job.randomString)}
results <- output
}
}
// function used by workers to process the data
func charCount(randomString string) int {
return utf8.RuneCountInString(randomString)
}
// this creates/fetches data to push into the jobs channel
// data can be fetched from external sources as well
// e.g. message queues, broadcasts etc.
func startJobs(noOfJobs int) {
var letters = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
for i := 0; i < noOfJobs; i++ {
// random length in the range [0, 20)
stringLength := rand.Intn(20)
b := make([]rune, stringLength)
// j avoids shadowing the outer job index i
for j := range b {
b[j] = letters[rand.Intn(len(letters))]
}
job := Job{i, string(b)}
jobs <- job
}
// closing jobs lets the workers' range loops terminate
close(jobs)
}
// this function receives the final output from the workers
// we can further process or pass on this data from here
// this is the Fan-In pattern, as data from all the workers flows through this single channel
func result(done chan bool) {
for result := range results {
fmt.Printf("Job ID: %d, Input Random String: %s, Char Length: %d\n", result.job.id, result.job.randomString, result.charLength)
}
done <- true
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// spawn workers
noOfWorkers := 12
go createWorkerPool(noOfWorkers)
// start jobs
noOfJobs := 100
go startJobs(noOfJobs)
done := make(chan bool)
go result(done)
<-done
fmt.Println("Processing Completed!")
}