Notes: Pipe and Filter architecture

Implementing the Pipe and Filter architecture with e-commerce search page filters

Pipe and Filter architecture can be used to divide a larger processing task into a sequence of smaller, independent processing steps (Filters) that are connected by channels (Pipes).

Each filter exposes a very simple interface: it receives messages on the inbound pipe, processes them, and publishes the results to the outbound pipe. A pipe connects one filter to the next, carrying the output of one filter to the input of the following one.

Since they all follow the same simple interface they can be composed into different solutions. Individual filters can be added, removed, or rearranged into a new sequence, within reason, without any modification to the filters themselves.

The Pipe and Filter architecture also fits e-commerce store filters, which we'll use as the example here.

Let's assume the search listing page has three kinds of filters: a Rating filter, a Price filter, and an Availability filter.

The Rating filter removes products rated below the given minimum, the Price filter removes products whose discounted price is above the given maximum, and the Availability filter removes unavailable products.

Example of rating filter:

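// RatingFilter keeps products whose rating is at least the minimum stored in ctx.
var RatingFilter Filter = func(ctx context.Context, products []Product) ([]Product, error) {
    // The comma-ok assertion returns an error instead of panicking
    // when the value is missing or is not an int.
    rating, ok := ctx.Value(RatingFilterKey).(int)
    if !ok || rating == 0 {
        return nil, errors.New("invalid rating")
    }

    var filteredProducts []Product
    for _, product := range products {
        if product.Rating >= rating {
            filteredProducts = append(filteredProducts, product)
        }
    }

    return filteredProducts, nil
}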

Every filter has the following type:

type Filter func(context.Context, []Product) ([]Product, error)

A filter takes a context (which may carry the filter criteria) and a product list as input, and returns either the filtered product list or an error.

Now we define our pipeline. A Pipeline is a struct that holds a slice of all the filters added to it.

type Pipeline struct {
    Filters []Filter
}

The pipeline has one method, Execute, which runs the filters in the order they were added and returns the final output, i.e. the filtered product list. It takes the initial, unfiltered product list as input and stops at the first filter that returns an error:

func (p *Pipeline) Execute(ctx context.Context, input []Product) ([]Product, error)

The pipeline also has a Use(filter Filter) method that appends a filter to it.

Lastly, we have the product, which can be defined like this:

type Product struct {
    Name         string
    Price        float64
    Rating       int
    Discount     float64 // percentage
    NetPrice     float64 // price after applying the percentage discount
    Availability bool
}

Now that we have defined everything, let's create an implementation step by step.

1) Initialize pipeline and add filters:

pipeline := Pipeline{}

pipeline.Use(RatingFilter)
pipeline.Use(AvailabilityFilter)
pipeline.Use(PriceFilter)

2) Now we pass the criteria for filters:

ctx := context.Background()
ctx = context.WithValue(ctx, RatingFilterKey, 3)
ctx = context.WithValue(ctx, PriceFilterKey, 17.5)

3) Now that everything is set, we execute the pipeline:

products, err := pipeline.Execute(ctx, productList)

products is the final output of all the filters.

One important point is that these filters are independent of each other. Each one only removes products based on its own criterion, so they can be arranged in any sequence and still produce the same set of products. We can add, remove, or rearrange such filters without changing the logical result.

Complete working code to test:

package main

import (
    "context"
    "errors"
    "fmt"
)

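// filterKey is a dedicated type for context keys, so they cannot
// collide with plain string keys set by other packages.
type filterKey string

const (
    RatingFilterKey filterKey = "filter.rating"
    PriceFilterKey  filterKey = "filter.price"
)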

type Filter func(context.Context, []Product) ([]Product, error)

type Pipeline struct {
    Filters []Filter
}

func (p *Pipeline) Use(filter Filter) {
    p.Filters = append(p.Filters, filter)
}

func (p *Pipeline) Execute(ctx context.Context, input []Product) ([]Product, error) {
    var (
        output = input
        err    error
    )

    for _, filter := range p.Filters {
        output, err = filter(ctx, output)
        if err != nil {
            return nil, err
        }
    }

    return output, nil
}

/*********************************
Implementation of Pipe and Filter
**********************************/
type Product struct {
    Name         string
    Price        float64
    Rating       int
    Discount     float64 // percentage
    NetPrice     float64 // price after applying the percentage discount
    Availability bool
}

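// RatingFilter keeps products whose rating is at least the minimum stored in ctx.
var RatingFilter Filter = func(ctx context.Context, products []Product) ([]Product, error) {
    // The comma-ok assertion returns an error instead of panicking
    // when the value is missing or is not an int.
    rating, ok := ctx.Value(RatingFilterKey).(int)
    if !ok || rating == 0 {
        return nil, errors.New("invalid rating")
    }

    var filteredProducts []Product
    for _, product := range products {
        if product.Rating >= rating {
            filteredProducts = append(filteredProducts, product)
        }
    }

    return filteredProducts, nil
}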

// PriceFilter keeps products whose discounted (net) price is at most the maximum stored in ctx.
var PriceFilter Filter = func(ctx context.Context, products []Product) ([]Product, error) {
    // The comma-ok assertion returns an error instead of panicking
    // when the value is missing or is not a float64.
    price, ok := ctx.Value(PriceFilterKey).(float64)
    if !ok || price == 0 {
        return nil, errors.New("invalid price")
    }

    var filteredProducts []Product
    for _, product := range products {
        // net price after applying the percentage discount
        netPrice := product.Price * (1 - (product.Discount / 100.0))

        if netPrice <= price {
            product.NetPrice = netPrice
            filteredProducts = append(filteredProducts, product)
        }
    }

    return filteredProducts, nil
}

var AvailabilityFilter Filter = func(ctx context.Context, products []Product) ([]Product, error) {
    var filteredProducts []Product

    for _, product := range products {
        if product.Availability {
            filteredProducts = append(filteredProducts, product)
        }
    }

    return filteredProducts, nil
}

// initializing the product list
var productList []Product

func init() {
    productList = []Product{
        {Name: "cup", Price: 10.24, Rating: 3, Discount: 0, Availability: true},
        {Name: "glass", Price: 12.4, Rating: 5, Discount: 10, Availability: false},
        {Name: "pot", Price: 45.98, Rating: 4, Discount: 20, Availability: true},
        {Name: "dish", Price: 5.5, Rating: 4, Discount: 5, Availability: true},
        {Name: "plate", Price: 25.65, Rating: 2, Discount: 25, Availability: false},
    }
}

func main() {
    pipeline := Pipeline{}

    pipeline.Use(RatingFilter)
    pipeline.Use(AvailabilityFilter)
    pipeline.Use(PriceFilter)

    ctx := context.Background()
    ctx = context.WithValue(ctx, RatingFilterKey, 3)
    ctx = context.WithValue(ctx, PriceFilterKey, 17.5)

    products, err := pipeline.Execute(ctx, productList)
    if err != nil {
        // stop here: products is nil when a filter fails
        fmt.Println(err.Error())
        return
    }

    fmt.Printf("\nproducts: %#v\n", products)
}