Как отфильтровать граф со смешанными узлами по типам соседних вершин

Этот вопрос касается Spark GraphX. Я хочу вычислить подграф, удалив узлы, которые являются соседями некоторых других узлов.

Пример

[Задача] Сохранить узлы A и узлы B, которые не являются соседями узлов C2.

Входной график:

                    ┌────┐
              ┌─────│ A  │──────┐
              │     └────┘      │
              v                 v
┌────┐     ┌────┐            ┌────┐     ┌────┐
│ C1 │────>│ B  │            │ B  │<────│ C2 │
└────┘     └────┘            └────┘     └────┘
              ^                 ^
              │     ┌────┐      │
              └─────│ A  │──────┘
                    └────┘

Выходной график:

         ┌────┐
   ┌─────│ A  │
   │     └────┘
   v           
┌────┐         
│ B  │         
└────┘         
   ^           
   │     ┌────┐
   └─────│ A  │
         └────┘

Как элегантно написать запрос GraphX, который возвращает выходной график?


person Pimin Konstantin Kefaloukos    schedule 19.05.2015    source источник
comment
Есть ли у Edge.attr что-нибудь полезное?   -  person David Griffin    schedule 22.05.2015


Ответы (3)


Другой способ найти val nodesAB с помощью GraphOps.collectNeighbors

val nodesAB = graph.collectNeighbors(EdgeDirection.Either)
  .filter{case (vid,ns) => ! ns.map(_._2).contains("C2")}.map(_._1)
  .intersection(
    graph.vertices
      .filter{case (vid,attr) => ! attr.toString.startsWith("C") }.map(_._1)
  )

Остальное работает так же, как у вас:

val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})

Если вы хотите использовать DataFrames, которые могут быть (?) более масштабируемыми, то сначала нам нужно превратить nodesAB в DataFrame:

val newNodes = sqlContext.createDataFrame(
  nodesAB,
  StructType(Array(StructField("newNode", LongType, false)))
)

И вы создали и обрамили DataFrame следующим образом:

val edgeDf = sqlContext.createDataFrame(
  graph.edges.map{edge => Row(edge.srcId, edge.dstId, edge.attr)}, 
  StructType(Array(
    StructField("srcId", LongType, false),
    StructField("dstId", LongType, false),
    StructField("attr", LongType, false)
  ))
)

Затем вы можете сделать это, чтобы создать свой график без подграфа:

val solution1 = Graph(
  nodesAB, 
  edgeDf
  .join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr")
  .join(newNodes, $"dstId" === $"newNode")
  .rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2)))
)
person David Griffin    schedule 22.05.2015
comment
Мне нравится ваше решение, потому что оно использует collectNeighbors. - person Pimin Konstantin Kefaloukos; 26.05.2015
comment
Спасибо. Это было на удивление трудно сделать. Одна вещь, которую я мог бы сделать по-другому, недавно узнала о RDD.cogroup. collectNeighbors возвращает только VertexId основного узла, а не attr. Я, вероятно, смог бы избежать intersection в своем коде, если бы использовал cogroup для добавления атрибутов вершины. Тогда я мог бы отфильтровать startsWith("C") в первом фильтре. - person David Griffin; 26.05.2015
comment
Я остановился на третьем способе. Используя агрегатные сообщения, я отправляю сообщение «удалить меня» всем вершинам dst, которые должны быть удалены. Затем я отфильтровываю эти вершины из графа с помощью 1) externalJoinVertices и 2) шагов подграфа. - person Pimin Konstantin Kefaloukos; 27.05.2015

Вот еще одно решение. В этом решении агрегатные сообщения используются для отправки целого числа (1) тем B, которые должны быть удалены из графа. Результирующий набор вершин соединяется с графом, и последующий вызов подграфа удаляет нежелательные B из выходного графа.

// Step 1: send the message (1) to vertices that should be removed   
val deleteMe = graph.aggregateMessages[Int](
    ctx => {
      if (ctx.dstAttr.equals("B") && ctx.srcAttr.equals("C")) {
        ctx.sendToDst(1) // 1 means delete, but number is not actually used
      }
    },
    (a,b) => a  // choose either message, they are all (1)
  )

  // Step 2: join vertex sets, original and deleteMe
  val joined = graph.outerJoinVertices(deleteMe) {
    (id, origValue, msgValue ) => msgValue match {
      case Some(number) => "deleteme"  // vertex received msg
      case None => origValue
    }
  }

  // Step 3: Remove nodes with domain = deleteme
  joined.subgraph(vpred = (id, data) => data.equals("deleteme"))

Я думаю о способе использования только одного промежуточного флага удаления, например. «deleteme» вместо «1» и «deleteme». Но это хорошо, как я мог сделать это до сих пор.

person Pimin Konstantin Kefaloukos    schedule 29.05.2015

Одним из решений является использование триплетного представления для определения подмножества узлов B, которые являются соседями узлов C1. Затем объедините их с узлами A. Затем создайте новый график:

// Step 1
// Compute the subset of B's that are neighbors with C1
val nodesBC1 = graph.triplets .
    filter {trip => trip.srcAttr == "C1"} .
    map {trip => (trip.dstId, trip.dstAttr)}

// Step 2    
// Union the subset B's with all the A's
val nodesAB = nodesBC1 .
    union(graph.vertices filter {case (id, label) => label == "A"})

// Step 3
// Create a graph using the subset nodes and all the original edges
// Remove nodes that have null values
val solution1 = Graph(nodesAB, graph.edges) .
    subgraph(vpred = {case(id, label) => label != null})

На шаге 1 я воссоздаю RDD узла (содержащий B-узлы), сопоставляя вместе dstID и dstAttr триплетного представления. Не уверены, насколько эффективно это будет для больших графов?

person Pimin Konstantin Kefaloukos    schedule 19.05.2015
comment
Поиграв с этим пару часов, я не уверен, что есть лучший способ идентифицировать ребра, которые нужно удалить, чем то, как вы это делаете, позволяя ребрам создавать вершины с attr значениями null, а затем используя subgraph для сделайте окончательную обрезку, по крайней мере, не используя что-то вроде использования DataFrames или большого RDD.cartesian. - person David Griffin; 22.05.2015
comment
Прохладно. Спасибо за попытку @DavidGriffin :-) - person Pimin Konstantin Kefaloukos; 24.05.2015