Spark GraphX 图操作

Spark GraphX 图操作

在GraphX中,核心操作都是被优化过的,组合核心操作的定义在GraphOps中。

由于Scala隐式转换,定义在GraphOps的操作可以在Graph的成员中获取。例如:我们计算图中每个顶点的入度.(该方法是定义在GraphOps)

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

下面我们列出常用的几个图操作。

操作列表概述

这里只列出Graph中常用的操作函数API,仍有一些高级函数没有列出,如果需要还请参考Spark API文档。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

属性操作

属性图中包括类似RDD map的操作,如下图:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

mapVertices遍历所有的顶点,mapEdges遍历所有的边,mapTriplets遍历所有的三元组。

注意,属性操作下,图的结构都不受影响。这些操作的一个重要特征是它允许所得图形重用原有图形的结构索引(indices)。

属性操作常用来进行特殊计算或者排除不需要的属性.我们依旧以上一章节的图为例,进行操作。下面没有列出全部代码,包的导入和图的构建请参考上一章节的内容 Spark GraphX - 简介 。

/***********************************
*属性操作
***********************************/
 println("---------------------------------------------")
 println("给图中每个顶点的职业名的末尾加上'dblab'字符串")
 graph.mapVertices{ case (id, (name, occupation)) => (id, (name, occupation+"dblab"))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
 println("---------------------------------------------")
 println("给图中每个元组的Edge的属性值设置为源顶点属性值加上目标顶点属性值:")
 graph.mapTriplets(triplet => triplet.srcAttr._2 + triplet.attr + triplet.dstAttr._2).edges.collect.foreach(println(_))

结构操作

目前Spark GraphX只支持一些简单的常用结构操作,还在不断完善中。

常用的操作如下:

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse操作返回一个所有边方向取反的新图。该反转操作并没有修改图中顶点、边的属性,更没有增加边的数量。

subgraph操作主要利用顶点和边进行判断,返回的新图中包含满足判断要求的顶点、边。该操作常用于一些情景,比如:限制感兴趣的图顶点和边,删除损坏连接。如下代码:

/***********************************
*展示结构操作
***********************************/
graph.triplets.map(
   triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
println("---------------------------------------------")
println("删除不存在的节点,构建子图")
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
   triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
println("---------------------------------------------")
println("构建职业是professor的子图,并打印子图的顶点")
val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 == "prof")
subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

mask 操作构造一个子图,这个子图包含输入图中包含的顶点和边。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。例如,我们可以使用丢失顶点的图运行连通分支,然后限制有效子图的返回,见如下代码:

println("---------------------------------------------")
println("运行联通分支")
val ccGraph = graph.connectedComponents() 
val validCCGraph = ccGraph.mask(validGraph)

groupEdges 操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的 应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。

关联操作

在很多情况下,需要将外部数据添加到图中。例如,我们可能有额外的用户属性,我们想把它融合到一个存在图中或者从一个图提取数据属性到另一个图。这些任务可以使用join操作来实现。下面我们列出了关键的join操作:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices操作连接外部RDD的顶点,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户自定义的 map 函数获得的。没有 匹配的顶点保留其原始值。

outerJoinVertices操作和joinVertices操作相似,但用户自定义的map函数可以被应用到所有顶点和改变顶点类型。

/***********************************
 *展示关联操作
***********************************/
println("**********************************************************")
println("关联操作")
println("**********************************************************")
val inDegrees: VertexRDD[Int] = graph.inDegrees
case class User(name: String, occupation: String, inDeg: Int, outDeg: Int)
//创建一个新图,顶点VD的数据类型为User,并从graph做类型转换
val initialUserGraph: Graph[User, String] = graph.mapVertices { case (id, (name,   occupation)) => User(name, occupation , 0, 0)}
//initialUserGraph与inDegrees、outDegrees(RDD)进行连接,并修改initialUserGraph中inDeg值、outDeg值
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
      case (id, u, inDegOpt) => User(u.name, u.occupation, inDegOpt.getOrElse(0), u.outDeg)
    }.outerJoinVertices(initialUserGraph.outDegrees) {
      case (id, u, outDegOpt) => User(u.name, u.occupation, u.inDeg,outDegOpt.getOrElse(0))
    }
println("连接图的属性:")
userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))
println("出度和入度相同的人员:")
userGraph.vertices.filter {
   case (id, u) => u.inDeg == u.outDeg
 }.collect.foreach {
   case (id, property) => println(property.name)
}

聚合操作

在很多图分析任务中一个关键步骤就是集合每一个顶点的邻居信息。例如,我们想知道每一个用户的追随者数量或者追随者的平均年龄。一些迭代的图算法(像PageRank,最短路径和联通组件)就需要反复得聚合相邻顶点的属性。

信息聚合(aggregateMessages)

在GraphX中最核心的聚合操作就是aggregateMessages。它主要功能是向邻边发消息,合并邻边收到的消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

接口中含有三个参数,分别表示:

sendMsg:发消息函数
mergeMsg:合并消息函数
tripletFields:发消息的方向

下面使用aggregateMessages来计算比用户年龄更大的追随者的平均年龄,这里我们新建一个模型图代替上一节创建的模型的图,整体代码如下:

import org.apache.log4j.{Level,Logger}
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
object SimpleGraphX {
  def main(args: Array[String]) {
    //屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //设置运行环境
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
    val sc = new SparkContext(conf)
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          triplet.sendToDst(1, triplet.srcAttr)
        }
      },
      // Add counter and age
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues( (id, value) =>
        value match { case (count, totalAge) => totalAge / count } )
    avgAgeOfOlderFollowers.collect.foreach(println(_))
  }

计算每个顶点的度

在上一节中,就已经用了计算每个顶点的度(每一个顶点边的数量)的实例,这也是一种常见的聚合操作。在有向图的情况下,它经常知道入度,出度和每个顶点的总度。 GraphOps 类包含了每一个顶点的一系列的度的计算。例如:在下面将计算最大入度,出度和总度:

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

缓存操作

Spark中,RDDs默认是没有持久存储在内存。当多次使用RDDs时,为了避免重复计算,RDDs必须被显式缓存。GraphX中的图也是相同的方式,当使用一个图多次时,首先确认调用Graph.cache()

对于迭代计算来说,迭代的中间结果将填充到缓存中。虽然最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。对于迭代计算,我们建议使用 Pregel API,它可以正确的不持久化中间结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/495765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

推特Twitter有直播功能吗?如何用Twitter直播?

现在各大直播平台已经成为社交媒体营销的一种重要渠道,它让品牌能够即时地与全球受众进行互动。据统计,直播市场正在迅速增长,预计到2028年将达到2230亿美元的规模。在这个不断扩张的市场中,许多社交媒体平台如YouTube、Facebook、…

【OS探秘】【虚拟化】【软件开发】【网络安全】在Windows11上安装Kali Linux虚拟机

一、所需原料 Windows 11主机、Oracle VM VirtualBox虚拟化平台、Kali Linux镜像文件 二、安装步骤 1、 在VBox管理器中,点击“新建”,进入向导模式,指定各个字段的值: 2、 安装完成,启动虚拟机: 3、 选择…

[linux初阶][vim-gcc-gdb] OneCharter: vim编辑器

一.vim编辑器基础 目录 一.vim编辑器基础 ①.vim的语法 ②vim的三种模式 ③三种模式的基本切换 ④各个模式下的一些操作 二.配置vim环境 ①手动配置(不推荐) ②自动配置(推荐) vim是vi的升级版,包含了更加丰富的功能. ①.vim的语法 vim [文件名] ②vim的三种模式 命令…

慧天[HTWATER]:采用CUDA框架实现耦合模型并行求解

慧天[HTWATER]软件简介 针对城市排水系统基础设施数据管理的需求,以及水文、水力及水质模拟对数据的需求,实现了以数据库方式对相应数据的存储。可以对分流制排水系统及合流制排水系统进行地表水文、管网水力、水质过程的模拟计算。可以对城市低影响开发…

CV论文--2024.3.28

1、Efficient Video Object Segmentation via Modulated Cross-Attention Memory 中文标题:通过调制交叉注意力记忆进行高效视频对象分割 简介:最近,基于Transformer的方法在半监督视频对象分割方面取得了出色的结果。然而,由于这…

【C++】手撕哈希表的闭散列和开散列

> 作者:დ旧言~ > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:手撕哈希表的闭散列和开散列 > 毒鸡汤:谁不是一边受伤,一边学会坚强。 > 专栏选自:C嘎嘎进阶 > 望小伙伴们…

通过在线编程彻底搞懂transformer模型之三:为啥大语言模型都做不好数学题

为什么大语言模型做不好数学题?这个要从大语言模型的原理来讲。 这里是这篇文字的视频讲解,可能视频讲得更清楚一些: 写代码彻底搞懂attention注意力机制 – LLM transformer系列,附:在线编程地址 现代大语言模型都源自于2017年…

Excel 十字交叉聚光灯查询,再也不用担心看串行与列

当Excel表格行列较多时,要想跟条件找到目标数据可以用查找引用函数自动调取,如果又想让找出来的结果突出显示,有什么好办法呢? 先来看一个做好的案例效果,用户选择查询条件后,结果突出显示。 当查询条件变…

第20篇:逻辑门控D锁存器

Q:基本RS锁存器存在不确定状态,本篇我们设计可以消除不确定状态的锁存器--逻辑门控D锁存器。 A:逻辑门控D锁存器逻辑图: 其工作原理:在CLK1期间,数据输入端D的值被传输到输出端Q,而当CLK由1 跳…

【Redis】redis哨兵模式

概述 Redis Sentinel,即Redis哨兵,在Redis 2.8版本开始引入。它是Redis高可用的实现方案之一。Sentinel是一个管理多个Redis实例的工具,它的核心功能是可以实现对Redis的监控、通知、自动故障转移。 监控(Monitoring&#xff09…

docker部署-RabbitMq

1. 参考 RabbitMq官网 docker官网 2. 拉取镜像 这里改为自己需要的版本即可,下面容器也需要同理修改 docker pull rabbitmq:3.12-management3. 运行容器 docker run \ --namemy-rabbitmq-01 \ -p 5672:5672 \ -p 15672:15672 \ -d \ --restart always \ -…

盏多多生物现已加入2024第七届燕窝天然滋补品展

参展企业介绍 广东省盏多多生物科技有限公司是一家从事食品销售,食品销售,食品进出口等业务的公司,成立于2018年12月07日,公司坐落在广东省,详细地址为:惠州市东江三路45号悦榕湾27层05号(仅限办公);经国家…

用系统观念打造智慧公厕,引领智慧城市的发展

智慧公厕,作为智慧城市建设的一部分,具有重要意义。在高度发达的科技条件下,如何打造高质量的智慧公厕是一个值得思考的问题。本文将以智慧公厕源头实力厂家广州中期科技有限公司,大量精品案例项目现场实景实图实例,探…

UE小:基于UE5的两种Billboard material(始终朝向相机材质)

本文档展示了两种不同的效果,分别是物体完全朝向相机和物体仅Z轴朝向相机。通过下面的演示和相关代码,您可以更加直观地理解这两种效果的差异和应用场景。 1. 完全朝向相机效果 此效果下,物体将完全面向相机,不论相机在哪个角度…

Element

1、Element 基本使用 1.1、Element介绍 Element:网站快速成型工具。是饿了么公司前端开发团队提供的一套基于Vue的网站组件库。 使用Element前提必须要有Vue。 组件:组成网页的部件,例如超链接、按钮、图片、表格等等~ Element官网&#…

【上云API】GB28181流媒体服务器搭建

docker拉取配置好的ZLMediaKIt和wvp-GB28181-pro docker pull 648540858/wvp_pro第一次运行 docker一键运行ZLMediaKIt和wvp-GB28181-pro docker run --env WVP_IP"自己电脑的ip" -it -p 18080:18080 -p 30000-30500:30000-30500/udp -p 30000-30500:30000-3050…

伦敦金实时行情交易需要了解的3个事实

在伦敦金市场中,我们要交易就要面对伦敦金实时行情。然而,在伦敦金实时行情交易中,有几个事实是我们不得不去了解的,下面我们就来讨论一下。 盈利的经历不等于盈利的能力。我们经常看到一些卖课的或者卖指标、卖策略的人会宣传自己…

双通道内存@DDR5多通道内存

文章目录 多通道内存DDR4及以前的内存的双通道DDR5往后的双通道和多通道半位宽4通道组合 其他组合测试 DDR5介绍概览重要Features特点 总结 多通道内存 DDR4及以前的内存的双通道 双通道内存是一种内存架构设计,通过在主板上配置两个或多个独立且同时工作的内存控制…

沃通国密SSL根证书入根赢达信国密浏览器

近日,沃通CA国密SSL根证书正式入根赢达信国密安全浏览器,携手推动国产密码技术应用、完善国密应用生态体系,也标志着沃通国密SSL证书兼容性再次得到提升,进一步夯实国密应用根基。 密码算法的安全性是信息安全保障的核心&#xff…

服务器BMC测试之postman测试redfish

postman初始化设置----Redfish测试 1.下载安装postman 下载链接:https://www.postman.com/downloads/ 2.安装完成后启动postman -----登录账号请自行申请 3.新建测试环境 ----增加测试BMC ip 为环境变量 点击 新建环境 4.修改环境 增加变量名字为BMCIP 这个名字…
最新文章