Таргетинг на определенные соединения Micronaut WebSocket

Я пытаюсь создать службу Kotlin, которая может передавать данные из Kafka на (определенные) соединения WebSocket. Например, если данные от пользователя проходят через Kafka, программа получает их, и я хочу передать их правильному соединению WebSocket, если этот же пользователь в настоящее время подключен.

Основы, которые у меня есть:

@KafkaListener()
@Controller
@Secured(SecurityRule.IS_ANONYMOUS)
@ServerWebSocket("/ws/{id}")
class WebSocket() {

    @OnOpen
    fun onOpen(session: WebSocketSession, id: String): Publisher<String> {
        return session.send("connection opened")
    }
}

Однако, если в том же классе я получаю сообщение Kafka, содержащее идентификатор пользователя, который я ищу, как я могу передать это правильному соединению WebSocket?

Я подумал, может быть, если я использую "/ws/{id}" из URL-адреса, я смогу легко отправить его по правильному идентификатору, но я не могу понять, как это сделать.

Единственный известный мне способ отправить данные в соединение - это использовать аннотированные функции веб-сокета, такие как OnOpen и OnMessage и т. Д. Кроме того, я обнаружил, что Micronaut WebSocketBroadcaster полезен, но, конечно, только для широковещательной передачи.

И еще кое-что, что я пытаюсь выяснить, - это где убедиться, что идентификатор, к которому подключается, также является фактическим идентификатором пользователя, а не чьим-то еще, должен ли я реализовать это в аннотации @Secured?


person Noahpls    schedule 02.04.2020    source источник


Ответы (1)


Для всех, кого это интересует, я нашел решение этой проблемы. Используя WebSocketBroadcaster, Micronaut, вы можете транслировать сообщения, и с помощью второго параметра, который принимает Predicate<WebSocketSession>, который определяет, каким WebSocketSessions он отправляет сообщение.

Если вам известен конкретный идентификатор WebSocketSession, вы можете просто использовать { it.id == "id" } для предиката, как это сделано в примере 1. Если вам действительно нужно отправить его определенному пользователю, на основе какого-либо другого атрибута, вы можете назначить атрибуты для WebSocketSession. сам, используя session.attributes.put("id", id), например, в функции onOpen. Предикат, который вы можете использовать для этого, находится в примере 2, единственное, что в нем заключается в том, что я не мог найти способ получить атрибуты без преобразования их в карту, что не очень красиво.

class ExampleClass(val broadcaster: WebSocketBroadcaster) {
    val userId = "user_001"
    val webSocketId = "asdasdasd"

    // [1] Send message to a specific socket ID
    broadcaster.broadcast("message", { it.id == webSocketId)

    // [2] Send message to a user with specific ID or any other identifying attribute of choice
    broadcaster.broadcast("message", { it.attributes.asMap()["id"]?.equals(id) ?: false})    
}
person Noahpls    schedule 28.05.2020