Notes: Circuit Breaker in Go

Circuit breaker implementation in Go using various services communicating with each other

The circuit breaker pattern is worth applying whenever a service depends on external calls, such as databases, HTTP APIs, or disk reads and writes. It keeps downtime or slowness in an external service from degrading the caller service, and so increases the resiliency of the system as a whole.

A circuit breaker has three states:

  1. Closed – When everything is normal, the circuit breaker stays in the closed state and all requests pass through to the external service.
  2. Open – When the number of failures exceeds a set threshold, the breaker trips and goes into the open state. Subsequent requests immediately get an error, a predefined value, or a cached value, without the external service being called at all.
  3. Half-Open – After a timeout period, the breaker switches to the half-open state to test whether the underlying problem is fixed. If a call fails in this state, the breaker trips again. If it succeeds, the breaker resets to the normal, closed state.

[Figure: circuit breaker state diagram]
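
The transitions between the three states can be sketched as a tiny state machine. This is only an illustration of the pattern, not how sony/gobreaker is implemented: the names breaker, threshold, and cooldown are made up for this sketch, it counts consecutive failures rather than a failure ratio, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

func (s state) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

var errOpen = errors.New("circuit breaker is open")

// breaker is a deliberately simplified circuit breaker (hypothetical type).
type breaker struct {
	state     state
	failures  int              // consecutive failures
	threshold int              // failures needed to trip
	cooldown  time.Duration    // how long to stay open before a trial call
	openedAt  time.Time        // when the breaker last tripped
	now       func() time.Time // injectable clock, so the demo needs no sleeping
}

func (b *breaker) call(fn func() error) error {
	if b.state == open {
		if b.now().Sub(b.openedAt) < b.cooldown {
			return errOpen // fail fast: fn is never invoked
		}
		b.state = halfOpen // cooldown elapsed: let one trial call through
	}
	if err := fn(); err != nil {
		b.failures++
		if b.state == halfOpen || b.failures >= b.threshold {
			b.state = open
			b.openedAt = b.now()
		}
		return err
	}
	b.state = closed
	b.failures = 0
	return nil
}

// demo drives the breaker through open -> half-open -> open -> half-open -> closed
// and records the state after each phase.
func demo() []string {
	clock := time.Unix(0, 0)
	b := &breaker{threshold: 3, cooldown: time.Minute, now: func() time.Time { return clock }}
	fail := func() error { return errors.New("boom") }
	ok := func() error { return nil }

	var seen []string
	for i := 0; i < 3; i++ {
		b.call(fail)
	}
	seen = append(seen, b.state.String()) // tripped after three failures

	clock = clock.Add(2 * time.Minute) // cooldown passes
	b.call(fail)                       // trial call fails
	seen = append(seen, b.state.String())

	clock = clock.Add(2 * time.Minute)
	b.call(ok) // trial call succeeds
	seen = append(seen, b.state.String())
	return seen
}

func main() {
	fmt.Println(demo()) // [open open closed]
}
```

The clock is injected so the cooldown can be stepped manually; a real breaker would also need a mutex around its state.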

In our case, we're using five services that call each other: MainService calls UserService, TodoService, and PostService, while PostService in turn calls PostCommentService.

[Figure: service call diagram]

Now, we are going to use this library for the circuit breaker implementation: sony/gobreaker

We can set our parameters in Settings of this library:

type Settings struct {
    Name          string
    MaxRequests   uint32
    Interval      time.Duration
    Timeout       time.Duration
    ReadyToTrip   func(counts Counts) bool
    OnStateChange func(name string, from State, to State)
    IsSuccessful  func(err error) bool
}

You can read about these in their documentation: sony/gobreaker doc

In our case, we are setting ReadyToTrip which lets us set trip conditions:

st.ReadyToTrip = func(counts gobreaker.Counts) bool {
    failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
    return counts.Requests >= 3 && failureRatio >= 0.6
}

Here, we return true once at least 3 requests have been made and the failure ratio is at least 0.6. When this condition is met, the circuit switches to the open state. There are two other important parameters, MaxRequests and Timeout, for which we keep the default values.
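
To see when this condition fires, it helps to replay a few sequences of outcomes by hand. The sketch below does that without the library: counts is a hypothetical stand-in for the two gobreaker.Counts fields used above, and tripIndex is a made-up helper. It assumes the counts are never reset in between, and it checks the condition only after a failure, which is when gobreaker consults ReadyToTrip in the closed state.

```go
package main

import "fmt"

// counts mirrors only the two gobreaker.Counts fields the condition reads.
type counts struct {
	Requests      uint32
	TotalFailures uint32
}

// readyToTrip is the same condition as in the article.
func readyToTrip(c counts) bool {
	failureRatio := float64(c.TotalFailures) / float64(c.Requests)
	return c.Requests >= 3 && failureRatio >= 0.6
}

// tripIndex replays outcomes (true = success) and returns the 1-based
// request at which the breaker would trip, or 0 if it never trips.
func tripIndex(outcomes []bool) int {
	var c counts
	for i, ok := range outcomes {
		c.Requests++
		if ok {
			continue
		}
		c.TotalFailures++
		if readyToTrip(c) {
			return i + 1
		}
	}
	return 0
}

func main() {
	fmt.Println(tripIndex([]bool{false, false, false}))             // 3: ratio 3/3
	fmt.Println(tripIndex([]bool{true, false, false}))              // 3: ratio 2/3
	fmt.Println(tripIndex([]bool{true, true, true, false, false})) // 0: ratio 2/5
}
```

Note that two failures alone never trip the breaker, because the Requests >= 3 guard is not yet satisfied.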

MaxRequests is the maximum number of requests allowed to pass through when the CircuitBreaker is half-open. If MaxRequests is 0, CircuitBreaker allows only 1 request.

Timeout is the period of the open state, after which the state of CircuitBreaker becomes half-open. If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.

Now, to call external APIs through the circuit breaker, we wrap the calling code in the Execute method. This is what it looks like in PostService:

comment, err := POST_COMMENT_SERVICE_CB.Execute(func() (interface{}, error) {
        resp, err := client.Get(POST_COMMENT_SERVICE + "/post/comments")
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()

        body, err := io.ReadAll(resp.Body)
        if err != nil {
            return nil, err
        }

        return string(body), nil
    })
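
One detail worth keeping in mind: client.Get returns a nil error for any HTTP response it receives, including a 500, so the closure above reports only transport-level problems (connection refused, timeout) as failures. If server errors should count against the breaker too, the closure has to turn them into errors itself. A minimal sketch of that idea, where fetch is a hypothetical helper meant to be called from inside Execute and the two httptest servers stand in for a healthy and a broken upstream:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetch returns the body on success and an error for transport
// failures as well as 5xx responses.
func fetch(client *http.Client, url string) (string, error) {
	resp, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= http.StatusInternalServerError {
		return "", fmt.Errorf("upstream returned %s", resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

// demo calls a healthy and a broken test server and reports
// the healthy body and whether the broken call produced an error.
func demo() (string, bool) {
	healthy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "post comments service response")
	}))
	defer healthy.Close()

	broken := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "boom", http.StatusInternalServerError)
	}))
	defer broken.Close()

	client := &http.Client{Timeout: 2 * time.Second}
	body, _ := fetch(client, healthy.URL)
	_, err := fetch(client, broken.URL)
	return body, err != nil
}

func main() {
	body, failed := demo()
	fmt.Println(body)   // post comments service response
	fmt.Println(failed) // true
}
```

Whether 4xx responses should also count is a design choice; they usually signal a bad request rather than an unhealthy upstream, so this sketch leaves them alone.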

Now that we have covered the circuit breaker concept and some details of the library, let's jump into our test implementation. This is what my directory structure looks like:

[Screenshot: directory structure]

I've created a separate directory for each service except MainService, which resides in the root itself. Here is the code for each of them:

post_comment_service/main.go

package main

import (
    "net/http"
    // "time" // uncomment together with the time.Sleep call below; an unused import does not compile

    "github.com/labstack/echo"
)

func getPostComments() echo.HandlerFunc {
    return func(c echo.Context) error {
        // simulated delay for timeout
        // time.Sleep(5 * time.Second)

        return c.String(http.StatusOK, "post comments service response")
    }
}

func main() {
    e := echo.New()
    e.GET("/post/comments", getPostComments())
    e.Logger.Fatal(e.Start(":8084"))
}

user_detail_service/main.go

package main

import (
    "net/http"
    // "time" // uncomment together with the time.Sleep call below; an unused import does not compile

    "github.com/labstack/echo"
)

func getUserDetails() echo.HandlerFunc {
    return func(c echo.Context) error {
        // simulated delay for timeout
        // time.Sleep(5 * time.Second)

        return c.String(http.StatusOK, "user details service response")
    }
}

func main() {
    e := echo.New()
    e.GET("/user/details", getUserDetails())
    e.Logger.Fatal(e.Start(":8081"))
}

user_post_service/main.go

package main

import (
    "io"
    "log"
    "net/http"
    "time"

    "github.com/labstack/echo"
    "github.com/sony/gobreaker"
)

const POST_COMMENT_SERVICE = "http://localhost:8084"

var POST_COMMENT_SERVICE_CB *gobreaker.CircuitBreaker

func init() {
    var st gobreaker.Settings
    st.Name = "HTTP GET"
    st.ReadyToTrip = func(counts gobreaker.Counts) bool {
        failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
        return counts.Requests >= 3 && failureRatio >= 0.6
    }

    POST_COMMENT_SERVICE_CB = gobreaker.NewCircuitBreaker(st)
}

// http client with timeout
var client = http.Client{
    Timeout: 2 * time.Second,
}

func getUserPosts() echo.HandlerFunc {
    return func(c echo.Context) error {
        comment, err := POST_COMMENT_SERVICE_CB.Execute(func() (interface{}, error) {
            resp, err := client.Get(POST_COMMENT_SERVICE + "/post/comments")
            if err != nil {
                return nil, err
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return nil, err
            }

            return string(body), nil
        })

        log.Println("post comment service breaker state: ", POST_COMMENT_SERVICE_CB.State())

        if err != nil {
            return c.String(http.StatusInternalServerError, "")
        }

        return c.String(http.StatusOK, "user posts service response"+" [aggregated with] "+comment.(string))
    }
}

func main() {
    e := echo.New()
    e.GET("/user/posts", getUserPosts())
    e.Logger.Fatal(e.Start(":8083"))
}

user_todo_service/main.go

package main

import (
    "net/http"

    "github.com/labstack/echo"
)

func getUserTodos() echo.HandlerFunc {
    return func(c echo.Context) error {
        return c.String(http.StatusOK, "user todos service response")
    }
}

func main() {
    e := echo.New()
    e.GET("/user/todos", getUserTodos())
    e.Logger.Fatal(e.Start(":8082"))
}

main.go

package main

import (
    "io"
    "log"
    "net/http"
    "time"

    "github.com/labstack/echo/v4"
    "github.com/sony/gobreaker"
)

// services URIs
const (
    USER_DETAIL_SERVICE = "http://localhost:8081"
    USER_TODO_SERVICE   = "http://localhost:8082"
    USER_POST_SERVICE   = "http://localhost:8083"
)

// all circuit breakers
var (
    USER_DETAIL_SERVICE_CB *gobreaker.CircuitBreaker
    USER_TODO_SERVICE_CB   *gobreaker.CircuitBreaker
    USER_POST_SERVICE_CB   *gobreaker.CircuitBreaker
)

func init() {
    var st gobreaker.Settings
    st.Name = "HTTP GET"
    st.ReadyToTrip = func(counts gobreaker.Counts) bool {
        failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
        return counts.Requests >= 3 && failureRatio >= 0.6
    }

    USER_DETAIL_SERVICE_CB = gobreaker.NewCircuitBreaker(st)
    USER_TODO_SERVICE_CB = gobreaker.NewCircuitBreaker(st)
    USER_POST_SERVICE_CB = gobreaker.NewCircuitBreaker(st)
}

type getUserDetailsReturn struct {
    userDetails string
    errors      error
}

type getUserPostsReturn struct {
    postList string
    errors   error
}

type getUserTODOReturn struct {
    todoList string
    errors   error
}

type GetUserDataResponse struct {
    UserDetails string `json:"userDetails"`
    TODOList    string `json:"todoList"`
    PostList    string `json:"postList"`
}

// http client with timeout
var client = http.Client{
    Timeout: 2 * time.Second,
}

func getUserDetails() <-chan getUserDetailsReturn {
    var getUserDetailsReturnChan = make(chan getUserDetailsReturn)

    go func() {
        userDetails, err := USER_DETAIL_SERVICE_CB.Execute(func() (interface{}, error) {
            resp, err := client.Get(USER_DETAIL_SERVICE + "/user/details")
            if err != nil {
                return nil, err
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return nil, err
            }

            return string(body), nil
        })

        log.Println("user detail service breaker state: ", USER_DETAIL_SERVICE_CB.State())

        if err != nil {
            getUserDetailsReturnChan <- getUserDetailsReturn{errors: err}
        } else {
            getUserDetailsReturnChan <- getUserDetailsReturn{userDetails: userDetails.(string)}
        }
    }()

    return getUserDetailsReturnChan
}

func getUserTODOs() <-chan getUserTODOReturn {
    var getUserTODOReturnChan = make(chan getUserTODOReturn)

    go func() {
        todoList, err := USER_TODO_SERVICE_CB.Execute(func() (interface{}, error) {
            resp, err := client.Get(USER_TODO_SERVICE + "/user/todos")
            if err != nil {
                return nil, err
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return nil, err
            }

            return string(body), nil
        })

        // log.Println("user todos service breaker state: ", USER_TODO_SERVICE_CB.State())

        if err != nil {
            getUserTODOReturnChan <- getUserTODOReturn{errors: err}
        } else {
            getUserTODOReturnChan <- getUserTODOReturn{todoList: todoList.(string)}
        }
    }()

    return getUserTODOReturnChan
}

func getUserPosts() <-chan getUserPostsReturn {
    var getUserPostsReturnChan = make(chan getUserPostsReturn)

    go func() {
        postList, err := USER_POST_SERVICE_CB.Execute(func() (interface{}, error) {
            resp, err := client.Get(USER_POST_SERVICE + "/user/posts")
            if err != nil {
                return nil, err
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return nil, err
            }

            return string(body), nil
        })

        // log.Println("user post service breaker state: ", USER_POST_SERVICE_CB.State())

        if err != nil {
            getUserPostsReturnChan <- getUserPostsReturn{errors: err}
        } else {
            getUserPostsReturnChan <- getUserPostsReturn{postList: postList.(string)}
        }
    }()

    return getUserPostsReturnChan
}

func getUserData() echo.HandlerFunc {
    return func(c echo.Context) error {
        // async calls for user details, TODOs and posts,
        // as they're not dependent on each other
        userDetailsChan := getUserDetails()
        userTODOListChan := getUserTODOs()
        postListChan := getUserPosts()

        // resolving channel data
        userDetails := <-userDetailsChan
        todoList := <-userTODOListChan
        postList := <-postListChan

        response := &GetUserDataResponse{
            UserDetails: userDetails.userDetails,
            TODOList:    todoList.todoList,
            PostList:    postList.postList,
        }

        return c.JSON(http.StatusOK, response)
    }
}

func main() {
    e := echo.New()
    e.GET("/user", getUserData())
    e.Logger.Fatal(e.Start(":8080"))
}

To start a service, run go run [service_directory]/main.go. To simulate failure, there are two approaches:

  • we can pull the whole service down
  • we can simulate a timeout by adding a sleep in the APIs

If we pull down the whole service, the failed response comes back immediately, which is exactly what we want. Let's say we pull down the whole UserService and send requests to our MainService. We'll see that the circuit opens at the third request:

[Screenshot: breaker state log, circuit open at the third request]

Now, let's move to the second approach. For this one, the /user/details API of UserService contains a commented-out delay: // time.Sleep(5 * time.Second). Uncomment that line (the file must import time for it to compile) and try the requests again.

Remember that we have set up our HTTP clients with a 2-second timeout.

var client = http.Client{
    Timeout: 2 * time.Second,
}

This time we'll see the same result as above, but each failed request now takes 2 seconds because of the client timeout. Now imagine thousands of requests hitting the server, each one held up for those 2 seconds. This can bring down our systems or trigger unwanted autoscaling that costs us a lot of money. Thankfully, after three failed requests the circuit breaker trips, opens the circuit, and fails fast just like in the first case, returning whatever values we choose.
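
The effect of the client timeout can be reproduced in isolation. The sketch below is stdlib-only and uses scaled-down, assumed numbers (a 500 ms upstream delay and a 50 ms client timeout stand in for the 5 s sleep and 2 s timeout above); slowHandler and callOnce are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"time"
)

// slowHandler simulates an upstream that needs `delay` to answer.
func slowHandler(delay time.Duration) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(delay):
			fmt.Fprint(w, "user details service response")
		case <-r.Context().Done(): // the client gave up waiting
		}
	})
}

// callOnce reports how long the call took and whether it ended in a timeout.
func callOnce(client *http.Client, url string) (time.Duration, bool) {
	start := time.Now()
	resp, err := client.Get(url)
	elapsed := time.Since(start)
	if err != nil {
		var ne net.Error
		return elapsed, errors.As(err, &ne) && ne.Timeout()
	}
	resp.Body.Close()
	return elapsed, false
}

// demo returns whether the call timed out and whether it was cut
// short, i.e. finished before the upstream would have answered.
func demo() (bool, bool) {
	const delay = 500 * time.Millisecond
	const timeout = 50 * time.Millisecond

	srv := httptest.NewServer(slowHandler(delay))
	defer srv.Close()

	client := &http.Client{Timeout: timeout}
	elapsed, timedOut := callOnce(client, srv.URL)
	return timedOut, elapsed < delay
}

func main() {
	timedOut, cutShort := demo()
	fmt.Println(timedOut, cutShort) // true true
}
```

Every such call still costs the full timeout before it fails. That per-request cost is what the open breaker removes, since it rejects calls without touching the network.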

Now, let's try one more thing. Our system has chained calls as well: MainService calls PostService, and PostService calls PostCommentService. What would happen if PostCommentService fails?

You can try it by uncommenting the delay in the /post/comments API. Since the PostService API depends on this API, it slows down too, which in turn slows down MainService. The failure thus propagates all the way to the first service, which is bad for the whole system. This kind of failure is called a cascading failure. Thankfully, the circuit breaker in PostService trips and opens the circuit to PostCommentService, so calls fail fast and we can return whatever we want.
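
What "whatever we want" looks like is up to the caller. One possible approach, sketched below under assumptions, is to serve the last successful value and fall back to a fixed default when nothing has been cached yet. The fallback type and its fields are hypothetical, and the call argument stands for a function that wraps the breaker's Execute and returns its error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fallback remembers the last successful result of a call.
type fallback struct {
	mu       sync.Mutex
	last     string
	hasLast  bool
	defaultV string // returned when there is no cached value yet
}

// do runs call. On success it caches and returns the value; on error it
// returns the cached value if there is one, otherwise the default.
func (f *fallback) do(call func() (string, error)) string {
	v, err := call()

	f.mu.Lock()
	defer f.mu.Unlock()
	if err == nil {
		f.last, f.hasLast = v, true
		return v
	}
	if f.hasLast {
		return f.last
	}
	return f.defaultV
}

// demo simulates an upstream that is down, then up, then down again.
func demo() []string {
	f := &fallback{defaultV: "comments unavailable"}
	down := func() (string, error) { return "", errors.New("circuit breaker is open") }
	up := func() (string, error) { return "post comments service response", nil }

	return []string{f.do(down), f.do(up), f.do(down)}
}

func main() {
	for _, s := range demo() {
		fmt.Println(s)
	}
	// comments unavailable
	// post comments service response
	// post comments service response
}
```

Serving stale data is only acceptable for some endpoints, so for others returning the error or an explicit "unavailable" marker may be the better choice.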