Name: spark-redis
Owner: HubSpot
Description: A connector for Spark that allows reading and writing to/from Redis cluster
Forked from: RedisLabs/spark-redis
Created: 2016-09-28 19:15:37.0
Updated: 2016-09-28 19:15:38.0
Pushed: 2016-09-28 19:25:17.0
Size: 1601
Language: Scala
A library for reading and writing data from and to Redis with Apache Spark
Spark-Redis provides access to all of Redis' data structures - String, Hash, List, Set and Sorted Set - from Spark as RDDs. The library can be used both with Redis stand-alone as well as clustered databases. When used with Redis cluster, Spark-Redis is aware of its partitioning scheme and adjusts in response to resharding and node failure events.
Spark-Redis also provides Spark-Streaming support.
You'll need the following to use Spark-Redis:
This library is a work in progress, so the API may change before the official release.
You can download the library's source and build it:
git clone https://github.com/RedisLabs/spark-redis.git
cd spark-redis
mvn clean package -DskipTests
Jedis' current version - v2.7 - does not support reading from Redis cluster's slave nodes. This functionality will only be included in its upcoming version, v2.8.
To use Spark-Redis with Redis cluster's slave nodes, the library's source includes a pre-release of Jedis v2.8 under the with-slaves branch. Switch to that branch by entering the following before running mvn clean install:
git checkout with-slaves
Add Spark-Redis to Spark with the --jars command line option. For example, to use it from spark-shell, include it in the following manner:
$ bin/spark-shell --jars <path-to>/spark-redis-<version>.jar,<path-to>/jedis-<version>.jar
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/
Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_79)
The following sections contain code snippets that demonstrate the use of Spark-Redis. To use the sample code, you'll need to replace your.redis.server and 6379 with your Redis database's IP address or hostname and port, respectively.
Below is an example SparkContext configuration that includes the Redis settings:
import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._
val sc = new SparkContext(new SparkConf()
  .setMaster("local")
  .setAppName("myApp")
  // initial redis host - can be any node in cluster mode
  .set("redis.host", "localhost")
  // initial redis port
  .set("redis.port", "6379")
  // optional redis AUTH password
  .set("redis.auth", "")
)
The supported configuration keys are:
redis.host - host or IP of the initial node we connect to. The connector will read the cluster topology from the initial node, so there is no need to provide the rest of the cluster nodes.
redis.port - the initial node's TCP redis port.
redis.auth - the initial node's AUTH password.
redis.db - optional DB number. Avoid using this, especially in cluster mode.
Since data access in Redis is based on keys, to use Spark-Redis you'll first need a keys RDD. The following example shows how to read key names from Redis into an RDD:
import com.redislabs.provider.redis._
val keysRDD = sc.fromRedisKeyPattern("foo*", 5)        // keys matching a pattern
val keysRDD = sc.fromRedisKeys(Array("foo", "bar"), 5) // or: an explicit list of keys
The above example populates the keys RDD by retrieving from Redis the key names that match the given pattern (foo*), or by listing the keys explicitly in an Array. It also overrides the default of 3 RDD partitions with a new value of 5; each partition consists of a set of Redis cluster hash slots that contain the matched key names.
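To make the hash slot wording above concrete: Redis Cluster maps every key to one of 16384 slots by taking the CRC16 (XMODEM variant) of the key modulo 16384, hashing only the hash tag between the first `{` and the next `}` when that tag is non-empty. The sketch below is plain Scala and is not Spark-Redis code. `HashSlots`, `slotOf` and `partitionOf` are hypothetical names, and splitting the slot range evenly across partitions is an assumption made for illustration, not a description of the connector's actual assignment.

```scala
object HashSlots {
  val SlotCount = 16384

  // CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection.
  def crc16(bytes: Array[Byte]): Int = {
    var crc = 0
    for (b <- bytes) {
      crc ^= (b & 0xFF) << 8
      for (_ <- 0 until 8) {
        val shifted = (crc << 1) & 0xFFFF
        crc = if ((crc & 0x8000) != 0) shifted ^ 0x1021 else shifted
      }
    }
    crc
  }

  // Only the text inside the first non-empty {...} is hashed, if present.
  def hashPart(key: String): String = {
    val open = key.indexOf('{')
    if (open < 0) key
    else {
      val close = key.indexOf('}', open + 1)
      if (close > open + 1) key.substring(open + 1, close) else key
    }
  }

  def slotOf(key: String): Int =
    crc16(hashPart(key).getBytes("UTF-8")) % SlotCount

  // Assumption: slots are divided into contiguous, near-equal ranges.
  def partitionOf(slot: Int, partitions: Int): Int =
    slot * partitions / SlotCount
}
```

With this sketch, `HashSlots.slotOf("{user1}.name")` and `HashSlots.slotOf("{user1}.email")` return the same slot, so under the stated assumption both keys would land in the same partition.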
Each of Redis' data types can be read into an RDD. The following snippet demonstrates reading Redis Strings.
import com.redislabs.provider.redis._
val stringRDD = sc.fromRedisKV("keyPattern*")
val stringRDD = sc.fromRedisKV(Array("foo", "bar"))
Once run, stringRDD: RDD[(String, String)] will contain the string values of all keys whose names are provided by keyPattern or Array[String].
val hashRDD = sc.fromRedisHash("keyPattern*")
val hashRDD = sc.fromRedisHash(Array("foo", "bar"))
This will populate hashRDD: RDD[(String, String)] with the fields and values of the Redis Hashes whose names are provided by keyPattern or Array[String].
val listRDD = sc.fromRedisList("keyPattern*")
val listRDD = sc.fromRedisList(Array("foo", "bar"))
The contents (members) of the Redis Lists whose names are provided by keyPattern or Array[String] will be stored in listRDD: RDD[String].
val setRDD = sc.fromRedisSet("keyPattern*")
val setRDD = sc.fromRedisSet(Array("foo", "bar"))
The Redis Sets' members will be written to setRDD: RDD[String].
val zsetRDD = sc.fromRedisZSetWithScore("keyPattern*")
val zsetRDD = sc.fromRedisZSetWithScore(Array("foo", "bar"))
Using fromRedisZSetWithScore will store in zsetRDD: RDD[(String, Double)] an RDD that consists of members and their scores, from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
val zsetRDD = sc.fromRedisZSet("keyPattern*")
val zsetRDD = sc.fromRedisZSet(Array("foo", "bar"))
Using fromRedisZSet will store in zsetRDD: RDD[String] an RDD that consists of members, from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
val startPos: Int = 0 // example value: rank of the first member to read
val endPos: Int = 9   // example value: rank of the last member to read
val zsetRDD = sc.fromRedisZRangeWithScore("keyPattern*", startPos, endPos)
val zsetRDD = sc.fromRedisZRangeWithScore(Array("foo", "bar"), startPos, endPos)
Using fromRedisZRangeWithScore will store in zsetRDD: RDD[(String, Double)] an RDD that consists of the members, with their scores, whose ranks within their own Sorted Set fall in [startPos, endPos], from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
val startPos: Int = 0 // example value: rank of the first member to read
val endPos: Int = 9   // example value: rank of the last member to read
val zsetRDD = sc.fromRedisZRange("keyPattern*", startPos, endPos)
val zsetRDD = sc.fromRedisZRange(Array("foo", "bar"), startPos, endPos)
Using fromRedisZRange will store in zsetRDD: RDD[String] an RDD that consists of the members whose ranks within their own Sorted Set fall in [startPos, endPos], from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
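The [startPos, endPos] range can be read in terms of the Redis ZRANGE command: indexes are zero-based ranks in score order, both ends are inclusive, and negative indexes count from the end of the Sorted Set (-1 is the last member). The plain Scala sketch below mimics those rules on an in-memory sequence. `ZRangeSketch.zrange` is a hypothetical helper for illustration only, and the assumption that the connector passes startPos and endPos through to ZRANGE unchanged is not confirmed by the text above.

```scala
object ZRangeSketch {
  // Returns the (member, score) pairs whose ranks fall in [start, stop].
  def zrange(zset: Seq[(String, Double)], start: Int, stop: Int): Seq[(String, Double)] = {
    // Sorted Sets are ordered by score; ties are broken by member name.
    val sorted = zset.sortWith { (a, b) =>
      a._2 < b._2 || (a._2 == b._2 && a._1 < b._1)
    }
    val n = sorted.length
    // Negative indexes are offsets from the end; out-of-range values are clamped.
    val from = math.max(if (start < 0) n + start else start, 0)
    val to = math.min(if (stop < 0) n + stop else stop, n - 1)
    if (from > to) Seq.empty else sorted.slice(from, to + 1)
  }
}
```

For a set holding a, b and c with scores 1.0, 2.0 and 3.0, `zrange(set, 0, 1)` yields a and b, while `zrange(set, -2, -1)` yields b and c.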
val min: Double = 0.0   // example value: lowest score to include
val max: Double = 100.0 // example value: highest score to include
val zsetRDD = sc.fromRedisZRangeByScoreWithScore("keyPattern*", min, max)
val zsetRDD = sc.fromRedisZRangeByScoreWithScore(Array("foo", "bar"), min, max)
Using fromRedisZRangeByScoreWithScore will store in zsetRDD: RDD[(String, Double)] an RDD that consists of the members, with their scores, whose scores are within [min, max], from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
val min: Double = 0.0   // example value: lowest score to include
val max: Double = 100.0 // example value: highest score to include
val zsetRDD = sc.fromRedisZRangeByScore("keyPattern*", min, max)
val zsetRDD = sc.fromRedisZRangeByScore(Array("foo", "bar"), min, max)
Using fromRedisZRangeByScore will store in zsetRDD: RDD[String] an RDD that consists of the members whose scores are within [min, max], from the Redis Sorted Sets whose keys are provided by keyPattern or Array[String].
To write data from Spark to Redis, you'll need to prepare an RDD appropriate to the Redis data type you want to store the data in.
For String values, your RDD should consist of the key-value pairs that are to be written. Assuming that the strings RDD is called stringRDD, use the following snippet for writing it to Redis:
sc.toRedisKV(stringRDD)
To store a Redis Hash, the RDD should consist of its field-value pairs. If the RDD is called hashRDD, the following should be used for storing it in the key name specified by hashName:
sc.toRedisHASH(hashRDD, hashName)
Use the following to store an RDD in a Redis List:
sc.toRedisLIST(listRDD, listName)
Use the following to store an RDD in a fixed-size Redis List:
sc.toRedisFixedLIST(listRDD, listName, listSize)
The listRDD is an RDD that contains all of the list's string elements in order, and listName is the list's key name. listSize is an integer that specifies the size of the Redis list; it is optional and defaults to an unlimited size.
For storing data in a Redis Set, use toRedisSET as follows:
sc.toRedisSET(setRDD, setName)
Where setRDD is an RDD with the set's string elements and setName is the name of the key for that set.
sc.toRedisZSET(zsetRDD, zsetName)
The above example demonstrates storing data in Redis in a Sorted Set. The zsetRDD in the example should contain pairs of members and their scores, whereas zsetName is the name for that key.
Spark-Redis supports streaming data from a Redis instance or cluster. Currently, streaming data is fetched from Redis Lists with the blpop command. Users must provide an array containing the names of all the Lists they are interested in. The storageLevel is MEMORY_AND_DISK_SER_2 by default; you can change it as needed.
createRedisStream will create a (listName, value) stream, but if you don't care which list feeds the value, you can use createRedisStreamWithoutListname to get a value-only stream.
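The two stream shapes follow from how BLPOP behaves when given several list names: it checks the lists in the order given, pops the head of the first non-empty one, and returns the list name together with the value. The plain Scala sketch below imitates that on in-memory queues. It is a non-blocking stand-in (the real command waits until an element is available), and `BlpopSketch`, `popFirst` and `valueOnly` are hypothetical names rather than Spark-Redis APIs.

```scala
import scala.collection.mutable

object BlpopSketch {
  // Check the lists in the order given and pop the head of the first
  // non-empty one, tagging the value with the name of the list it came from.
  def popFirst(lists: Map[String, mutable.Queue[String]],
               names: Seq[String]): Option[(String, String)] =
    names
      .find(name => lists.get(name).exists(_.nonEmpty))
      .map(name => (name, lists(name).dequeue()))

  // Dropping the list name gives the value-only shape.
  def valueOnly(item: (String, String)): String = item._2
}
```

Repeated calls to `popFirst` drain the first listed queue before moving on to the next, which is why the order of the names in the array can matter when several lists receive data at once.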
Use the following to get a (listName, value) stream from the foo and bar lists:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._
val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
ssc.start() // the streaming computation must be started before waiting on it
ssc.awaitTermination()
Use the following to get a value stream from the foo and bar lists:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._
val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
ssc.start() // the streaming computation must be started before waiting on it
ssc.awaitTermination()
def twoEndpointExample(sc: SparkContext) = {
  val redisConfig1 = new RedisConfig(new RedisEndpoint("127.0.0.1", 6379, "passwd"))
  val redisConfig2 = new RedisConfig(new RedisEndpoint("127.0.0.1", 7379))
  val rddFromEndpoint1 = {
    // endpoint("127.0.0.1", 6379) is the default connection in this block
    implicit val c = redisConfig1
    sc.fromRedisKV("*")
  }
  val rddFromEndpoint2 = {
    // endpoint("127.0.0.1", 7379) is the default connection in this block
    implicit val c = redisConfig2
    sc.fromRedisKV("*")
  }
}
If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance in that block.
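The block-scoped implicit works because Scala resolves an implicit parameter from the values visible at the call site, so each block can supply its own. The self-contained sketch below shows only that language mechanism. `Endpoint` and `describeTarget` are hypothetical stand-ins and are not the library's RedisConfig or fromRedisKV.

```scala
object ImplicitScopeSketch {
  // Hypothetical stand-in for a connection config.
  final case class Endpoint(host: String, port: Int)

  // Hypothetical reader that uses whichever Endpoint is implicitly in scope.
  def describeTarget(pattern: String)(implicit ep: Endpoint): String =
    s"${pattern}@${ep.host}:${ep.port}"

  def demo(): (String, String) = {
    val first = {
      // This implicit is visible only inside this block.
      implicit val ep: Endpoint = Endpoint("127.0.0.1", 6379)
      describeTarget("*")
    }
    val second = {
      // A different implicit for the second block.
      implicit val ep: Endpoint = Endpoint("127.0.0.1", 7379)
      describeTarget("*")
    }
    (first, second)
  }
}
```

Calling `ImplicitScopeSketch.demo()` returns one string per endpoint, even though both blocks make the identical call `describeTarget("*")`. Declaring the implicit with an explicit type, as done here, is generally the safer habit.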
You're encouraged to contribute to the open source Spark-Redis project. There are two ways you can do so.
If you encounter an issue while using the Spark-Redis library, please report it at the project's issues tracker.
Code contributions to the Spark-Redis project can be made using pull requests. To submit a pull request: