
Fan Out, Fan In While Maintaining Order

Learn how we have optimized our codebase when running concurrent and parallel data processing pipelines.

Jake Walsh
September 15, 2020 9 minute read

My team and I have been using Go to develop APIs and ETL processing pipelines. We face a common problem: we listen to change data capture (CDC) events, call the associated API(s) to fetch hydrated data, then apply the events to our data store in order. The bottleneck in our pipeline is fetching the hydrated data.

We wanted to run the data hydration logic in parallel while maintaining the order in which the events occurred. We experimented with multiple implementations before settling on the current one.

Please note: The solution in the example works with an input channel of type ‘Event’ and a process that takes an ‘Event’ and returns a ‘HydratedEvent.’ Data types will vary per implementation.

Previous iteration

In our first iteration, we processed the input data stream in multiple steps to maintain order. This solution achieved our desired outcome, but presented a few drawbacks. One drawback was reprocessing the input channel to add an order number to the items. This was a code smell because channels already maintain order. Another drawback was that the workers wasted time synchronizing their output rather than grabbing another unit of work. Although the desired outcome was achieved, we believed the drawbacks justified investigating another solution.

Example code

import (
   "sync"
   "sync/atomic"
   "time"
)

type EventProcessor func(Event) HydratedEvent

type Event struct {
   Time time.Time
   Key  int32
}

type HydratedEvent struct {
   Event
   Payload interface{}
}

func OldWay(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent {
   outputC := make(chan HydratedEvent)

   type input struct {
      orderNo int64
      value   Event
   }

   // take the items on the input channel and apply an order number to them
   orderedInputC := make(chan input)
   counter := int64(0)
   go func() {
      defer close(orderedInputC)
      for in := range inputC {
         orderedInputC <- input{
            orderNo: counter,
            value:   in,
         }
         counter++
      }
   }()

   // start a go-routine to instantiate a counter and wait on the worker wait group
   go func() {
      defer close(outputC)
      nextItem := int64(0)
      wg := sync.WaitGroup{}
      wg.Add(numWorkers)
      for i := 0; i < numWorkers; i++ {
         // start workerC in go-routines
         go func() {
            defer wg.Done()
            for in := range orderedInputC {
               val := asyncProcessor(in.value)
               for {
                  // spin until it is this item's turn, then emit it and advance the counter;
                  // sending before incrementing keeps a later item from being emitted first
                  if atomic.LoadInt64(&nextItem) == in.orderNo {
                     outputC <- val
                     atomic.AddInt64(&nextItem, 1)
                     break
                  }
               }
            }
         }()
      }
      wg.Wait()
   }()
   return outputC
}

Technical summary

First, we start a goroutine that loops over the input channel, ‘inputC,’ which contains our CDC events, and writes each message with an order number to the internal channel, ‘orderedInputC.’ Next, we start a goroutine that kicks off the workers and maintains the state of the process using the ‘nextItem’ variable.

Each worker routine is responsible for reading a message off the internal ordered channel (‘orderedInputC’), processing the unit of work, then writing its output to the output channel (‘outputC’). The workers take responsibility for determining whether their item is next in line by continuously polling the synchronization variable, ‘nextItem.’ This allowed us to parallelize the data hydration step, mocked by ‘asyncProcessor,’ and write output in the order the messages were received.

Strengths

  1. Easy to understand.
  2. Works well when the async function runs in constant time.

Weaknesses

  1. Workers are blocked synchronizing their output rather than picking up additional units of work.

Current solution

In the current solution, we look to keep the previous iteration's strengths while eliminating the weaknesses associated with that solution. A teammate of mine recommended that we switch to a worker subscriber model. 

In this model, independent workers tell the work delegator when they are ready to take on work. An earlier version of this solution used a slice to maintain order. I removed the need for the slice by using a channel to hold the workers' output channels in the order the work was handed out.
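The core idea of keeping order with a channel of channels can be sketched in isolation. The following is a minimal illustration, not the implementation discussed in this post: `orderedFanOut` is a hypothetical helper that works on plain ints and starts one goroutine per item instead of using a fixed set of workers.

```go
package main

import (
	"fmt"
	"time"
)

// orderedFanOut is a hypothetical helper (not the code from this post).
// It runs fn on several inputs concurrently and emits results in input order.
func orderedFanOut(in <-chan int, limit int, fn func(int) int) <-chan int {
	// pending holds one result channel per input, queued in arrival order.
	// Its capacity caps how far the dispatcher can run ahead of the collector.
	pending := make(chan chan int, limit)
	out := make(chan int)

	// Dispatcher: queue a result channel for each input, then start the work.
	go func() {
		defer close(pending)
		for v := range in {
			resC := make(chan int, 1) // buffered so the goroutine never blocks on send
			pending <- resC
			go func(v int) { resC <- fn(v) }(v)
		}
	}()

	// Collector: read the result channels in the order they were queued.
	go func() {
		defer close(out)
		for resC := range pending {
			out <- <-resC
		}
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	// Earlier items sleep longer, so they finish last but are still emitted first.
	slowFirst := func(v int) int {
		time.Sleep(time.Duration(5-v) * time.Millisecond)
		return v * 10
	}
	for v := range orderedFanOut(in, 3, slowFirst) {
		fmt.Println(v) // 0, 10, 20, 30, 40, one per line
	}
}
```

The collector always waits on the oldest outstanding result channel, so a slow item delays output but never lets a later item overtake it.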

Solution code

import (
	"context"
	"sync"
	"time"
)

type EventProcessor func(Event) HydratedEvent

type Event struct {
	Time time.Time
	Key  int32
}

type HydratedEvent struct {
	Event
	Payload interface{}
}

func Process(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent {
	workerGroup := NewWorkerGroup(context.Background(), asyncProcessor, numWorkers)
	go func() {
		for in := range inputC {
			workerGroup.AddWork(in)
		}
		workerGroup.Stop()
	}()
	return workerGroup.Output()
}

type worker struct {
	inputC  chan Event
	outputC chan HydratedEvent
}

func newWorker(bufferSize int) *worker {
	return &worker{
		inputC:  make(chan Event, bufferSize),
		outputC: make(chan HydratedEvent, bufferSize),
	}
}
func (w *worker) start(ctx context.Context, processor EventProcessor, registerC chan *worker, wg *sync.WaitGroup) {
	go func() {
		defer func() {
			close(w.inputC)
			close(w.outputC)
			wg.Done()
		}()
		for {
			registerC <- w
			select {
			case input := <-w.inputC:
				select {
				case w.outputC <- processor(input):
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

type workerGroup struct {
	workerC       chan *worker
	workerOutputC chan chan HydratedEvent
	outputC       chan HydratedEvent
	group         *sync.WaitGroup
	stop          func()
}

func NewWorkerGroup(ctx context.Context, processor EventProcessor, size int) workerGroup {
	ctx, cancel := context.WithCancel(ctx)
	workerGroup := workerGroup{
		workerC:       make(chan *worker, size),
		workerOutputC: make(chan chan HydratedEvent, size),
		outputC:       make(chan HydratedEvent),
		stop:          cancel,
		group:         &sync.WaitGroup{},
	}
	workerGroup.group.Add(size)
	for i := 0; i < size; i++ {
		w := newWorker(1)
		w.start(ctx, processor, workerGroup.workerC, workerGroup.group)
	}
	go workerGroup.startOutput(ctx)
	go workerGroup.cleanUp(ctx)
	return workerGroup
}

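func (wg *workerGroup) startOutput(ctx context.Context) {
	defer close(wg.outputC)
	// release the workers once every queued result has been forwarded
	defer wg.stop()
	for woc := range wg.workerOutputC {
		select {
		case wg.outputC <- <-woc:
		case <-ctx.Done():
			return
		}
	}
}

func (wg *workerGroup) cleanUp(ctx context.Context) {
	<-ctx.Done()
	wg.group.Wait()
	close(wg.workerC)
}

func (wg *workerGroup) AddWork(input Event) {
	// block until a worker registers as available
	w, ok := <-wg.workerC
	if !ok {
		return
	}
	w.inputC <- input
	// queue the worker's output channel so results are read in input order
	wg.workerOutputC <- w.outputC
}

func (wg *workerGroup) Output() chan HydratedEvent {
	return wg.outputC
}

// Stop signals that no more work will be added. Closing workerOutputC lets
// startOutput drain in-flight results before it cancels the workers, so
// pending events are not dropped. Call it once, after the last AddWork.
func (wg *workerGroup) Stop() {
	close(wg.workerOutputC)
}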

Technical summary

In this solution, the ‘Process’ function instantiates a new ‘WorkerGroup,’ delegates work and returns the group's output channel.

The ‘NewWorkerGroup’ constructor requires a context for cancellation, an event processor and the number of workers to start. The constructor starts the workers and kicks off goroutines to publish the worker group's output and clean up the group's internal channels.

In the ‘worker.start’ method, the worker begins by registering itself with the ‘WorkerGroup’ (‘registerC <- w’). The ‘worker’ then blocks on the select statement, waiting for either input on its input channel or context cancellation. If the worker receives input, it calls the processor function and writes the result to its output channel (‘outputC’). Otherwise, if the context is cancelled, the worker returns.

This triggers the deferred function to close its input and output channels and signal to the ‘WorkerGroup’ it has completed by calling ‘Done()’ on the supplied ‘WaitGroup.’

The process continues by assigning units of work to the group by calling the ‘AddWork’ method. The ‘AddWork’ method is responsible for delegating the unit of work to the first available worker and synchronizing the worker's output. It delegates work by reading off the first available worker from ‘workerC’ and sending the unit of work to the worker's input channel. It synchronizes the worker's output by immediately writing the worker's output channel to ‘workerOutputC.’ 

Writing the ‘worker’'s output channel in the order work is received allows us to maintain order when outputting the units of work. ‘startOutput’ is then responsible for reading worker channels in the order work was delegated and forwarding the results to the ‘WorkerGroup’'s output channel. The diagram below shows a use case of processing three events with two workers.

Diagram illustrating the state of the WorkerGroup as units of work are added
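The registration and delegation flow described above can also be condensed into a small standalone sketch. This is an illustration under simplifying assumptions rather than the code from this post: it processes ints, the names `worker`, `run` and `dispatch` are hypothetical, it expects at least one worker, and it shuts down by closing channels from the sending side instead of using a context.

```go
package main

import "fmt"

// worker owns one input and one output channel (illustrative names).
type worker struct {
	inC  chan int
	outC chan int
}

// run announces the worker as idle, waits for one item, processes it,
// and repeats until its input channel is closed.
func (w *worker) run(idle chan<- *worker, fn func(int) int) {
	for {
		idle <- w
		v, ok := <-w.inC
		if !ok {
			return
		}
		w.outC <- fn(v)
	}
}

// dispatch hands each input to the first idle worker and queues that
// worker's output channel, so results can be read back in input order.
func dispatch(in <-chan int, n int, fn func(int) int) <-chan int {
	idle := make(chan *worker, n)   // workers waiting for work
	order := make(chan chan int, n) // output channels in dispatch order
	out := make(chan int)

	workers := make([]*worker, n)
	for i := range workers {
		workers[i] = &worker{inC: make(chan int, 1), outC: make(chan int, 1)}
		go workers[i].run(idle, fn)
	}

	// Delegator: pair each input with an idle worker and record the order.
	go func() {
		for v := range in {
			w := <-idle
			w.inC <- v
			order <- w.outC
		}
		close(order) // no more work will be queued
		for _, w := range workers {
			close(w.inC) // the sender closes, so workers exit cleanly
		}
	}()

	// Publisher: forward results in the order the work was delegated.
	go func() {
		defer close(out)
		for c := range order {
			out <- <-c
		}
	}()
	return out
}

func main() {
	in := make(chan int, 6)
	for i := 1; i <= 6; i++ {
		in <- i
	}
	close(in)
	for v := range dispatch(in, 2, func(v int) int { return v * v }) {
		fmt.Println(v) // 1, 4, 9, 16, 25, 36, one per line
	}
}
```

Because a worker only re-registers after writing its result, a single buffered slot per output channel is enough: any earlier result from the same worker sits ahead of the new one in the order queue and is read first.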

In our implementation, we add CDC events to the ‘WorkerGroup’ through the ‘AddWork’ method. We then loop over the channel returned by the ‘Output’ method and apply the hydrated events to our data store in the order they were received. The diagram below shows output processing for the three events shown in the first example.

Diagram illustrating the processing order of the WorkerGroup's events
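On the consuming side, applying events in order reduces to a plain range loop over the output channel. The sketch below assumes a hypothetical in-memory `memStore` and `applyAll` helper in place of a real data store; only the ‘Event’ and ‘HydratedEvent’ types come from this post.

```go
package main

import (
	"fmt"
	"time"
)

// Event and HydratedEvent mirror the types used in this post.
type Event struct {
	Time time.Time
	Key  int32
}

type HydratedEvent struct {
	Event
	Payload interface{}
}

// memStore is a hypothetical in-memory stand-in for a real data store.
type memStore struct {
	latest  map[int32]interface{} // most recent payload per key
	applied []int32               // keys in the order they were applied
}

// applyAll drains outputC and applies each event as it arrives.
// The channel is already ordered, so no re-sorting is needed.
func applyAll(outputC <-chan HydratedEvent, s *memStore) {
	for he := range outputC {
		s.latest[he.Key] = he.Payload
		s.applied = append(s.applied, he.Key)
	}
}

func main() {
	// Two changes to the same key: the later one has to win.
	outputC := make(chan HydratedEvent, 2)
	outputC <- HydratedEvent{Event: Event{Time: time.Now(), Key: 7}, Payload: "first"}
	outputC <- HydratedEvent{Event: Event{Time: time.Now(), Key: 7}, Payload: "second"}
	close(outputC)

	s := &memStore{latest: make(map[int32]interface{})}
	applyAll(outputC, s)
	fmt.Println(s.latest[7]) // second
}
```

Two updates to the same key show why ordering matters: if the events were applied out of order, the store would end up holding the stale payload.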

Advantages

  1. Workers are no longer blocked synchronizing their output.
  2. Eliminates need to reprocess the input channel to add ordering.

Weaknesses

  1. Increased code complexity.

Benchmarks

I have included the code required to run benchmarks against the two methods outlined above and a single-goroutine base case. The function under test sleeps for one millisecond when the event key is a nonzero multiple of 3, which emulates an asynchronous process that varies in processing time.

Benchmark code

import (
	"testing"
	"time"
)

// TestCode
func BenchmarkFanStep(b *testing.B) {
	testFunc := func(input Event) HydratedEvent {
		if input.Key != 0 && input.Key%3 == 0 {
			time.Sleep(time.Millisecond)
		}
		return HydratedEvent{
			Event:   input,
			Payload: "Some payload from an API",
		}
	}

	b.Run("testing new way", func(b *testing.B) {
		// start at 0 so the body runs exactly b.N times
		for i := 0; i < b.N; i++ {
			ProcessNewWay(testFunc, 5, 10)
		}
	})
	b.Run("testing old Way", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			ProcessOldWay(testFunc, 5, 10)
		}
	})
	b.Run("testing single routine", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			ProcessSingleRoutine(testFunc, 10)
		}
	})
}

func ProcessNewWay(processor EventProcessor, numWorkers, maxItems int) {
	inputC := make(chan Event, maxItems)
	for i := int32(0); i < int32(maxItems); i++ {
		inputC <- Event{Time: time.Now(), Key: i}
	}
	close(inputC)
	outC := Process(processor, numWorkers, inputC)
	for range outC {
	}
}

func ProcessOldWay(processor EventProcessor, numWorkers int, maxItems int) {
	inputC := make(chan Event, maxItems)
	for i := int32(0); i < int32(maxItems); i++ {
		inputC <- Event{Time: time.Now(), Key: i}
	}
	close(inputC)
	outC := OldWay(processor, numWorkers, inputC)
	for range outC {
	}
}

func ProcessSingleRoutine(processor EventProcessor, maxItems int) {
	inputC := make(chan Event, maxItems)
	for i := int32(0); i < int32(maxItems); i++ {
		inputC <- Event{Time: time.Now(), Key: i}
	}
	close(inputC)
	outC := make(chan HydratedEvent)

	go func() {
		defer close(outC)
		for in := range inputC {
			outC <- processor(in)
		}
	}()
	for range outC {

	}
}
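The benchmark helpers drain the output channel without inspecting it. A check along the following lines could be used in a test to confirm that order is preserved. `keysInOrder` is a hypothetical helper, and it assumes keys are assigned 0, 1, 2, ... in input order, as the benchmark helpers above do.

```go
package main

import (
	"fmt"
	"time"
)

// Event and HydratedEvent mirror the types used in this post.
type Event struct {
	Time time.Time
	Key  int32
}

type HydratedEvent struct {
	Event
	Payload interface{}
}

// keysInOrder drains outC and reports whether exactly n events arrived
// with keys 0, 1, ..., n-1 in that order.
func keysInOrder(outC <-chan HydratedEvent, n int) bool {
	next := int32(0)
	ok := true
	for he := range outC {
		if he.Key != next {
			ok = false // keep draining so upstream goroutines are not left blocked
		}
		next++
	}
	return ok && int(next) == n
}

func main() {
	ordered := make(chan HydratedEvent, 3)
	for i := int32(0); i < 3; i++ {
		ordered <- HydratedEvent{Event: Event{Time: time.Now(), Key: i}}
	}
	close(ordered)
	fmt.Println(keysInOrder(ordered, 3)) // true

	swapped := make(chan HydratedEvent, 2)
	swapped <- HydratedEvent{Event: Event{Key: 1}}
	swapped <- HydratedEvent{Event: Event{Key: 0}}
	close(swapped)
	fmt.Println(keysInOrder(swapped, 2)) // false
}
```

The helper also checks the event count, so a pipeline that closes its output before emitting every event fails the check as well.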

Benchmark results

goos: darwin
goarch: amd64
pkg: fanout/benchmark
BenchmarkFanStep/testing_new_way-8         	     894	   1423830 ns/op
BenchmarkFanStep/testing_old_Way-8         	     468	   2598577 ns/op
BenchmarkFanStep/testing_single_routine-8  	     292	   4150616 ns/op
PASS

 
