Name: neo4j-spark-connector
Owner: Neo4j Contrib
Description: These are the beginnings / experiments of a Connector from Neo4j to Apache Spark using the new binary protocol for Neo4j, Bolt.
Created: 2016-03-03 16:01:08.0
Updated: 2018-05-22 08:17:58.0
Pushed: 2017-08-24 20:44:22.0
Homepage: https://spark-packages.org/package/neo4j-contrib/neo4j-spark-connector
Size: 104
Language: Scala
These are the beginnings of a Connector from Neo4j to Apache Spark 2.1 using the new binary protocol for Neo4j, Bolt.
Find more information about the Bolt protocol, available drivers, and documentation.
Please note that I still know very little about Apache Spark and might have done really dumb things. Please let me know by creating an issue or, even better, submitting a pull request to this repo.
This neo4j-spark-connector is Apache 2 licensed.
## Building

Build `target/neo4j-spark-connector_2.11-full-2.1.0-M4.jar` for Scala 2.11:

```
mvn clean package
```

**spark-shell, pyspark, or spark-submit**

```
$SPARK_HOME/bin/spark-shell --jars neo4j-spark-connector_2.11-full-2.1.0-M4.jar
```

or

```
$SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```
**sbt**

If you use the sbt-spark-package plugin, add the following to your sbt build file:

```scala
spDependencies += "neo4j-contrib/neo4j-spark-connector:2.1.0-M4"
```

Otherwise:

```scala
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies += "neo4j-contrib" % "neo4j-spark-connector" % "2.1.0-M4"
```
**Maven**

In your pom.xml, add the dependency:

```xml
<dependency>
  <groupId>neo4j-contrib</groupId>
  <artifactId>neo4j-spark-connector</artifactId>
  <version>2.1.0-M4</version>
</dependency>
```

and the repository:

```xml
<repository>
  <id>SparkPackagesRepo</id>
  <url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
```
## Config

If you're running Neo4j on localhost with the default ports, you only have to configure your password in `spark.neo4j.bolt.password=<password>`.
Otherwise, set `spark.neo4j.bolt.url` in your `SparkConf`, pointing e.g. to `bolt://host:port`.
You can provide user and password as part of the URL, e.g. `bolt://neo4j:<password>@localhost`, or individually in `spark.neo4j.bolt.user` and `spark.neo4j.bolt.password`.
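For example, a minimal sketch of such a `SparkConf` (URL, app name, and password here are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Point the connector at a Neo4j instance; replace URL and password as needed.
val conf = new SparkConf()
  .setAppName("neo4j-spark-example")
  .set("spark.neo4j.bolt.url", "bolt://localhost:7687")
  .set("spark.neo4j.bolt.password", "<password>")
val sc = new SparkContext(conf)
```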
## Builder API

Starting with version 2.1.0-M4, you can use a fluent builder API to declare the queries or patterns you want to use, as well as **partitions, total rows, and batch sizes**, and then select which Apache Spark type to load.

The library supports:

- `RDD[Row]`, `RDD[T]` (`loadRowRdd`, `loadRdd[T]`)
- `DataFrame`
- GraphX `Graph`
- `GraphFrame`

The general usage is:

1. Create `org.neo4j.spark.Neo4j(sc)`.
2. Set `cypher(query,[params])`, `nodes(query,[params])`, `rels(query,[params])` as a direct query, or<br/>
   `pattern("Label1",Seq("REL"),"Label2")` or `pattern(("Label1","prop1"),("REL","prop"),("Label2","prop2"))`.
3. Optionally define `partitions(n)`, `batch(size)`, `rows(count)` for parallelism.
4. Choose which datatype to return:
   - `loadRowRdd`, `loadNodeRdds`, `loadRelRdd`, `loadRdd[T]`
   - `loadDataFrame`, `loadDataFrame(schema)`
   - `loadGraph[VD,ED]`
   - `loadGraphFrame[VD,ED]`
5. Execute Spark operations.
6. Save the graph back with `saveGraph(graph, [pattern], [nodeProp], [merge=false])`.
Example:

```scala
org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
```
## Usage Examples

### Create Test Data in Neo4j

```cypher
UNWIND range(1,100) as id
CREATE (p:Person {id:id})
WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)
```

Start the Spark shell with:

```
$SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```
### Loading RDDs

```scala
import org.neo4j.spark._

val neo = Neo4j(sc)

val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id").loadRowRdd
rdd.count

// inferred schema
rdd.first.schema.fieldNames // => ["id"]
rdd.first.schema("id")      // => StructField(id,LongType,true)

neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5

neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10

// provide partitions and batch-size
neo.nodes("MATCH (n:Person) RETURN id(n) SKIP {skip} LIMIT {limit}").partitions(4).batch(25).loadRowRdd.count
// => 100 == 4 * 25

// load nodes via pattern
neo.pattern("Person",Seq("KNOWS"),"Person").rows(80).batch(21).loadNodeRdds.count
// => 80, b/c 80 rows were requested

// load relationships via pattern
neo.pattern("Person",Seq("KNOWS"),"Person").partitions(12).batch(100).loadRelRdd.count
// => 1000
```
### Loading DataFrames

```scala
import org.neo4j.spark._

val neo = Neo4j(sc)

// load via Cypher query
neo.cypher("MATCH (n:Person) RETURN id(n) as id SKIP {skip} LIMIT {limit}").partitions(4).batch(25).loadDataFrame.count
// => res36: Long = 100

val df = neo.pattern("Person",Seq("KNOWS"),"Person").partitions(12).batch(100).loadDataFrame
// => org.apache.spark.sql.DataFrame = [id: bigint]

// TODO loadRelDataFrame
```
### Loading GraphX Graphs

```scala
import org.neo4j.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

val neo = Neo4j(sc)

// load graph via Cypher query
val graphQuery = "MATCH (n:Person)-[r:KNOWS]->(m:Person) RETURN id(n) as source, id(m) as target, type(r) as value SKIP {skip} LIMIT {limit}"
val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(7).batch(200).loadGraph

graph.vertices.count // => 100
graph.edges.count    // => 1000

// load graph via pattern
val graph = neo.pattern(("Person","id"),("KNOWS","since"),("Person","id")).partitions(7).batch(200).loadGraph[Long,Long]

val graph2 = PageRank.run(graph, 5)
// => graph2: org.apache.spark.graphx.Graph[Double,Double]

graph2.vertices.sortBy(_._2, ascending = false).take(3)
// => res46: Array[(org.apache.spark.graphx.VertexId, Long)]
// => Array((236746,100), (236745,99), (236744,98))

// uses pattern from above to save the data; merge is false by default, so only existing nodes are updated
neo.saveGraph(graph, "rank")
// uses pattern from parameter to save the data; merge = true also creates new nodes and relationships
neo.saveGraph(graph, "rank", Pattern(("Person","id"),("FRIEND","years"),("Person","id")), merge = true)
```
### Loading GraphFrames

```scala
import org.neo4j.spark._
import org.graphframes._

val neo = Neo4j(sc)

val graphFrame = neo.pattern(("Person","id"),("KNOWS",null),("Person","id")).partitions(3).rows(1000).loadGraphFrame

graphFrame.vertices.count // => 100
graphFrame.edges.count    // => 1000

val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()

val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
// => top3: Array[org.apache.spark.sql.Row]
// => Array([236716,70,0.62285…], [236653,7,0.62285…], [236658,12,0.62285…])

// example loading a graph frame with two dedicated Cypher statements
val nodesQuery = "MATCH (n:Person) RETURN id(n) as id, n.name as value UNION ALL MATCH (n:Company) RETURN id(n) as id, n.name as value"
val relsQuery = "MATCH (p:Person)-[r]->(c:Company) RETURN id(p) as src, id(c) as dst, type(r) as value"

val graphFrame = Neo4j(sc).nodes(nodesQuery,Map.empty).rels(relsQuery,Map.empty).loadGraphFrame
```
**NOTE: The APIs below are the previous APIs. They still work, but I recommend that you use and provide feedback on the new _builder_ API above.**
### RDDs

There are a few different RDDs, all named `Neo4jXxxRDD`:

- `Neo4jTupleRDD` returns a `Seq[(String,AnyRef)]` per row
- `Neo4jRowRDD` returns a Spark SQL `Row` per row
### DataFrames

`Neo4jDataFrame` is a Spark SQL `DataFrame` that you construct either with explicit type information about result names and types,
or with the schema inferred from the first result row.

Neo4jDataFrame provides `mergeEdgeList(sc: SparkContext, dataFrame: DataFrame, source: (label,Seq[prop]), relationship: (type,Seq[prop]), target: (label,Seq[prop]))` to merge a DataFrame back into a Neo4j graph:

- both nodes are merged by the first property in the sequence; all the others will be set on the entity
- relationships are merged between the two nodes, and all properties from the sequence will be set on the relationship
- property names from the sequence are used as column names for the DataFrame; currently there is no name translation
- the results are sent in batches of 10000 to the graph
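A minimal sketch of such a merge, assuming a spark-shell session; the labels, relationship type, and column names below are illustrative:

```scala
import org.neo4j.spark._

// illustrative edge list: one row per (:Person)-[:KNOWS]->(:Person) pair;
// column names double as the Neo4j property names (no name translation)
val edges = spark.createDataFrame(Seq(
  ("Alice", "Bob"),
  ("Bob", "Carol")
)).toDF("name", "name2")

// merge source/target nodes on the first property of each sequence
// ("name" / "name2"), then merge a KNOWS relationship between them
Neo4jDataFrame.mergeEdgeList(sc, edges,
  ("Person", Seq("name")),
  ("KNOWS", Seq.empty),
  ("Person", Seq("name2")))
```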
### GraphX: Neo4jGraph

`Neo4jGraph` has methods to load and save a GraphX graph:

- `Neo4jGraph.execute` runs a Cypher statement and returns a `CypherResult` with the `keys` and a `rows` iterator of `Array[Any]`
- `Neo4jGraph.loadGraph(sc, label, rel-types, label2)` loads a graph via the relationships between those labeled nodes
- `Neo4jGraph.saveGraph(sc, graph, [nodeProp], [relTypeProp (type,prop)], [mainLabelId (label,prop)], [secondLabelId (label,prop)], merge=false)` saves a graph object to Neo4j by updating the given node and relationship properties
- `Neo4jGraph.loadGraphFromNodePairs(sc, stmt, params)` loads a graph from pairs of node ids
- `Neo4jGraph.loadGraphFromRels(sc, stmt, params)` loads a graph from pairs of start and end node ids plus an additional value per relationship
- `Neo4jGraph.loadGraph(sc, (stmt,params), (stmt,params))` loads a graph with two dedicated statements, the first for nodes, the second for relationships
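A small sketch of `Neo4jGraph.execute`, assuming it takes the SparkContext, the statement, and a parameter `Seq` like the RDD constructors below (the query uses the test data from the setup section):

```scala
import org.neo4j.spark._

// run an arbitrary Cypher statement and iterate the raw rows
val result = Neo4jGraph.execute(sc, "MATCH (n:Person) RETURN n.id LIMIT 3", Seq.empty)
println(result.keys.mkString(", "))                  // column names, e.g. n.id
result.rows.foreach(r => println(r.mkString(", ")))  // each row is an Array[Any]
```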
### GraphFrames

[GraphFrames](http://graphframes.github.io/) ([Spark Packages](http://spark-packages.org/package/graphframes/graphframes)) are a new Apache Spark API to process graph data.
It is similar to, and based on, DataFrames; you can create GraphFrames from DataFrames and also from GraphX graphs.

- `Neo4jGraphFrame(sqlContext, (srcNodeLabel,srcNodeProp), (relType,relProp), (dstNodeLabel,dstNodeProp))` loads a graph with the given source and destination nodes and the relationships in between; the relationship property is optional and can be null
- `Neo4jGraphFrame.fromGraphX(sc, label, Seq(rel-type), label)` loads a graph with the given pattern
- `Neo4jGraphFrame.fromEdges(sqlContext, srcNodeLabel, Seq(relType), dstNodeLabel)`
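The `fromEdges` variant is not shown in the examples further down; a minimal sketch, assuming the Person/KNOWS test data from the setup section and a spark-shell `sqlContext`:

```scala
import org.neo4j.spark._

// build a GraphFrame from the KNOWS edges between Person nodes
val gdf = Neo4jGraphFrame.fromEdges(sqlContext, "Person", Seq("KNOWS"), "Person")
gdf.edges.count
```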
## Example Usage

### Setup

- Download and install Apache Spark 2.1 from http://spark.apache.org/downloads.html
- Download and install Neo4j 3.0.0 or later (e.g. from http://neo4j.com/download/)

For a simple dataset of connected people, run the two following Cypher statements, which create 1M people and 1M relationships in about a minute:

```cypher
FOREACH (x in range(1,1000000) | CREATE (:Person {name:"name"+x, age: x%100}));

UNWIND range(1,1000000) as x
MATCH (n),(m) WHERE id(n) = x AND id(m)=toInt(rand()*1000000)
CREATE (n)-[:KNOWS]->(m);
```
### Dependencies

You can also provide the dependencies to spark-shell or spark-submit via `--packages` and optionally `--repositories`:

```
$SPARK_HOME/bin/spark-shell \
  --conf spark.neo4j.bolt.password=<neo4j-password> \
  --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```
### Neo4j(Row|Tuple)RDD

```
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<neo4j-password> \
  --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```

```scala
import org.neo4j.spark._

Neo4jTupleRDD(sc,"MATCH (n) return id(n)",Seq.empty).count
// res46: Long = 1000000

Neo4jRowRDD(sc,"MATCH (n) where id(n) < {maxId} return id(n)",Seq("maxId" -> 100000)).count
// res47: Long = 100000
```
### Neo4jDataFrame

```
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<neo4j-password> \
  --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```

```scala
import org.neo4j.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val df = Neo4jDataFrame.withDataType(sqlContext, "MATCH (n) return id(n) as id", Seq.empty, "id" -> LongType)
// df: org.apache.spark.sql.DataFrame = [id: bigint]
df.count
// res0: Long = 1000000

val query = "MATCH (n:Person) return n.age as age"
val df = Neo4jDataFrame.withDataType(sqlContext, query, Seq.empty, "age" -> LongType)
// df: org.apache.spark.sql.DataFrame = [age: bigint]
df.agg(sum(df.col("age"))).collect()
// res31: Array[org.apache.spark.sql.Row] = Array([49500000])

// val query = "MATCH (n:Person)-[:KNOWS]->(m:Person) where n.id = {x} return m.age as age"
val query = "MATCH (n:Person) where n.id = {x} return n.age as age"
val rdd = sc.makeRDD(1.to(1000000))
val ages = rdd.map( i => {
  val df = Neo4jDataFrame.withDataType(sqlContext, query, Seq("x" -> i.asInstanceOf[AnyRef]), "age" -> LongType)
  df.agg(sum(df("age"))).first().getLong(0)
})
// TODO
ages.reduce( _ + _ )

val df = Neo4jDataFrame(sqlContext, "MATCH (n) WHERE id(n) < {maxId} return n.name as name", Seq("maxId" -> 100000), "name" -> "string")
df.count
// res0: Long = 100000
```
### Neo4jGraph Operations

```
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<neo4j-password> \
  --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```

```scala
import org.neo4j.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

val g = Neo4jGraph.loadGraph(sc, "Person", Seq("KNOWS"), "Person")
// g: org.apache.spark.graphx.Graph[Any,Int] = org.apache.spark.graphx.impl.GraphImpl@574985d8

g.vertices.count
// res0: Long = 999937
g.edges.count
// res1: Long = 999906

val g2 = PageRank.run(g, 5)

val v = g2.vertices.take(5)
// v: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((185012,0.15), (612052,1.0153273593749998), (354796,0.15), (182316,0.15), (199516,0.38587499999999997))

Neo4jGraph.saveGraph(sc, g2, "rank")
// res2: (Long, Long) = (999937,0)

// full syntax example
Neo4jGraph.saveGraph(sc, graph, "rank", ("LIKES","score"), Some(("Person","name")), Some(("Movie","title")), merge = true)
```
### Neo4jGraphFrame

GraphFrames are a new Apache Spark API to process graph data.
It is similar to, and based on, DataFrames; you can create GraphFrames from DataFrames and also from GraphX graphs.
There was a recent release (0.5.0) of GraphFrames for Spark 2.1 and Scala 2.11, which we use.
It is available on the [Maven repository for Apache Spark Packages](http://dl.bintray.com/spark-packages/maven/graphframes/graphframes).

Resources:

- [Introduction article](https://databricks.com/blog/2016/03/03/introducing-graphframes.html)
- [API Docs](http://graphframes.github.io/api/scala/index.html#org.graphframes.GraphFrame$)
- [Flights Example](https://databricks.com/blog/2016/03/16/on-time-flight-performance-with-spark-graphframes.html)
- [SparkSummit Video](https://spark-summit.org/east-2016/speakers/ankur-dave/)
```
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<neo4j-password> \
  --packages neo4j-contrib:neo4j-spark-connector:2.1.0-M4
```

```scala
import org.neo4j.spark._

val gdf = Neo4jGraphFrame(sqlContext, "Person" -> "name", ("KNOWS"), "Person" -> "name")
// gdf: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, prop: string],
//      e:[src: bigint, dst: bigint, prop: string])

val gdf = Neo4jGraphFrame.fromGraphX(sc, "Person", Seq("KNOWS"), "Person")

gdf.vertices.count // res0: Long = 1000000
gdf.edges.count    // res1: Long = 999999

val results = gdf.pageRank.resetProbability(0.15).maxIter(5).run
// results: org.graphframes.GraphFrame = GraphFrame(
//   v:[id: bigint, prop: string, pagerank: double],
//   e:[src: bigint, dst: bigint, prop: string, weight: double])

results.vertices.take(5)
// res3: Array[org.apache.spark.sql.Row] = Array([31,name32,0.96820096875], [231,name232,0.15],
//   [431,name432,0.15], [631,name632,1.1248028437499997], [831,name832,0.15])

// pattern matching
val results = gdf.find("(A)-[]->(B)").select("A","B").take(3)
// results: Array[org.apache.spark.sql.Row] = Array([[159148,name159149],[31,name32]],
//   [[461182,name461183],[631,name632]], [[296686,name296687],[1031,name1032]])

gdf.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
// res8: org.apache.spark.sql.DataFrame = [A: struct<id:bigint,prop:string>, B: struct<id:bigint,prop:string>, C: struct<id:bigint,prop:string>]

gdf.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)").take(3)
// res9: Array[org.apache.spark.sql.Row] = Array([[904749,name904750],[702750,name702751],[122280,name122281]], [[240723,name240724],[813112,name813113],[205438,name205439]], [[589543,name589544],[600245,name600246],[659932,name659933]])

// doesn't work yet ... complains about different table widths
val results = gdf.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)").filter("A.id != C.id")
// Select recommendations for A to follow C
results.select("A", "C").take(3)

gdf.labelPropagation.maxIter(3).run().take(3)
```
## Neo4j-Java-Driver

This project uses the [Java driver](http://github.com/neo4j/neo4j-java-driver) for Neo4j's Bolt protocol.
We use version `org.neo4j.driver:neo4j-java-driver:1.0.4`.
## Testing

Testing is done using `neo4j-harness`, a [test library](http://neo4j.com/docs/java-reference/current/#server-unmanaged-extensions-testing) for starting an in-process Neo4j server, which you can use either with a JUnit `@Rule` or directly.
We only start one server and one SparkContext per test class to avoid the lifecycle overhead.
Please note that Neo4j running an in-process server pulls in Scala 2.11 for Cypher, so you need to run the tests with Scala 2.11.
That's why I had to add two profiles for the different Scala versions.
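A minimal sketch of starting such an in-process server directly via the `neo4j-harness` builder API; the fixture statement and object name are illustrative:

```scala
import org.neo4j.harness.TestServerBuilders

// one in-process Neo4j server shared across a test class
object TestServer {
  lazy val server = TestServerBuilders.newInProcessBuilder()
    .withFixture("UNWIND range(1,100) as id CREATE (:Person {id:id})")
    .newServer()

  // point the connector at it, e.g.:
  // conf.set("spark.neo4j.bolt.url", server.boltURI().toString)
}
```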