Как вычислить ребра между узлами v, w, на которые указывает один и тот же узел x

Этот вопрос касается Spark GraphX. Имея произвольный граф, я хочу вычислить новый граф, который добавляет ребра между любыми двумя узлами v, w, на которые указывает некоторый узел x. Новые ребра должны содержать указывающий узел в качестве атрибута.

То есть по данным ребрам (x, v, nil) и (x, w, nil) вычисляются ребра (v, w, x) и (w, v, x).

Это должно работать для любого графа и не требует, чтобы я знал что-либо о графе заранее, например, идентификаторы вершин.

Пример

[Задача] Добавить два направленных ребра между узлами (например, A, C), если на них указывает один и тот же узел (например, B).

Входной график:

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐            ┌────┐
 │ A  │            │ C  │
 └────┘            └────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

Выходной граф (двунаправленные ребра = два направленных ребра):

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐<───by B───>┌────┐
 │ A  │            │ C  │
 └────┘<───by D───>└────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

Как элегантно написать запрос GraphX, который возвращает выходной график?


person Pimin Konstantin Kefaloukos    schedule 19.05.2015    source источник
comment
Ваше ребро с двойной стрелкой на выходном графике не имеет смысла. У ребер есть src и dest — что на выходном графике?   -  person David Griffin    schedule 19.05.2015
comment
@DavidGriffin: вы должны читать это как два направленных края. Сейчас я немного обновлю вопрос.   -  person Pimin Konstantin Kefaloukos    schedule 20.05.2015
comment
Кстати, в настоящее время я работаю над версией решения Pregel. Было бы неплохо получить ваши отзывы о нем.   -  person Pimin Konstantin Kefaloukos    schedule 20.05.2015
comment
Без обид, но это не очень элегантно! Насколько обобщенным он должен быть? Потому что вы можете просто запустить graph.edges.flatMap и для каждого ребра просто создать новое на его основе. Это в основном все, что вы делаете.   -  person David Griffin    schedule 20.05.2015
comment
@DavidGriffin: Возможно, я недостаточно четко задал вопрос. Я (очевидно?) ищу общее решение, а не решение для этого конкретного примера графа... Другими словами, код не должен требовать от меня знания идентификаторов вершин. Плохо, что не написал это в вопросе.   -  person Pimin Konstantin Kefaloukos    schedule 20.05.2015
comment
Что еще в вопросе нужно обобщить? Всегда ли 4 вершины? Если вершин может быть больше 4, куда идут новые вершины и куда идут новые ребра? Как я уже сказал в другом комментарии, возможно, опишите, что вы пытаетесь решить.   -  person David Griffin    schedule 21.05.2015
comment
Запрос должен работать для любого графа, например. граф из 1 миллиона узлов. Он должен вычислить все ребра, соответствующие описанию в моем вопросе.   -  person Pimin Konstantin Kefaloukos    schedule 21.05.2015


Ответы (1)


Вот решение, которое использует pregel и совокупные сообщения

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Step 0: Create an input graph.
val nodes =
  sc.parallelize(Array(
    (101L, "A"), (102L, "A"), (201L, "B"), (202L, "B")
  ))
val edges = 
  sc.parallelize(Array(
    Edge(201L, 101L, ("B-to-A", 0L)), Edge(201L, 102L, ("B-to-A", 0L)),
    Edge(202L, 101L, ("B-to-A", 0L)), Edge(202L, 102L, ("B-to-A", 0L))
  ))    
val graph = Graph(nodes, edges, "default")

// Step 1: Transform input graph before running pregel.
val initialGraph = graph.mapVertices((id, _) => Set[(VertexId,VertexId)]())

// Step 2: Send downstream vertex IDs (A's) to upstream vertices (B's)
val round1 = initialGraph.pregel(
  initialMsg=Set[(VertexId,VertexId)](), 
  maxIterations=1, 
  activeDirection=EdgeDirection.In) 
(
  (id, old, msg) => old.union(msg),
  triplet => Iterator((triplet.srcId, Set((triplet.dstId, triplet.srcId)))),
  (a,b) => a.union(b)
)

// Step 3: Send (gathered) IDs back to downstream vertices
val round2 = round1.aggregateMessages[Set[(VertexId,VertexId)]](
  triplet => {
    triplet.sendToDst(triplet.srcAttr)
  },
  (a, b) => a.union(b)
)

// Step 4: Transform vertices to edges
val newEdges = round2.flatMap {v => v._2.filter(w => w._1 != v._1).map(w => Edge(v._1, w._1, ("shares-with", w._2)))}

// Step 5: Create a new graph that contains new edges
val newGraph = Graph(graph.vertices, graph.edges.union(newEdges))

// Step 6: print graph to verify result
newGraph.triplets foreach println

Это решение использует три основных шага для вычисления графа с новыми ребрами: 1) раунд прегеля. 2) раунд агрегатных сообщений. 3) раунд сопоставления узлов с ребрами.

person Pimin Konstantin Kefaloukos    schedule 20.05.2015