Spark Study Notes: GraphX
A Brief Look at GraphX: http://blog.csdn.net/shangwen_/article/details/38645601
Pregel: http://blog.csdn.net/shangwen_/article/details/38479835
Bagel: http://ju.outofmemory.cn/entry/712
Main GraphX interfaces:
Basic information (numEdges, numVertices, degrees (in/out))
Transformation operations (mapVertices, mapEdges, mapTriplets)
Aggregation operations (mapReduceTriplets, collectNeighbors)
Structural operations (reverse, subgraph, mask, groupEdges)
Caching operations (cache, unpersistVertices)
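As a rough illustration of the interface groups listed above, the sketch below builds a tiny graph and calls one or two methods from most groups. The user names, edge values, and application name are invented for the example. It assumes Spark with GraphX is on the classpath (for instance inside spark-shell, pasted with :paste); `SparkContext.getOrCreate` is used so that an existing `sc` is reused.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-basics").setMaster("local[*]"))

// Hypothetical toy data: three users and "follows" edges between them.
val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val follows: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(1L, 3L, 1)))
val graph: Graph[String, Int] = Graph(users, follows)

// Caching: keep the graph in memory because it is reused several times below.
graph.cache()

// Basic information.
val nV = graph.numVertices
val nE = graph.numEdges
val inDeg: Map[VertexId, Int] = graph.inDegrees.collect().toMap

// Transformation: change vertex attributes, keep the structure.
val upper = graph.mapVertices((id, name) => name.toUpperCase)

// Structural operations: flip every edge, or drop a vertex and its edges.
val reversed = graph.reverse
val withoutCarol = graph.subgraph(vpred = (id, name) => name != "carol")
```

Each call returns a new graph; the original `graph` is left unchanged.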
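Key points:
Each graph is exposed as three RDDs:
| Name | RDD type | Attributes |
| Vertices | VertexRDD | vertex ID, vertex attribute |
| Edges | EdgeRDD | source vertex ID, destination vertex ID, edge attribute |
| Triplets | RDD[EdgeTriplet] | source vertex ID, source vertex attribute, edge attribute, destination vertex ID, destination vertex attribute |
Triplets are effectively the result of joining Vertices with Edges.
Partitioning: vertex cut vs. edge cut (GraphX itself uses vertex cut)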
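To make the "Triplets = Vertices joined with Edges" point concrete, the sketch below compares the built-in `triplets` view with a join written by hand over `edges` and `vertices`. The data and application name are invented, and it assumes a spark-shell style session with GraphX available. The last line shows `partitionBy`, which changes how edges are spread across partitions under the vertex-cut scheme; the choice of `EdgePartition2D` here is arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-triplets").setMaster("local[*]"))

// Hypothetical toy graph: a -> b -> c with Double edge attributes.
val vertices = sc.parallelize(Seq[(VertexId, String)]((1L, "a"), (2L, "b"), (3L, "c")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0.5), Edge(2L, 3L, 1.5)))
val graph = Graph(vertices, edges)

// Built-in triplets view, flattened to plain tuples.
val viaTriplets = graph.triplets
  .map(t => (t.srcId, t.srcAttr, t.attr, t.dstId, t.dstAttr))
  .collect().toSet

// The same rows, produced by joining edges with vertices twice.
val viaJoin = graph.edges
  .map(e => (e.srcId, (e.dstId, e.attr)))   // key by source ID
  .join(graph.vertices)                     // attach the source attribute
  .map { case (srcId, ((dstId, attr), srcAttr)) => (dstId, (srcId, srcAttr, attr)) }
  .join(graph.vertices)                     // attach the destination attribute
  .map { case (dstId, ((srcId, srcAttr, attr), dstAttr)) =>
    (srcId, srcAttr, attr, dstId, dstAttr)
  }
  .collect().toSet

// Repartition the edges; the graph's content stays the same.
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
```

The hand-written join is only for illustration; in practice `graph.triplets` is the natural way to get these rows.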
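Applications:
Community detection based on connected components
Relationship strength measurement based on triangle counting
User attribute propagation based on random walks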
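The first two applications map onto built-in GraphX algorithms. The sketch below runs `connectedComponents` and `triangleCount` on an invented five-vertex graph (one triangle plus one separate pair); it is not a full community-detection pipeline, and the random-walk case is not covered. It assumes a spark-shell style session with GraphX available. Edges are written with srcId < dstId and the graph is partitioned first, since older Spark releases require that for `triangleCount`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-apps").setMaster("local[*]"))

// Hypothetical data: a triangle 1-2-3 and a separate pair 4-5.
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Each vertex is labelled with the smallest vertex ID in its component.
val component: Map[VertexId, VertexId] =
  graph.connectedComponents().vertices.collect().toMap

// Each vertex is labelled with the number of triangles it belongs to.
val triangles: Map[VertexId, Int] =
  graph.triangleCount().vertices.collect().toMap
```

Vertices that share a component label can be treated as one candidate community, and a higher triangle count suggests a more tightly knit neighborhood.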
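Notes:
GraphX extends the Spark RDD abstraction by introducing the *Resilient Distributed Property Graph*, a directed multigraph with attributes on both vertices and edges. The property graph offers two views, Table and Graph, each with its own API, while there is only one copy of the physical storage.
The Table view treats the graph as a combination of a Vertex Property Table, an Edge Property Table, and so on; these tables inherit the Spark RDD API (filter, map, etc.).
The Graph view provides operations such as reverse/subgraph/mapV(E)/joinV(E)/mrTriplets.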
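The difference between the two views can be sketched on a small example: the Table view is used through ordinary RDD calls on `graph.vertices`, while the Graph view uses graph-level operators that keep vertices and edges consistent. The ages, edges, and the age threshold of 18 are invented for illustration, and a spark-shell style session with GraphX is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-views").setMaster("local[*]"))

// Hypothetical data: the vertex attribute is an age, the edge attribute is unused.
val ages = sc.parallelize(Seq[(VertexId, Int)]((1L, 30), (2L, 15), (3L, 42)))
val links = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(3L, 1L, 1)))
val graph = Graph(ages, links)

// Table view: the vertex table is an RDD, so filter and map apply directly.
val adultIds: Set[VertexId] =
  graph.vertices.filter { case (_, age) => age >= 18 }.map(_._1).collect().toSet

// Graph view: subgraph drops the vertex and every edge that touches it.
val adultGraph = graph.subgraph(vpred = (_, age) => age >= 18)

// Graph view, join: attach each vertex's out-degree to its age.
val ageAndOutDegree: Map[VertexId, (Int, Int)] = graph
  .outerJoinVertices(graph.outDegrees) { (_, age, deg) => (age, deg.getOrElse(0)) }
  .vertices.collect().toMap
```

Filtering the vertex table alone leaves the edges untouched, which is why structural changes are usually done through the Graph view.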
Functions on Graph (from the official documentation):
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
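Of the functions summarized above, `aggregateMessages` is probably the least obvious from its signature alone. As a minimal sketch, the code below sums the attributes of the edges arriving at each vertex. The edge data and application name are invented, and a spark-shell style session with GraphX is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-aggregate").setMaster("local[*]"))

// Hypothetical weighted edges; vertex attributes are not used here.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(3L, 2L, 2.5), Edge(2L, 3L, 4.0)))
val graph = Graph.fromEdges(edges, defaultValue = 0)

val inWeight: VertexRDD[Double] = graph.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.attr), // sendMsg: each edge sends its weight to its destination
  _ + _,                          // mergeMsg: add up the weights arriving at one vertex
  TripletFields.EdgeOnly)         // only edge attributes are read, so skip vertex data

val result: Map[VertexId, Double] = inWeight.collect().toMap
```

Vertices that receive no message (vertex 1 here) do not appear in the resulting `VertexRDD`, so a follow-up `outerJoinVertices` is the usual way to supply a default for them.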
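Parameters of the pregel function:
VD: the vertex attribute type.
ED: the edge attribute type.
A: the Pregel message type.
graph: the input graph.
initialMsg: the message every vertex receives on the first iteration.
maxIterations: the maximum number of iterations to run.
vprog: the user-defined vertex program. It runs on each vertex, receives the incoming message, and computes the new vertex value. On the first iteration it is invoked on every vertex with initialMsg; on later iterations it is invoked only on vertices that received a message.
sendMsg: a user-supplied function applied to the edges of vertices that received a message in the current iteration (which edges qualify is governed by activeDirection); it returns the messages to deliver in the next iteration.
mergeMsg: a user-supplied function that merges two messages of type A into a single message of type A. (This function must be commutative and associative, and ideally the size of A should not increase.)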
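The interplay of vprog, sendMsg, and mergeMsg may be easier to follow in a single-machine sketch of the superstep loop. The code below is plain Scala over in-memory collections, not GraphX's implementation: `SimpleEdge` and `miniPregel` are hypothetical names, sendMsg receives the edge and both endpoint values instead of an `EdgeTriplet`, and an edge is considered active when either endpoint received a message in the previous round (mirroring `EdgeDirection.Either`).

```scala
// Hypothetical edge type for this sketch only.
final case class SimpleEdge[ED](src: Long, dst: Long, attr: ED)

def miniPregel[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[SimpleEdge[ED]],
    initialMsg: A,
    maxIterations: Int)(
    vprog: (Long, VD, A) => VD,
    sendMsg: (SimpleEdge[ED], VD, VD) => Iterator[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, VD] = {
  // First step: every vertex runs vprog once with initialMsg.
  var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
  var active: Set[Long] = state.keySet
  var i = 0
  while (active.nonEmpty && i < maxIterations) {
    // sendMsg runs on edges that touch a vertex updated in the previous round.
    val outbox = edges.iterator
      .filter(e => active(e.src) || active(e.dst))
      .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
      .toSeq
    // mergeMsg folds all messages addressed to the same vertex into one.
    val inbox: Map[Long, A] = outbox.groupBy(_._1).map {
      case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg)
    }
    // Only vertices that actually received a message run vprog again.
    val updated = inbox.collect {
      case (id, msg) if state.contains(id) => id -> vprog(id, state(id), msg)
    }
    state = state ++ updated
    active = updated.keySet
    i += 1
  }
  state
}

// Usage: shortest distances from vertex 1 on a three-vertex graph.
val dist = miniPregel[Double, Double, Double](
  vertices = Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity),
  edges = Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(2L, 3L, 2.0), SimpleEdge(1L, 3L, 5.0)),
  initialMsg = Double.PositiveInfinity,
  maxIterations = 10)(
  vprog = (_, d, msg) => math.min(d, msg),
  sendMsg = (e, srcD, dstD) =>
    if (srcD + e.attr < dstD) Iterator((e.dst, srcD + e.attr)) else Iterator.empty,
  mergeMsg = (a, b) => math.min(a, b))
```

The loop stops as soon as a round produces no messages, which is why maxIterations acts only as an upper bound.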
Example:
import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))