Parallelized Streams in Scala – Not What You Thought

I had a Stream of data coming in from an input source and had to do some heavily CPU-bound work on each item. The code looked like this:

stream.map(item => cpuIntensiveWork(item))

Behind the scenes, the input stream is enumerated and items are acted upon one-by-one to create a lazily loaded list.
Wanting to parallelize this, I added par:

stream.par.map(item => cpuIntensiveWork(item))

What I imagined this would do was load the items into a buffer and have workers read from that buffer and execute the map while the buffer was still being filled. Turns out that’s not what happens.
What really happens is that the whole stream is read into memory and then that gets parallelized. Hardly what I was aiming for.
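
One way to get closer to the buffered behaviour described above is to pull the input in fixed-size batches and parallelize only within each batch. The following is a minimal sketch of that idea, not what par does and not a tuned implementation. It assumes an Iterator as the source and uses plain Futures from the standard library; parMapBatched and the batch size are names made up for this illustration, and cpuIntensiveWork is a stand-in for the real work.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchedParallelMap {
  // Hypothetical stand-in for the CPU-heavy function from the post.
  def cpuIntensiveWork(item: Int): Int = item * item

  // Pulls batchSize items from the source, maps that batch in parallel,
  // waits for it to finish, and only then pulls the next batch.
  // At most one batch of input is held in memory at a time.
  def parMapBatched[A, B](source: Iterator[A], batchSize: Int)(f: A => B): Iterator[B] =
    source.grouped(batchSize).flatMap { batch =>
      val futures = batch.map(item => Future(f(item)))
      Await.result(Future.sequence(futures), Duration.Inf)
    }

  def main(args: Array[String]): Unit = {
    // The source is infinite, so this only works because batches are pulled lazily.
    val results = parMapBatched(Iterator.from(1), batchSize = 4)(cpuIntensiveWork)
    println(results.take(6).toList) // prints List(1, 4, 9, 16, 25, 36)
  }
}
```

The trade-off is that each batch waits for its slowest item before the next batch starts, so this is simpler than a real producer/consumer queue but can leave cores idle.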


Euler Project Problems 7-9 in Scala

This is my continuing series on using Project Euler to learn Scala.
You can see the other posts in the series by visiting the Project Euler category.

Problem #7

import collection.mutable.ListBuffer

object problem7 {
  private val primes: ListBuffer[Int] = ListBuffer[Int]()

  private def isPrime(number: Int) =
    !primes.toStream
      .takeWhile(_ <= math.sqrt(number).floor.toInt)
      .exists(number % _ == 0)

  private def findPrimesStream(): Stream[Int] = {
    // Start running from the last prime + 1 (or from 2 if no primes exist) until the first prime
    val startAt = if (primes.isEmpty) 2 else (primes.max + 1)
    val num: Int = Stream.from(startAt).find(isPrime(_)).head

    // Memoize
    primes.append(num)

    // Continue the stream
    num #:: findPrimesStream
  }

  def main(args: Array[String]) {
    // Stream.apply is zero-based, so index 10000 is the 10,001st prime
    println(findPrimesStream()(10000))
  }
}

I tried to rely on Streams here, along with the obvious memoization that needed to take place.
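
Since a Stream caches every element it has already computed, the stream itself can serve as the memo table. The following is a sketch of that alternative, not the solution above: a self-referential stream where each candidate is tested only against the primes already in the stream. The names PrimesStream, primes and isPrime are chosen for this example.

```scala
object PrimesStream {
  // The stream remembers computed elements, so no separate ListBuffer is needed.
  lazy val primes: Stream[Int] = 2 #:: Stream.from(3, 2).filter(isPrime)

  // Trial division by the primes found so far, up to sqrt(n).
  def isPrime(n: Int): Boolean =
    primes.takeWhile(p => p * p <= n).forall(n % _ != 0)

  def main(args: Array[String]): Unit = {
    println(primes.take(5).toList) // prints List(2, 3, 5, 7, 11)
    println(primes(5))             // prints 13, the sixth prime
  }
}
```

On Scala 2.13 and later Stream is deprecated in favour of LazyList, which supports the same construction.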

Problem #8

object problem8 {
  private val number = "7316717653133062491922511967442657474235534919493496983520312774506326239578318016984801869478851843858615607891129494954595017379583319528532088055111254069874715852386305071569329096329522744304355766896648950445244523161731856403098711121722383113622298934233803081353362766142828064444866452387493035890729629049156044077239071381051585930796086670172427121883998797908792274921901699720888093776657273330010533678812202354218097512545405947522435258490771167055601360483958644670632441572215539753697817977846174064955149290862569321978468622482839722413756570560574902614079729686524145351004748216637048440319989000889524345065854122758866688116427171479924442928230863465674813919123162824586178664583591245665294765456828489128831426076900422421902267105562632111110937054421750694165896040807198403850962455444362981230987879927244284909188845801561660979191338754992005240636899125607176060588611646710940507754100225698315520005593572972571636269561882670428252483600823257530420752963450"

  private def numbersStream(index: Int = 0): Stream[String] = {
    if (index > number.length() - 5)
      return Stream.empty

    number.substring(index, index + 5) #:: numbersStream(index + 1)
  }

  def main(args: Array[String]) {
    // Get all consecutive fives
    val max = numbersStream().map(
      // Convert each to a stream of characters
      _.toStream
        // Convert each one to a number and find their product
        .map(c => Integer.parseInt(c.toString, 10))
        .foldLeft(1)(_ * _))
      // Find the max product
      .max

    println(max)
  }
}

Problem #9

object problem9 {
  def main(args: Array[String]) {
    println(
      Range(1, 999)
        .map(a => (a, Range(a + 1, 999 - a).find(b => a + b + math.sqrt(a*a + b*b) == 1000)))
        .collectFirst{case (a, b) if b != None => a * b.get * math.sqrt(a*a + b.get*b.get) }
        .get
        .toInt)
  }
}

This time I found out about the collect/collectFirst methods that are pretty much filter and map in one. Also, the fact that I worked with a tuple meant that it was easier to have a case handle it, rather than fall back to ._1, et al.
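
To make the difference between the two methods concrete, here is a small sketch on made-up data (the pairs value is purely illustrative). Matching on Some(b) inside the case also removes the need for the b != None check and the calls to get.

```scala
object CollectDemo {
  // Illustrative data: each a paired with an optional b.
  val pairs: Seq[(Int, Option[Int])] = Seq((1, None), (2, Some(10)), (3, Some(20)))

  // collect: keeps every element the partial function is defined for, and maps it.
  val allProducts: Seq[Int] = pairs.collect { case (a, Some(b)) => a * b }

  // collectFirst: stops at the first match and returns an Option.
  val firstProduct: Option[Int] = pairs.collectFirst { case (a, Some(b)) => a * b }

  def main(args: Array[String]): Unit = {
    println(allProducts)  // prints List(20, 60)
    println(firstProduct) // prints Some(20)
  }
}
```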


Euler Project Problems 4-6 (with 1 and 2 revisited) in Scala

This is my continuing series on using Project Euler to learn Scala.
You can see the other posts in the series by visiting the Project Euler category.

Problem #4

val result = Range(999, 900, -1)
        .map(a => (a, Range(999, 900, -1).find(b => {
            val number = a * b
            val digits = number.toString.toCharArray.map(digit => Integer.valueOf(digit - 0x30))
            var found = true

            for (i <- 0 to digits.length / 2)
              if (digits(i) != digits(digits.length - 1 - i))
                found = false

            found
          })))
        .filter(pair => pair._2 != None)
        .map(pair => (pair._1, pair._2.get, pair._1 * pair._2.get))
        .maxBy(tuple => tuple._3)

println(result._1 + " * " + result._2 + " = " + result._3)

Gotta love writing one statement solutions.
I went with the brute-force solution here, because it was easy to write and debug and ran in a decent time.
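
The palindrome test is the part of the brute-force search that can be shortened the most. A sketch of one alternative, assuming it is acceptable to compare the decimal string with its reverse; isPalindrome and largestPalindrome are names chosen for this example, and the search bounds are passed in rather than fixed.

```scala
object PalindromeCheck {
  // A number is a palindrome if its decimal string reads the same backwards.
  def isPalindrome(n: Int): Boolean = {
    val s = n.toString
    s == s.reverse
  }

  // Brute force over all pairs low <= a <= b <= high, keeping the largest palindromic product.
  // Assumes at least one palindromic product exists in the range.
  def largestPalindrome(low: Int, high: Int): Int =
    (for {
      a <- low to high
      b <- a to high
      product = a * b
      if isPalindrome(product)
    } yield product).max

  def main(args: Array[String]): Unit = {
    println(largestPalindrome(10, 99))   // prints 9009 (91 * 99)
    println(largestPalindrome(900, 999)) // prints 906609 (913 * 993)
  }
}
```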

Problem #5

import collection.mutable.{ListBuffer, HashMap}

object problem5 {
  val primes = new HashMap[Int, Int]()

  def collectPrimeDivisors(n: Int) {
    var current: Int = n
    var divisor: Int = 0

    // Find out the prime divisors and list them in divisorList
    val divisorList = ListBuffer[Int]()

    while (divisor != current && current != 1) {
      divisor = current

      val found = primes.find((prime: (Int, Int)) => current % prime._1 == 0)

      if (found != None) {
        divisorList.append(found.get._1)
        current /= found.get._1
        divisor = found.get._1
      }
    }

    if (divisor == current) {
      divisorList.append(current)
    }

    // Zip up the list of divisors
    val map = HashMap[Int, Int]()
    divisorList.foreach((prime: Int) => { map.put(prime, map.get(prime).getOrElse(0) + 1); map })

    // Collect the prime divisors
    map.foreach((tuple: (Int, Int)) => primes.put(tuple._1, math.max(tuple._2, primes.get(tuple._1).getOrElse(0))))
  }

  def run(number: Int): Int = {
    // Collect prime divisors from 2 up to and including number
    Range.inclusive(2, number).foreach((i: Int) => collectPrimeDivisors(i))

    // Multiply all prime divisors
    primes.foldLeft(1)((seed: Int, prime: (Int, Int)) => seed * math.pow(prime._1, prime._2).asInstanceOf[Int])
  }

  def main(args: Array[String]) {
    println(run(20))
  }
}

It took me less time to do this with pen and paper than it did to code the solution. Here’s to math :)
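
The pen-and-paper reasoning amounts to computing the least common multiple of 1 to 20. A shorter sketch of that idea, as an alternative to the prime-collecting code above: fold lcm over the range, with lcm built on Euclid's gcd. The object and method names are made up for this example.

```scala
import scala.annotation.tailrec

object LcmSketch {
  // Euclid's algorithm.
  @tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  // Divide before multiplying to keep the intermediate value small.
  def lcm(a: Long, b: Long): Long = a / gcd(a, b) * b

  // Smallest number evenly divisible by every integer from 1 to n.
  def smallestMultiple(n: Int): Long = (1L to n.toLong).foldLeft(1L)(lcm)

  def main(args: Array[String]): Unit = {
    println(smallestMultiple(10)) // prints 2520
    println(smallestMultiple(20)) // prints 232792560
  }
}
```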

Problem #6

object problem6 {
  def apply(n: Int): Int = {
    val sumOfSquares = n * (n + 1) * (2 * n + 1) / 6
    val sum = n * (n + 1) / 2
    val squareOfSum = sum * sum

    squareOfSum - sumOfSquares
  }

  def main(args: Array[String]) {
    println(apply(100))
  }
}

Another win goes to math. O(1) trumps O(n).
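
As a sanity check on the closed-form formulas, the sketch below compares them with the straightforward O(n) summation. The object and method names are made up for this example, and both versions assume n is small enough for the results to fit in an Int.

```scala
object SumSquareCheck {
  // O(1): uses 1 + 2 + ... + n = n(n+1)/2 and 1^2 + ... + n^2 = n(n+1)(2n+1)/6.
  def closedForm(n: Int): Int = {
    val sum = n * (n + 1) / 2
    sum * sum - n * (n + 1) * (2 * n + 1) / 6
  }

  // O(n): adds everything up directly.
  def bruteForce(n: Int): Int = {
    val sum = (1 to n).sum
    sum * sum - (1 to n).map(i => i * i).sum
  }

  def main(args: Array[String]): Unit = {
    println(closedForm(10))  // prints 2640
    println(closedForm(100)) // prints 25164150
    println((1 to 100).forall(n => closedForm(n) == bruteForce(n))) // prints true
  }
}
```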

Problem #1 (two more solutions)

object problem1_b {
  def problem1Runner(x: Int): Int = {
    val zeroCase: PartialFunction[Int, Int] = { case 0 => 0 }
    val otherNumbers: PartialFunction[Int, Int] = { case n => p1(n) }
    val divisible: PartialFunction[ Int, Int ] = { case n if (n % 3 == 0) || (n % 5 == 0) => n + p1(n) }

    val runner = zeroCase orElse divisible orElse otherNumbers

    runner(x)
  }

  def p1(x: Int) = problem1Runner(x - 1)

  def main(args: Array[String]) {
    println(p1(1000))
  }
}

This solution is thanks to Tomer, who told me about the case ... if pattern (a pattern guard), in which the domain of the function is defined in the if clause instead of having to implement PartialFunction all over again.

However, after reading some more, I came to a more succinct solution:

println(Range(1, 1000).filter(n => n % 3 == 0 || n % 5 == 0).sum)

Gotta love one-liners :)

Problem #2 (another solution)

This solution uses the Tail Recursion optimization in Scala that was pointed out to me by Tomer.

import scala.annotation.tailrec

object problem2_b {
  @tailrec
  private def sum(a: Int, b: Int, runningSum: Long): Long = {
    if (b > 4000000)
      runningSum
    else
      sum(b, a + b, if (b % 2 == 0) b + runningSum else runningSum)
  }

  def main(args: Array[String]) {
    println(sum(1, 1, 0))
  }
}

Euler Project Problems 1-3 in Scala

As with when I learned Ruby, I find Project Euler to be an extremely useful way both to learn a language and its idioms and also flex my math muscles every once in a while. Since I’m starting to learn Scala, I decided to start answering the problems there again (the last time I got to 33) and thought I’d share what I came up with here.

Problem #1

object problem1 {
  def run(number: Int): Int = {
    sum(number - 1)
  }

  def sum(number: Int): Int = {
    number match {
      case 0 => 0
      case n => (if(n % 3 == 0 || n % 5 == 0) n else 0) + problem1.sum(n - 1)
    }
  }
}

I had wanted to use PartialFunction’s functionality for the sake of some mental gymnastics (and the fact that filtering doesn’t occur until the inside of the case in the previous solution), so here’s another solution:

def problem1Runner(x: Int): Int = {
  val zeroCase: PartialFunction[Int, Int] = { case 0 => 0 }
  val otherNumbers: PartialFunction[Int, Int] = { case n => p1(n) }

  val runner = zeroCase orElse new Problem1PF orElse otherNumbers

  runner(x)
}

def p1(x: Int) = problem1Runner(x - 1)

class Problem1PF extends PartialFunction[Int, Int] {
  def isDefinedAt(n: Int): Boolean = {
    (n % 3 == 0 || n % 5 == 0)
  }

  def apply(n: Int): Int = {
    n + p1(n)
  }
}

println(p1(1000))

Obviously, this is the less readable solution, but it taught me more to try and code it.

Problem #2

def fibSum(): Int = fibSum(1, 2)

def fibSum(a: Int, b: Int): Int = {
  var sum = 0
  var x = a
  var y = b

  while (x <= 4000000) {
    if (x % 2 == 0) sum += x

    val z = x
    x = y
    y += z
  }

  sum
}

println(fibSum())

I tried to create an obvious recursive solution, but I quickly understood how it would overflow.

Problem #3

Here I tried the obvious solution first, but it had horrible complexity, so I went and implemented the (unreadable) Shanks’ square forms factorization algorithm (and used Euclid’s greatest common divisor algorithm as part of it).

import collection.mutable.ListBuffer

object problem3 {
  def apply(n: Long): Long = {
    var num = n
    val factors = ListBuffer[Long](problem3.findFactor(num))

    while (factors.last != 1) {
      num /= factors.last
      factors.append(problem3.findFactor(num))
    }

    factors.max
  }

  private def findFactor(n: Long): Long = {
    // initialize
    val k = 1
    var p: ListBuffer[Long] = ListBuffer(math.floor(math.sqrt(k * n)).asInstanceOf[Long])
    var q: ListBuffer[Long] = ListBuffer(1, (k * n - math.pow(p.head, 2.0)).asInstanceOf[Long])
    var b: Long = 0L

    // repeat
    var i = 1

    do {
      b = math.floor((math.floor(math.sqrt(k * n)) + p(i-1)) / q(i)).asInstanceOf[Long]
      p.append(b * q(i) - p(i-1))
      q.append(q(i-1) + b * (p(i-1) - p(i)))
      i += 1
    } while (!isPerfectSquare(q.last))

    // initialize
    b = math.floor((math.floor(math.sqrt(k * n)) - p(i-1)) / math.sqrt(q(i))).asInstanceOf[Long]
    p = ListBuffer((b * math.sqrt(q(i)) + p(i - 1)).asInstanceOf[Long])
    q = ListBuffer(math.sqrt(q(i)).asInstanceOf[Long], ((k * n - math.pow(p(0), 2.0)) / math.sqrt(q(i))).asInstanceOf[Long])

    // repeat
    i = 0

    do {
      i += 1
      b = math.floor((math.floor(math.sqrt(k * n)) + p(i-1)) / q(i)).asInstanceOf[Long]
      p.append(b * q(i) - p(i-1))
      q.append(q(i-1) + b * (p(i-1) - p(i)))
    } while (p(i-1) != p(i))

    gcd(n, p(i))
  }

  private def isPerfectSquare(n: Long): Boolean = {
    val root: Long = math.floor(math.sqrt(n)).asInstanceOf[Long]

    root * root == n
  }

  private def gcd(a: Long, b: Long): Long = {
    // Euclid's algorithm
    b match {
      case 0 => a
      case _ => gcd(b, a - b * math.floor(1.0 * a / b).asInstanceOf[Long])
    }
  }
}

println(problem3(600851475143L))
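
For comparison, plain trial division stays cheap if every factor is divided out of the number as soon as it is found, because the search then only runs up to the square root of what remains. This is a sketch of that simpler alternative, not the square forms algorithm above; the names are made up for this example.

```scala
object TrialDivision {
  // Returns the largest prime factor of n (for n >= 2).
  def largestPrimeFactor(n: Long): Long = {
    var remaining = n
    var factor = 2L
    var largest = 1L

    while (factor * factor <= remaining) {
      if (remaining % factor == 0) {
        // factor is prime here: all smaller factors were already divided out
        largest = factor
        remaining /= factor
      } else {
        factor += 1
      }
    }

    // Whatever is left above 1 is itself prime and larger than any factor found so far.
    if (remaining > 1) remaining else largest
  }

  def main(args: Array[String]): Unit = {
    println(largestPrimeFactor(13195L))        // prints 29
    println(largestPrimeFactor(600851475143L)) // prints 6857
  }
}
```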

Notes:

  1. ListBuffers are better than Lists for appending items.
  2. Turns out you can’t do for on a range longer than Int.MaxValue.
  3. The number of non-textual operators in Scala is insane!
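
The first two notes can be illustrated together. The sketch below appends to a ListBuffer (constant-time append, unlike List) and uses a while loop with a Long counter, which sidesteps the range length limit. collectSquares is a made-up example function, not part of the solution above.

```scala
import scala.collection.mutable.ListBuffer

object NotesDemo {
  // Collects all perfect squares up to limit, counting with a Long.
  def collectSquares(limit: Long): List[Long] = {
    val squares = ListBuffer[Long]()
    var i = 1L

    while (i * i <= limit) {
      squares += i * i // append at the end of the buffer
      i += 1
    }

    squares.toList
  }

  def main(args: Array[String]): Unit =
    println(collectSquares(30L)) // prints List(1, 4, 9, 16, 25)
}
```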