Я пытаюсь создать службу Kotlin, которая может передавать данные из Kafka на (определенные) соединения WebSocket. Например, если данные от пользователя проходят через Kafka, программа получает их, и я хочу передать их правильному соединению WebSocket, если этот же пользователь в настоящее время подключен.
Основы, которые у меня есть:
@KafkaListener()
@Controller
@Secured(SecurityRule.IS_ANONYMOUS)
@ServerWebSocket("/ws/{id}")
class WebSocket() {
@OnOpen
fun onOpen(session: WebSocketSession, id: String): Publisher<String> {
return session.send("connection opened")
}
}
Однако, если в том же классе я получаю сообщение Kafka, содержащее идентификатор пользователя, который я ищу, как я могу передать это правильному соединению WebSocket?
Я подумал, может быть, если я использую "/ws/{id}"
из URL-адреса, я смогу легко отправить его по правильному идентификатору, но я не могу понять, как это сделать.
Единственный известный мне способ отправить данные в соединение - это использовать аннотированные функции веб-сокета, такие как OnOpen и OnMessage и т. Д. Кроме того, я обнаружил, что Micronaut WebSocketBroadcaster полезен, но, конечно, только для широковещательной передачи.
И еще кое-что, что я пытаюсь выяснить, - это где убедиться, что идентификатор, к которому подключается, также является фактическим идентификатором пользователя, а не чьим-то еще, должен ли я реализовать это в аннотации @Secured?