Parallel Fold Map

We are going to parallelize result of our previous exercise, synchronous Fold Map implementation. If you haven’t done it, I suggested that you take a stab at that first exercise.

How ? Lets say you know number of processors you have, and you want to split the job into batches.

The required processing involve the following steps:

  • We need to know number of available processor you have on your machine
  • We need to distribute evenly our input data as batch to each processor we have
  • We need to perform transformation on each batch using our existing foldmap implementation
  • We need to collect the result by performing a final reduce step from the result.

Some helper pre processing steps :

// Getting number of processor available, unfortunately fiddling in JS we only have one of this
val nProcessors =  Runtime.getRuntime.availableProcessors

// Splitting existing vector to groups with size 3 
val batches = (1 to 10).toList.grouped(3)
 
s"""$nProcessors\nBatches:\n${batches.mkString("\n")}"""

Fiddle Around!

import cats.Monoid
import cats.Applicative             
import cats.syntax.traverse._       

import cats.instances.list._        
import cats.instances.int._
import cats.instances.future._

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global

def foldMap[A, B: Monoid](values: Vector[A])(func: A => B): B = {
    val monoidB = implicitly[Monoid[B]]
    values.foldLeft(monoidB.empty) {
      case (b, a) => monoidB.combine(b, func(a))
    }
}

def parallelFoldMap[A, B : Monoid]
      (values: Vector[A])
      (func: A => B): Future[B] = {
        val monoidB = implicitly[Monoid[B]]
        val nProcessors =  Runtime.getRuntime.availableProcessors
        val batchSize   =  (values.size-1)/nProcessors + 1
        //Split the values in batches 

        //Pass the batches to previous foldmap 

        //Collect the result
        ???
      }


// Debug 
import scala.util._ 

val actual = parallelFoldMap(Vector("Count","Total","Length","Of","String"))(_.length).map(println)


import cats.Monoid
import cats.Applicative             
import cats.syntax.traverse._       

import cats.instances.list._        
import cats.instances.int._
import cats.instances.future._

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global

def foldMap[A, B: Monoid](values: Vector[A])(func: A => B): B = {
    val monoidB = implicitly[Monoid[B]]
    values.foldLeft(monoidB.empty) {
      case (b, a) => monoidB.combine(b, func(a))
    }
}

def parallelFoldMap[A, B : Monoid]
      (values: Vector[A])
      (func: A => B): Future[B] = {
        val monoidB = implicitly[Monoid[B]]
        val nProcessors =  Runtime.getRuntime.availableProcessors
        val batchSize   =  (values.size-1)/nProcessors + 1

        //Split the values in batches 
        values.grouped(batchSize).toList
        //Pass the batches to previous foldmap 
        .traverse(batch => Future(foldMap(batch)(func)))
        //Collect the result
        .map(reduce => foldMap(reduce.toVector)(identity))
      }

// Debug 

parallelFoldMap(Vector("Count","Total","Length","Of","String"))(_.length).map(println)


This is the same solution, implemented with more power of the kittens.

Instead of reducing using previous foldmap and identity, we are using foldable and combineall.


import cats.Monoid
     
import cats.syntax.traverse._       
import cats.syntax.semigroup._
import cats.syntax.foldable._

import cats.instances.int._
import cats.instances.vector._        
import cats.instances.future._

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global

def foldMap[A, B: Monoid](values: Vector[A])(func: A => B): B = {
    values.foldLeft(Monoid[B].empty)(_ |+| func(_))
}

def parallelFoldMap[A, B : Monoid]
      (values: Vector[A])
      (func: A => B): Future[B] = {
        val nProcessors =  Runtime.getRuntime.availableProcessors
        val batchSize   =  (values.size-1)/nProcessors + 1

        //Split the values in batches 
        values.grouped(batchSize).toVector
        //Pass the batches to previous foldmap 
        .traverse(batch => Future(foldMap(batch)(func)))
        //Collect the result
        .map(_.combineAll)
      }

// Debug 

parallelFoldMap(Vector("Count","Total","Length","Of","String"))(_.length).map(println)