Socket.io с PubSub: данные в режиме реального времени отображаются меньше или вообще не отображаются после многократного обновления страницы.

Я пытаюсь визуализировать потоковые данные с помощью Google PubSub, Node js и Google App Engine. То, что я делаю, это просто:

  1. Отправляйте поток сообщений в тему PubSub каждую 1 секунду (используя скрипт python)
  2. Создайте подписку на эту тему
  3. Создайте простое веб-приложение, которое прослушивает подписку, анализирует каждое входящее сообщение и отображает их в браузере в виде интерактивной гистограммы (используя node js, socket.io и fusionchart).

Я следовал этому руководству: https://www.fusioncharts.com/blog/visualize-real-time-data-socket-io-charts/, единственная разница в том, что я использовал PubSub вместо PubNub, также следуя этой документации: https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-nodejs для части «получить сообщение».

На самом деле это работает, когда я публикую 10 сообщений каждую секунду, диаграмма будет отображать ровно 10 сообщений каждую секунду в режиме реального времени. Вот как выглядит диаграмма, когда я запускаю свое приложение:

Пример визуализации

Проблема в том, что это работает только при первом запуске приложения и открытии страницы. Когда я обновляю страницу и отправляю еще 10 новых сообщений, для отображения данных требуется больше времени. Когда он, наконец, появляется, отображаются только 5 из 10 сообщений. Если я попытаюсь обновить ту же страницу, данных будет отображаться все меньше и меньше, или в какой-то момент они вообще не появятся.

Я отслеживаю все входящие и исходящие сообщения через свою консоль, и вроде все в порядке. 10 отправленных сообщений, 10 полученных. Только почему-то все меньше и меньше отображается после каждого обновления.

Вот мой код app.js:

var express = require('express');
var app = require('express')();
var http = require('http').Server(app);
//creates a new socket.io instance attached to the http server.
var io = require('socket.io')(http);

// Imports the Google Cloud client library
const PubSub = require('@google-cloud/pubsub');

// Your Google Cloud Platform project ID
const projectId = 'myprojecthere';




//Provide the absolute path to the dist directory.
app.use(express.static(__dirname + '/dist'));

//On get request send 'index.html' page as a response.
app.get('/', function(req, res) {
   res.sendFile(__dirname +'/index.html');
});

//Whenever someone connects this gets executed
//original : connection
io.on('connection', function (socket) {
  console.log(`Enter io connection`);
  console.log(' %s sockets connected', io.engine.clientsCount)

  // Instantiates a client
  const pubsub = new PubSub({
    projectId: projectId,
    key: """censored"""
  });

  var strData;
    /**
     * TODO(developer): Uncomment the following lines to run the sample.
     * https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-async-nodejs
     */
    const subscriptionName = 'testing_subscription';
    const topicName = 'testing';
    const timeout = 50;

    // References an existing subscription
    //var topic = pubsub.topic(topicName)
    const subscription = pubsub.subscription(subscriptionName);
    

    //Function to format time and date
    function formatDatetime (TimeStamp){
    	var formatted =  (TimeStamp.getHours()) + ':' + (TimeStamp.getMinutes()) + ':' + (TimeStamp.getSeconds()) + ':' + (TimeStamp.getMilliseconds());
    	return formatted;
    }

    // Create an event handler to handle messages
    let messageCount = 0;
    const messageHandler = message => {
      console.log(`Received message: ${message.id}`);
      console.log(`\tData: ${message.data}`);
      console.log(`\tAttributes: ${message.attributes}`);
      var obj = JSON.parse(message.data);
	  console.log(`\tTimeStamp: ${obj.messages.timestamp}`);
	  console.log(`\tAmount: ${obj.messages.amount}`);
	  
      
      messageCount += 1;
      console.log(`Message count : ${messageCount}`);
      
      message.ack();
      console.log(`Message Acknowledged`);

      // This doesn't ack the message, but allows more messages to be retrieved
      // if your limit was hit or if you don't want to ack the message.
      // message.nack();
    


      // Get creation timestamp
      var x = new Date(obj.messages.timestamp);    
      // Time formatting for x-axis in chart
      var formatted = formatDatetime(x);
      var Count = obj.messages.amount;

      console.log(`Extracting Timestamp: ${formatted}`);
      console.log(`Counts : ${Count}`);
      strData = {"label": formatted,
                     "value": Count
                  }
      socket.emit('news', strData);
      console.log(``);
      };

    // Listen for new messages until timeout is hit
      subscription.on(`message`, messageHandler);
      
      setTimeout(() => {
      	console.log(`Enter timeout`);
      	//subscription.removeListener('message', messageHandler);
        console.log(`0 message(s) received.`);
        var x = new Date();
        var formatted =  formatDatetime(x);
      	var Count = 0;
      	console.log(`Extracting Timestamp: ${formatted}`)
      	strData = {"label": formatted,
                     "value": Count
                  }
        console.log(`strData : ${strData}`)
        console.log(``);
        socket.emit('news', strData);
        
      }, timeout);

    //other handling
    if ( typeof strData == 'undefined') {
    	console.log(`Something else happened`)
    	var x = new Date();
        var formatted =  formatDatetime(x);
        console.log(`Extracting Timestamp: ${formatted}`)
    	strData = {"label": formatted,
                     "value": 9
                  }
        socket.emit('news', strData);
              }

    console.log(`strData : ${strData}`);
    console.log(``);
    
    
    
});


//server listening on port 8080
http.listen(8080, function() {
   console.log('listening on *:8080');
});

Вот код, который я использовал для отображения диаграммы:

/*globals io */
var FusionCharts = require("fusioncharts");
require("fusioncharts/fusioncharts.charts")(FusionCharts);
require("fusioncharts/fusioncharts.widgets")(FusionCharts);

  var socket = io();
    	var transactionChart = new FusionCharts({
    		id: "mychart",
	        type: 'realtimecolumn',
	        width: '700',
	        height: '350',
	        dataFormat: 'json',
	        dataSource: {
	            "chart": {
    	         "caption": "Streaming Data Visualization",
                    "subCaption": "Testing",
                    "yaxismaxvalue": "10",
                    "numdisplaysets": "10",
                    "yAxisName":"Quantity",
                    "labeldisplay": "rotate",
                    "showLegend":"0",
                    "showValues": "0",
                    "numbersuffix": "Kg",
                    "showlabels": "1",
/*This parameter lets you set whether you want the latest value (received from server) to be displayed on the chart or not*/
                    "showRealTimeValue": "0",
/*For this parameter, you can specify the number of seconds after which the chart will look for new data. This process will happen continuously - i.e., if you specify 5 seconds here, the chart will look for new data every 5 seconds*/
                     "refreshInterval":".1",
/*If you want the chart to keep polling for new data every x seconds and queue it, you can specify that x seconds as updateInterval. This helps you poll at different intervals and then draw at another interval (specified as refreshInterval)*/
                    "updateInterval":".1",
                    "yAxisNamePadding":"10",
                    //Cosmetics
                    "paletteColors" : "#0075c2,#1aaf5d",
                    "baseFontColor" : "#333333",
                    "baseFont" : "Helvetica Neue,Arial",
                    "captionFontSize" : "14",
                    "subcaptionFontSize" : "14",
                    "subcaptionFontBold" : "0",
                    "showBorder" : "0",
                    "bgColor" : "#ffffff",
                    "showShadow" : "0",
                    "canvasBgColor" : "#ffffff",
                    "canvasBorderAlpha" : "0",
                    "divlineAlpha" : "100",
                    "divlineColor" : "#999999",
                    "divlineThickness" : "1",
                    "divLineIsDashed" : "1",
                    "divLineDashLen" : "1",
                    "divLineGapLen" : "1",
                    "usePlotGradientColor" : "0",
                    "showplotborder" : "0",
                    "valueFontColor" : "#ffffff",
                    "placeValuesInside" : "1",
                    "rotateValues" : "1",
                    "showXAxisLine" : "1",
                    "xAxisLineThickness" : "1",
                    "xAxisLineColor" : "#999999",
                    "showAlternateHGridColor" : "0",
                    "legendBgAlpha" : "0",
                    "legendBorderAlpha" : "0",
                    "legendShadow" : "0",
                    "legendItemFontSize" : "10",
                    "legendItemFontColor" : "#666666"
    	            },
	            "categories": [
	                {
	                    "category": [
	                        { "label": "Start" }
	                    ]
	                }
	            ],
	            "dataset": [ 
	                {
	                    "seriesname": "",
	                    "alpha": "100",
	                    "data": [
	                        { "value": "3" }
	                    ]
	                }
	            ]      
	        }
    	}).render("chart-container");
//On connection with socket, will start receiving the data
	  socket.connect('http://localhost:8080/');
	  socket.on('news', function (data) {
	    function updateData() {
                         //Converting the fetched data in FusionCharts format
	    	var strData = "&label=" + data.label + "&value=" + data.value;
                        //feeding the data to the real time chart
	    	FusionCharts.items.mychart.feedData(strData);
	    }
	    //calling the update method
	    updateData();

	 });

А вот мой код index.html:

<!DOCTYPE html>
<html>
   <head>
      <title>Hello world</title>
      <script src="/socket.io/socket.io.js"></script>
   </head>
 
   <body>
   <div id="chart-container">FusionCharts will render here</div>
        <script src="bundle.js"></script>
   </body>
</html>

Я все еще новичок в Javascript и никогда раньше не работал над веб-приложением. Там могут быть важные знания, которые я упустил о том, как все работает. Но у меня есть некоторые подозрения, хотя я и не уверен.

Возможно ли это из-за того, что каждый раз, когда я обновляю страницу, создается новое подключение к сокету, а отсутствующие сообщения фактически получены предыдущим подключением (поэтому оно не отображается)?

Несколько решений, которые я пробовал, и все еще не работали: Node.js Страница Socket.io обновляет несколько подключений

Кто-нибудь может помочь мне с этим?


person rsa0809999    schedule 28.08.2018    source источник


Ответы (1)


Я нахожу работающее решение своей проблемы.

Не совсем уверен, почему, но вот что произошло: всякий раз, когда я обновляю страницу, старое соединение сокета будет отключено, и будет создано новое соединение сокета. Этот новый сокет будет прослушивать ту же подписку.

Хотя статус старого сокета кажется отключенным во время обновления страницы, по какой-то причине он все еще слушает ту же подписку. Это приводит к разделению 10 сообщений между двумя или более соединениями (в зависимости от количества обновлений страницы).

Однако то, что отображается в браузере, — это только самое новое соединение. Может показаться, что сообщения отсутствуют, хотя на самом деле они разбросаны по многим (невидимым) соединениям. Это было очевидно, когда я пытался напечатать «идентификатор сокета», в котором заканчивалось каждое сообщение.

Итак, что я сделал, так это добавил небольшую обработку во время отключения сокета:

//on Disconnect
socket.on('disconnect', function () {
console.log("LOG: just disconnected: " + socket.id);
subscription.removeListener('message', messageHandler);

Таким образом, всякий раз, когда сокет отключается, он также прекращает прослушивание подписки, и новый сокет получает полный набор сообщений.

person rsa0809999    schedule 28.08.2018