介绍

最近接触了一个项目,需要从海量的数据中提取模型来导入到neo4j中,然后做一些关系的分析,这个项目分为两个流程方向,一个是初始化的数据导入,一个是增量的数据导入.

  1. 初始化数据导入,很简单了,使用neo4j-admin 导入csv
  2. 数据增量,这个就需要数据处理脚本来定时计算和导入了 这次内容主要是围绕Spark 和 Neo4j的增量的两个方法的介绍,以及源码的一些展示和修改

源码分析和修改

  1. 下面代码是创建节点,是修改源代码后的样子

解释:
如果节点存在,直接跳过,如果不存在就新建节点,并且包括属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
   * 创建节点
   *
   * @param sc  
   * @param dataFrame 
   * @param nodes
   * @param renamedColumns
   */
  def mergeNodes(sc: SparkContext, dataFrame: DataFrame, nodes: (String, Seq[String]), renamedColumns: Map[String, String] = Map.empty): Unit = {
    val nodeLabel: String = renamedColumns.getOrElse(nodes._2.head, nodes._2.head)
    val query =
      s"""
        UNWIND {rows} as row
        MERGE (node:`${nodes._1}` {`${nodeLabel}` : row.node.`${nodeLabel}`}) ON CREATE SET node = row.node
        """
    val partitions = Math.max(1, (dataFrame.count() / 10000).asInstanceOf[Int])
    val config = Neo4jConfig(sc.getConf)
    dataFrame.repartition(partitions).foreachPartition(rows => {
      val params: AnyRef = rows.map(r =>
        Map(
          "node" -> nodes._2.map(c => (renamedColumns.getOrElse(c, c), r.getAs[AnyRef](c))).toMap.asJava)
          .asJava).asJava
      Neo4jDataFrame.execute(config, query, Map("rows" -> params).asJava, write = true)
    })
  }

参数介绍:

  1. sc 是SparkContext的实例
  2. dataFrame 是数据源
  3. nodes 结构式是 (String, Seq[String]) 例如: (节点标签, Seq(“属性名称”)))
     注意:第一个属性会作为一个查询是否存在的作用
  4. renamedColumns 结构式是 Map[String, String] 例如: Map[原有的名称, 入neo4j的名称]

执行过程:
第10行: 获取节点的标签
第12行: 组装Cypher的语句
第16行: 计算出需要分几个区
第17行: 获取neo4j的配置
第18行: 准备每个分区的任务
第19行: 准备某一个分区的任务
第20行: 处理一个分组中的一行数据
第21行: 处理节点的属性数据
第23行: 完成组装语句的填充,并执行语句

  1. 下面是创建关系的源代码,没有修改过

解释:
创建关系,如果两个节点没有的话,会自动创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/***
   * 创建关系
   * @param sc
   * @param dataFrame
   * @param source
   * @param relationship
   * @param target
   * @param renamedColumns
   */
  def mergeEdgeList(sc: SparkContext,
                    dataFrame: DataFrame,
                    source: (String,Seq[String]),
                    relationship: (String,Seq[String]),
                    target: (String,Seq[String]),
                    renamedColumns: Map[String,String] = Map.empty): Unit = {

    val sourceLabel: String = renamedColumns.getOrElse(source._2.head, source._2.head)
    val targetLabel: String = renamedColumns.getOrElse(target._2.head, target._2.head)
    val mergeStatement = s"""
        UNWIND {rows} as row
        MERGE (source:`${source._1}` {`${sourceLabel}` : row.source.`${sourceLabel}`}) ON CREATE SET source += row.source
        MERGE (target:`${target._1}` {`${targetLabel}` : row.target.`${targetLabel}`}) ON CREATE SET target += row.target
        MERGE (source)-[rel:`${relationship._1}`]->(target) ON CREATE SET rel += row.relationship
        """
    val partitions = Math.max(1,(dataFrame.count() / 10000).asInstanceOf[Int])
    val config = Neo4jConfig(sc.getConf)
    dataFrame.repartition(partitions).foreachPartition( rows => {
      val params: AnyRef = rows.map(r =>
        Map(
          "source" -> source._2.map( c => (renamedColumns.getOrElse(c,c), r.getAs[AnyRef](c))).toMap.asJava,
          "target" -> target._2.map( c => (renamedColumns.getOrElse(c,c), r.getAs[AnyRef](c))).toMap.asJava,
          "relationship" -> relationship._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava)
          .asJava).asJava
      execute(config, mergeStatement, Map("rows" -> params).asJava, write = true)
    })
  }

参数介绍:

  1. sc 是SparkContext的实例
  2. dataFrame 是数据源
  3. source 是开始节点(叫法很多,意义明白就好) 结构式:(String,Seq[String]) 例如:(节点标签, Seq(“属性名称”))),第一个属性用来判断是否存在节点
  4. relationship 是关系 结构式:(String,Seq[String]) 例如:(“关系类型名称”,Seq(“属性名称”))
  5. target 是结束节点 结构式:(String,Seq[String]) 例如:(节点标签, Seq(“属性名称”))),第一个属性用来判断是否存在节点
  6. renamedColumns 结构式是 Map[String, String] 例如: Map[原有的名称, 入neo4j的名称]

执行过程:与创建节点过程类似,主要区别是Cyhper的语句不同

总结

Spark结合Neo4j 可以做很多的事情,這里只是简单的一个场景应用.