Spark Study Notes: GraphX

Published: 2024/8/5

A brief introduction to GraphX: http://blog.csdn.net/shangwen_/article/details/38645601

Pregel: http://blog.csdn.net/shangwen_/article/details/38479835

Bagel: http://ju.outofmemory.cn/entry/712

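Main GraphX interfaces:

Basic information (numEdges, numVertices, degrees, inDegrees, outDegrees)
Transformation operations (mapVertices, mapEdges, mapTriplets)
Aggregation operations (mapReduceTriplets, collectNeighbors; newer Spark releases replace mapReduceTriplets with aggregateMessages)
Structural operations (reverse, subgraph, mask, groupEdges)
Caching operations (cache, unpersistVertices)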
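
To make the interface groups above concrete, here is a minimal sketch that touches one or two calls from each group. The toy data (`users`, `follows`) and the local `SparkSession` setup are assumptions for illustration only; in spark-shell the existing session is reused.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: a local session; getOrCreate() returns the existing one in spark-shell.
val spark = SparkSession.builder().appName("graphx-interfaces").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical toy data: three users and "follows" edges between them.
val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val follows: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(1L, 3L, 1)))
val graph: Graph[String, Int] = Graph(users, follows)

// Caching: keep the graph in memory because it is reused several times below.
graph.cache()

// Basic information.
val edgeCount = graph.numEdges
val vertexCount = graph.numVertices
val outDeg = graph.outDegrees.collect().toMap // vertices with no out-edges are absent

// Transformation: rewrite vertex attributes, structure is unchanged.
val upper = graph.mapVertices((_, name) => name.toUpperCase)

// Structural: flip every edge, then drop vertex 3 together with its edges.
val reversed = graph.reverse
val withoutCarol = graph.subgraph(vpred = (id, _) => id != 3L)
```

Transformation and structural operations return a new graph rather than modifying `graph` in place, which is why each result is bound to its own name.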

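Key points:

Each graph is made up of three RDDs:

Name       Corresponding RDD          Attributes
Vertices   VertexRDD                  vertex ID, vertex attribute
Edges      EdgeRDD                    source vertex ID, destination vertex ID, edge attribute
Triplets   RDD[EdgeTriplet[VD, ED]]   source vertex ID, source vertex attribute, edge attribute, destination vertex ID, destination vertex attribute

Triplets is effectively the result of joining Vertices with Edges.
Graph partitioning: vertex-cut versus edge-cut (GraphX uses vertex-cut partitioning).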
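
The join behind the triplets view can be seen in a small sketch. The vertex and edge data below are made up for illustration, and `EdgePartition2D` is just one of the built-in vertex-cut strategies, chosen here arbitrarily.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: a local session; getOrCreate() returns the existing one in spark-shell.
val spark = SparkSession.builder().appName("graphx-triplets").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: the vertex RDD and the edge RDD that back the graph.
val people: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val relations: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "blocks")))
val graph: Graph[String, String] = Graph(people, relations)

// Each triplet carries the edge attribute plus both endpoint attributes,
// i.e. the edge row joined with its source and destination vertex rows.
val described: Set[String] = graph.triplets
  .map(t => s"${t.srcAttr} -[${t.attr}]-> ${t.dstAttr}")
  .collect()
  .toSet

// Repartition the edges with a vertex-cut strategy; the graph content is unchanged.
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
```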

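Applications:

Community detection based on the largest connected component
Measuring relationship strength based on triangle counting
Propagating user attributes based on random walks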
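
As a rough sketch of the first two applications, the built-in `connectedComponents` and `triangleCount` algorithms can be run on a tiny, made-up graph. The edge list is an assumption for illustration; edges are written with srcId < dstId and the graph is partitioned with `partitionBy`, which older Spark releases require before `triangleCount`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

// Assumption: a local session; getOrCreate() returns the existing one in spark-shell.
val spark = SparkSession.builder().appName("graphx-apps").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical graph: a triangle {1, 2, 3} and a separate pair {4, 5}.
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Connected components: each vertex is labelled with the smallest vertex ID in its component.
val components = graph.connectedComponents().vertices
val componentOf = components.collect().toMap

// Largest component: count the vertices per label and take the maximum.
val (largestId, largestSize) = components.map(_._2).countByValue().maxBy(_._2)

// Triangle count: the number of triangles passing through each vertex.
val triangles = graph.triangleCount().vertices.collect().toMap
```

How the component labels or triangle counts are turned into communities or relationship scores is application-specific and is not shown here.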

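Notes:

GraphX extends the Spark RDD abstraction with the *Resilient Distributed Property Graph*, a directed multigraph with attributes attached to both vertices and edges. A property graph exposes two views, Table and Graph, each with its own API, while keeping only a single physical copy of the data.
The Table view treats the graph as a combination of a Vertex Property Table, an Edge Property Table, and so on; these tables inherit the Spark RDD API (filter, map, etc.).
The Graph view provides operations such as reverse/subgraph/mapV(E)/joinV(E)/mrTriplets.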
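
The two views can be contrasted with a short sketch: the same graph is first queried through the RDD API on its vertex table, then through a graph-level join. The user ages and edges are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: a local session; getOrCreate() returns the existing one in spark-shell.
val spark = SparkSession.builder().appName("graphx-views").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: the vertex attribute is a user's age.
val ages: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 15), (2L, 30), (3L, 42)))
val follows: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
val graph: Graph[Int, Int] = Graph(ages, follows)

// Table view: graph.vertices is an RDD, so plain RDD operations such as filter apply.
val adultCount = graph.vertices.filter { case (_, age) => age >= 18 }.count()

// Graph view: join the out-degree table back onto the graph.
// Vertices without out-edges have no row in outDegrees, hence the Option.
val withDegree: Graph[(Int, Int), Int] =
  graph.outerJoinVertices(graph.outDegrees) { (_, age, degOpt) =>
    (age, degOpt.getOrElse(0))
  }
val ageAndDegree = withDegree.vertices.collect().toMap
```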

Functions available on Graph (from the official documentation):

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
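
Of the functions listed above, aggregateMessages is probably the least self-explanatory, so here is a minimal sketch of it: every edge sends its weight to its destination vertex, and the weights arriving at a vertex are summed. The edge data and the name `inWeight` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexRDD}

// Assumption: a local session; getOrCreate() returns the existing one in spark-shell.
val spark = SparkSession.builder().appName("graphx-aggregate").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical weighted edges; vertex attributes are unused and default to 0.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.5), Edge(3L, 2L, 2.5), Edge(2L, 3L, 4.0)))
val graph: Graph[Int, Double] = Graph.fromEdges(edges, defaultValue = 0)

// sendMsg: each edge sends its own attribute to the destination vertex.
// mergeMsg: messages arriving at the same vertex are added together.
// TripletFields.EdgeOnly: only the edge attribute is read, so vertex data need not be shipped.
val inWeight: VertexRDD[Double] = graph.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.attr),
  _ + _,
  TripletFields.EdgeOnly)

// Vertices that received no message (vertex 1 here) do not appear in the result.
val totals = inWeight.collect().toMap
```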

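pregel function parameters:

VD: the vertex attribute type.
ED: the edge attribute type.
A: the Pregel message type.

graph: the input graph.
initialMsg: the message each vertex receives on the first iteration.
maxIterations: the maximum number of iterations to run.
vprog: the user-defined vertex program that runs on each vertex; it receives the incoming message and computes the new vertex value. On the first iteration the vertex program is invoked on every vertex with the initial message (initialMsg); on later iterations it is invoked only on vertices that received a message.
sendMsg: a user-supplied function applied to the edges adjacent to vertices that received messages in the current iteration; it returns the messages to send along each edge.
mergeMsg: a user-supplied function that merges two messages of type A into a single message of type A. (This function must be commutative and associative, and ideally the size of A should not increase.)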

Example:

import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
  )
println(sssp.vertices.collect.mkString("\n"))
