Notes: Golang Concurrency - Common Patterns and Use Cases

Notes: Golang Concurrency - Common Patterns and Use Cases

This blog contains common and frequently used concurrency patterns in Golang with use cases and code examples...

ยท

5 min read

Four common concurrency patterns in Golang are:

  1. Generator Pattern
  2. Futures Pattern
  3. Fan-In Pattern
  4. Fan-Out Pattern

So, let's dive into these patterns one by one with some real-world examples.

Generator Pattern

In the generator pattern, we create and consume data simultaneously via a channel. We can use this pattern in places where we have huge data that needs to be read and processed. If we process that data all at once, it may lead to uncertain problems like memory overflow, high processor usage, transaction failure, etc.

An example implementation would be reading a large file line by line to calculate the total number of characters in that file. Here is the code for the same:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "unicode/utf8"
)

func charCount(fileName string) chan int {
    // communication channel
    charCountChannel := make(chan int)

    file, err := os.Open(fileName)
    if err != nil {
        log.Fatalf("failed opening file: %s", err)
    }

    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanLines)

    // goroutine to start putting data into channel
    go func() {
        for scanner.Scan() {
            // passing character count of line(s) to the channel
            charCountChannel <- utf8.RuneCountInString(scanner.Text())
        }

        // close channel and file after complete exhaustion
        close(charCountChannel)
        file.Close()
    }()

    return charCountChannel
}

func main() {
    var numOfChars int
    var fileName = "sample-file.txt"

    // range through channel until it is closed
    for charCount := range charCount(fileName) {
        numOfChars += charCount
    }

    fmt.Printf("Number of characters in file %v is %v\n", fileName, numOfChars)
}

Futures Pattern

Futures pattern is analogous to async/await in JS. In futures pattern, we launch goroutines to carry on tasks beforehand and we use the result later somewhere in the application. This pattern is useful to make code non-blocking in cases where we have some dependency on some external infrastructure e.g. external API calls, database queries, etc.

A very simple example can be calling an external API and using the result later on in the application. Here is the demo code:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

type data struct {
    Body  []byte
    Error error
}

// this returns channel which contains future data
func futureData(url string) <-chan data {
    // buffer size 1 to make channel non-blocking
    c := make(chan data, 1)

    // goroutine to execute GET call to get data and put into channel
    go func() {
        resp, err := http.Get(url)
        if err != nil {
            c <- data{Body: []byte{}, Error: err}
        }

        defer resp.Body.Close()

        body, err := ioutil.ReadAll(resp.Body)
        c <- data{Body: body, Error: err}
    }()

    return c
}

func main() {
    future := futureData("https://jsonplaceholder.typicode.com/posts")

    /****************************
    ** Do all the other stuffs **
    ****************************/

    body := <-future
    fmt.Printf("response: %v", string(body.Body))
    fmt.Printf("error: %v", body.Error)
}

Fan-In/ Fan-Out Pattern

Fan-In Fan-Out patterns are a way to Multiplex and Demultiplex data in Go. In the Fan-In pattern, multiple data channels are merged into one or fewer channels. Fan-Out is the exact opposite, it divides the data into multiple channels, distributing the work amongst a group of workers to parallelize CPU usage and I/O.

One practical example of these patterns can be in messaging queues. Let's assume that we have a service running on a server sending a stream of enormous log data and we want to push this log to other external locations via queues. In that case, we can push these log streams into another service with multiple workers which can push the data into the relevant queues (info, warning, error, etc). Here, we are Demultiplexing one data stream into multiple streams via workers. This is Fan-Out.

Now, let's say our consumer gets this log data and writes that into a single file. In that case, we'll use a single stream to write into the file as we cannot write on the file simultaneously. This is the Fan-In pattern. Here we're Multiplexing multiple data streams into one stream to write into a file.

Here is a diagram showing the same: autodraw 29_12_2021.png The below example is a different implementation though. In this example, we're creating random strings and calculating their lengths, and publishing it as the result. Here we have Job data that needs to be processed and Result data is the output of processing on Job data. Job data is passed to "workers" via the jobs channel (Fan-Out) and workers after processing the data pass results via the results channel (Fan-In).

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
    "unicode/utf8"
)

type Job struct {
    id           int
    randomString string
}

type Result struct {
    job        Job
    charLength int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

// creates multiple workers to Fan-Out jobs channel data processing across them
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

// workers waiting for data from jobs channel
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, charCount(job.randomString)}
        results <- output
    }
    wg.Done()
}

// function used by workers to process the data
func charCount(randomString string) int {
    return utf8.RuneCountInString(randomString)
}

// this creates/ fetches data to push in jobs channel
// data can be fetched from external sources as well
// e.g. message queues, broadcasts etc.
func startJobs(noOfJobs int) {
    var letters = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    for i := 0; i < noOfJobs; i++ {
        maxStringLength := rand.Intn(20)

        b := make([]rune, maxStringLength)
        for i := range b {
            b[i] = letters[rand.Intn(len(letters))]
        }

        job := Job{i, string(b)}
        jobs <- job
    }
    close(jobs)
}

// this function receives the final output from the worker
// we can further process or pass this data from here
// this is Fan-In pattern as we're passing on data from all the workers through this single channel
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job ID: %d, Input Random String: %s, Char Length: %d\n", result.job.id, result.job.randomString, result.charLength)
    }
    done <- true
}

func init() {
    rand.Seed(time.Now().UnixNano())
}

func main() {
    // spawn workers
    noOfWorkers := 12
    go createWorkerPool(noOfWorkers)

    // start jobs
    noOfJobs := 100
    go startJobs(noOfJobs)

    done := make(chan bool)
    go result(done)

    <-done

    fmt.Println("Processing Completed!")
}
ย