hammerlab/magic-rdds

Name: magic-rdds

Owner: Hammer Lab

Description: Miscellaneous functionality for manipulating Apache Spark RDDs.

Created: 2016-06-01 14:22:24.0

Updated: 2018-01-12 19:52:47.0

Pushed: 2018-01-13 21:04:11.0

Homepage: none

Size: 476

Language: Scala

README

Magic RDDs

Join the chat at https://gitter.im/hammerlab/magic-rdds

Enrichment methods for Apache Spark RDDs:

spark-shell --packages org.hammerlab:magic-rdds_2.11:4.0.0

scala> import magic_rdds._
scala> sc.parallelize(List(1, 1, 1, 2, 2, 2, 2, 2, 2, 10)).runLengthEncode.collect()
res0: Array[(Int, Int)] = Array((1,3), (2,6), (10,1))
Using

Use these Maven coordinates to depend on magic-rdds' latest Scala 2.11 build:

<dependency>
  <groupId>org.hammerlab</groupId>
  <artifactId>magic-rdds_2.11</artifactId>
  <version>4.0.0</version>
</dependency>

In SBT, use:

.hammerlab" %% "magic-rdds" % "4.0.0"
Overview

The following sections describe some of the RDD enrichments this repo provides:

RDDs

These RDD helpers are found in the org.hammerlab.magic.rdd package.

Run-length encoding

Exposes a runLengthEncode method on RDDs, per the example above.

Scans

Basic .scanLeft / .scanRight
val rdd = sc.parallelize(1 to 10)

rdd.scanLeft(0)(_ + _).collect
// Array(0, 1, 3, 6, 10, 15, 21, 28, 36, 45)

rdd.scanRight(0)(_ + _).collect
// Array(54, 52, 49, 45, 40, 34, 27, 19, 10, 0)

Before the .collect, each of these returns a ScanRDD: a wrapper around the post-scan RDD, the total sum, and an array holding the first element of each partition. A ScanRDD is automatically implicitly converted to the RDD it wraps.
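
For example (a minimal sketch; the implicit conversion is what allows .collect to be called directly on the result):

val scanned = rdd.scanLeft(0)(_ + _)  // a ScanRDD wrapping the post-scan RDD
scanned.collect                       // implicitly converted to the wrapped RDD, then collected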

Include each element in the partial-sum that replaces it
rdd.scanLeftInclusive(0)(_ + _).collect
// Array(1, 3, 6, 10, 15, 21, 28, 36, 45, 55)

rdd.scanRightInclusive(0)(_ + _).collect
// Array(55, 54, 52, 49, 45, 40, 34, 27, 19, 10)
Use cats.Monoids

import cats.instances.int._

rdd.scanLeft
rdd.scanRight
rdd.scanLeftInclusive
rdd.scanRightInclusive
Operate on the values of key-value pairs
val pairRDD = sc.parallelize('a' to 'j' zip (1 to 10))

pairRDD.scanLeftValues.collect
// Array((a,0), (b,1), (c,3), (d,6), (e,10), (f,15), (g,21), (h,28), (i,36), (j,45))

pairRDD.scanLeftValuesInclusive.collect
// Array((a,1), (b,3), (c,6), (d,10), (e,15), (f,21), (g,28), (h,36), (i,45), (j,55))

pairRDD.scanRightValues.collect
// Array((a,54), (b,52), (c,49), (d,45), (e,40), (f,34), (g,27), (h,19), (i,10), (j,0))

pairRDD.scanRightValuesInclusive.collect
// Array((a,55), (b,54), (c,52), (d,49), (e,45), (f,40), (g,34), (h,27), (i,19), (j,10))

Additionally, .scanRight and .scanRightValues each come in two implementations with different performance tradeoffs.

.reverse

Reverse the elements in an RDD, optionally preserving (though still inverting) their partitioning:

sc.parallelize(1 to 10).reverse().collect
// Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
reduceByKey extensions

Self-explanatory: .maxByKey, .minByKey
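
A brief usage sketch (the results shown assume each key's values are reduced with max/min respectively; key order in the collected array is not guaranteed):

sc.parallelize(1 to 10).keyBy(_ % 2).maxByKey.collect  // Array((0,10), (1,9))
sc.parallelize(1 to 10).keyBy(_ % 2).minByKey.collect  // Array((0,2), (1,1))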

RDD Comparisons / Diffs

Example setup:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize((1 to 5) ++ (12 to 7 by -1))

import cmp._

unorderedCmp: compare, disregarding order/position of elements:

val Unordered.Stats(both, onlyA, onlyB) = rdd1.unorderedCmp(rdd2).stats
// both  = 9  (1 to 5, 7 to 10)
// onlyA = 1  (6)
// onlyB = 2  (11, 12)

orderedCmp: distinguish between common elements at the same vs. different indices:

val Ordered.Stats(eq, reordered, onlyA, onlyB) = rdd1.orderedCmp(rdd2).stats
// eq        = 6  (1, 2, 3, 4, 5, 9)
// reordered = 3  (7, 8, 10)
// onlyA     = 1  (6)
// onlyB     = 2  (11, 12)

compareByKey: unordered comparison of values for each key of paired-RDDs:

import hammerlab.iterator._

val chars10  = ('a' to 'j') zipWithIndex
val chars4   = chars10.take(4)
val chars5_2 = chars10.mapValues(_ + 10).takeEager(5)

val rdd1 = sc.parallelize(chars10  ++ chars4)
val rdd2 = sc.parallelize(chars5_2 ++ chars4)

val Keyed.Stats(eq, extraA, extraB, onlyA, onlyB) = rdd1.compareByKey(rdd2).stats
// eq     = 4 (a → 1, b → 2, c → 3, d → 4)
// extraA = 4 (a → 1, b → 2, c → 3, d → 4; second copy of each)
// extraB = 0
// onlyA  = 6 (e → 5, f → 6, g → 7, h → 8, i → 9, j → 10)
// onlyB  = 5 (a → 11, b → 12, c → 13, d → 14, e → 15)
collectParts

Collect an RDD while keeping elements in their respective partitions:

sc.parallelize(1 to 12).collectParts
// Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9), Array(10, 11, 12))
.size

A smarter version of RDD.count.
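
A minimal usage sketch (the returned value is assumed to match .count):

val rdd = sc.parallelize(1 to 10)
rdd.size  // 10, the same value .count would return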

.lazyZipWithIndex

Functionally equivalent to RDD.zipWithIndex, but runs the first of the two necessary jobs (computing per-partition sizes and cumulative offsets) lazily, in a manner truer to the spirit of the lazy-wherever-possible RDD API than the .zipWithIndex implementation.
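
Sketch of expected output, matching what RDD.zipWithIndex produces for the same input:

sc.parallelize('a' to 'c').lazyZipWithIndex.collect
// Array((a,0), (b,1), (c,2))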

Sliding / Windowed Traversals

Exposes several methods in the spirit of Scala collections' similar API.

Tuple2 / Tuple3:

sc.parallelize(1 to 6).sliding2.collect
// Array((1,2), (2,3), (3,4), (4,5), (5,6))

sc.parallelize(1 to 6).sliding3.collect
// Array((1,2,3), (2,3,4), (3,4,5), (4,5,6))

2- and 3-element windows with Option contexts; input and output RDDs have the same number of elements:

sc.parallelize(1 to 6).sliding2Next.collect
// Array((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,Some(5)), (5,Some(6)), (6,None))

sc.parallelize(1 to 6).sliding2Prev.collect
// Array((None,1), (Some(1),2), (Some(2),3), (Some(3),4), (Some(4),5), (Some(5),6))

sc.parallelize(1 to 6).sliding3Opt.collect
// Array((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,Some(4)), (Some(3),4,Some(5)), (Some(4),5,Some(6)), (Some(5),6,None))

sc.parallelize(1 to 6).sliding3Next.collect
// Array((1,Some(2),Some(3)), (2,Some(3),Some(4)), (3,Some(4),Some(5)), (4,Some(5),Some(6)), (5,Some(6),None), (6,None,None))

Arbitrary number of elements in Seqs, optionally preserving total number of elements by keeping partial entries:

sc.parallelize(1 to 6).sliding(4).collect
// Array(List(1, 2, 3, 4), List(2, 3, 4, 5), List(3, 4, 5, 6))

sc.parallelize(1 to 6).sliding(4, includePartial = true).collect
// Array(List(1, 2, 3, 4), List(2, 3, 4, 5), List(3, 4, 5, 6), List(4, 5, 6), List(5, 6), List(6))
.cappedGroupByKey

Like RDD.groupByKey but takes only the first maxPerKey elements for each key:

sc.parallelize(1 to 10).keyBy(_ % 2).cappedGroupByKey(3).collect
// Array((0,Vector(2, 4, 6)), (1,Vector(1, 3, 5)))
.sampleByKey

Similar to .cappedGroupByKey, but samples elements from each key in an unbiased manner:

sc.parallelize(1 to 100).keyBy(_ % 2).sampleByKey(3).collect
// Array((0,ArrayBuffer(24, 78, 98)), (1,ArrayBuffer(17, 47, 49)))
.splitByKey

Split an RDD[(K, V)] into a Map[K, RDD[V]], i.e. multiple RDDs each containing the values corresponding to one key.

This is generally a questionable thing to want to do, as subsequent operations on each RDD lose out on Spark's ability to parallelize things.

However, if you are going to do it, this implementation is much better than what you might do naively, i.e. using .filter N times on the original RDD.

Instead, this implementation shuffles the full RDD once, into a partitioning where each key's pairs occupy a contiguous range of partitions; it then takes partition-sliced views over those ranges and exposes them as standalone, per-key RDDs.
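
A usage sketch based on the Map[K, RDD[V]] shape described above (the parity keys and values here are illustrative):

val perKey = sc.parallelize(1 to 10).keyBy(_ % 2).splitByKey  // Map[Int, RDD[Int]]
perKey(0).collect                                             // the values under key 0: 2, 4, 6, 8, 10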

2-D prefix sum

Given an RDD of elements that each have a logical “row”, “column”, and “summable” value (an RDD[((Int, Int), V)]), generate an RDD that replaces each value with the sum of all values at greater (or equal) (row,col) positions:

import org.apache.spark.rdd.RDD

val n = 4
val rdd = sc.parallelize(1 to n flatMap { r => 1 to n map { c => r -> c -> r * c } })

def printGrid(rdd: RDD[((Int, Int), Int)]) = {
  val map = rdd.collectAsMap.toMap
  for {
    r <- n to 1 by -1
    c <- 1 to n
  } {
    print("% 4d".format(map((r, c))))
    if (c == n) print("\n")
  }
}

printGrid(rdd)
//   4   8  12  16
//   3   6   9  12
//   2   4   6   8
//   1   2   3   4

val prefix_sum.Result(_, cdf, _, _) = rdd.prefixSum2D()

printGrid(cdf)
//  40  36  28  16
//  70  63  49  28
//  90  81  63  36
// 100  90  70  40

Examples from the tests may clarify further.

“Batch” execution

This feature exposes a coarse way to force fewer than spark.executor.cores tasks to run concurrently on each executor during a given stage.

The stage in question is split up into multiple Spark stages, each comprised of a set number of partitions from the upstream RDD (the “batch size”).

If this size is chosen to be ≤ the number of executors, then in general a maximum of one task will be assigned to each executor.

This can be useful when some stages in an app are very memory-expensive, while others are not: the memory-expensive ones can be “batched” in this way, each task availing itself of a full executor's-worth of memory.

The implementation splits the upstream partitions into (N / batch size) batches and executes each batch as a single stage before combining the results into a single RDD:

import org.hammerlab.magic.rdd.batch.implicits._

// create RDD of 10 partitions
val rdd = sc.parallelize(0 until 100, 10)
val res = rdd.batch(numPartitionsPerBatch = 4)

// print operations graph to see how many batches are selected
res.toDebugString
// (10) ReduceRDD[4] at RDD at ReduceRDD.scala:19 []
//  +-(2) MapRDD[3] at RDD at MapRDD.scala:21 []
//     +-(4) MapRDD[2] at RDD at MapRDD.scala:21 []
//        +-(4) MapRDD[1] at RDD at MapRDD.scala:21 []
//           |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

See the package README for more info!

And more!

Browse the code and tests, file an issue, or drop by Gitter for more info.

Building

Typical SBT commands will build/test/package the project:

sbt test
sbt assembly
Releasing

While set to a -SNAPSHOT version:

sbt publish

To release a non-SNAPSHOT version:

sbt publishSigned sonatypeRelease

This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.