Я пытаюсь визуализировать потоковые данные с помощью Google PubSub, Node js и Google App Engine. То, что я делаю, это просто:
- Отправляйте поток сообщений в тему PubSub каждую 1 секунду (используя скрипт python)
- Создайте подписку на эту тему
- Создайте простое веб-приложение, которое прослушивает подписку, анализирует каждое входящее сообщение и отображает их в браузере в виде интерактивной гистограммы (используя node js, socket.io и fusionchart).
Я следовал этому руководству: https://www.fusioncharts.com/blog/visualize-real-time-data-socket-io-charts/, единственная разница в том, что я использовал PubSub вместо PubNub, также следуя этой документации: https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-nodejs для части «получить сообщение».
На самом деле это работает, когда я публикую 10 сообщений каждую секунду, диаграмма будет отображать ровно 10 сообщений каждую секунду в режиме реального времени. Вот как выглядит диаграмма, когда я запускаю свое приложение:
Проблема в том, что это работает только при первом запуске приложения и открытии страницы. Когда я обновляю страницу и отправляю еще 10 новых сообщений, для отображения данных требуется больше времени. Когда он, наконец, появляется, отображаются только 5 из 10 сообщений. Если я попытаюсь обновить ту же страницу, данных будет отображаться все меньше и меньше, или в какой-то момент они вообще не появятся.
Я отслеживаю все входящие и исходящие сообщения через свою консоль, и вроде все в порядке. 10 отправленных сообщений, 10 полученных. Только почему-то все меньше и меньше отображается после каждого обновления.
Вот мой код app.js:
var express = require('express');
var app = require('express')();
var http = require('http').Server(app);
//creates a new socket.io instance attached to the http server.
var io = require('socket.io')(http);
// Imports the Google Cloud client library
const PubSub = require('@google-cloud/pubsub');
// Your Google Cloud Platform project ID
const projectId = 'myprojecthere';
//Provide the absolute path to the dist directory.
app.use(express.static(__dirname + '/dist'));
//On get request send 'index.html' page as a response.
app.get('/', function(req, res) {
res.sendFile(__dirname +'/index.html');
});
//Whenever someone connects this gets executed
//original : connection
io.on('connection', function (socket) {
console.log(`Enter io connection`);
console.log(' %s sockets connected', io.engine.clientsCount)
// Instantiates a client
const pubsub = new PubSub({
projectId: projectId,
key: """censored"""
});
var strData;
/**
* TODO(developer): Uncomment the following lines to run the sample.
* https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-async-nodejs
*/
const subscriptionName = 'testing_subscription';
const topicName = 'testing';
const timeout = 50;
// References an existing subscription
//var topic = pubsub.topic(topicName)
const subscription = pubsub.subscription(subscriptionName);
//Function to format time and date
function formatDatetime (TimeStamp){
var formatted = (TimeStamp.getHours()) + ':' + (TimeStamp.getMinutes()) + ':' + (TimeStamp.getSeconds()) + ':' + (TimeStamp.getMilliseconds());
return formatted;
}
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message: ${message.id}`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
var obj = JSON.parse(message.data);
console.log(`\tTimeStamp: ${obj.messages.timestamp}`);
console.log(`\tAmount: ${obj.messages.amount}`);
messageCount += 1;
console.log(`Message count : ${messageCount}`);
message.ack();
console.log(`Message Acknowledged`);
// This doesn't ack the message, but allows more messages to be retrieved
// if your limit was hit or if you don't want to ack the message.
// message.nack();
// Get creation timestamp
var x = new Date(obj.messages.timestamp);
// Time formatting for x-axis in chart
var formatted = formatDatetime(x);
var Count = obj.messages.amount;
console.log(`Extracting Timestamp: ${formatted}`);
console.log(`Counts : ${Count}`);
strData = {"label": formatted,
"value": Count
}
socket.emit('news', strData);
console.log(``);
};
// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
console.log(`Enter timeout`);
//subscription.removeListener('message', messageHandler);
console.log(`0 message(s) received.`);
var x = new Date();
var formatted = formatDatetime(x);
var Count = 0;
console.log(`Extracting Timestamp: ${formatted}`)
strData = {"label": formatted,
"value": Count
}
console.log(`strData : ${strData}`)
console.log(``);
socket.emit('news', strData);
}, timeout);
//other handling
if ( typeof strData == 'undefined') {
console.log(`Something else happened`)
var x = new Date();
var formatted = formatDatetime(x);
console.log(`Extracting Timestamp: ${formatted}`)
strData = {"label": formatted,
"value": 9
}
socket.emit('news', strData);
}
console.log(`strData : ${strData}`);
console.log(``);
});
//server listening on port 8080
http.listen(8080, function() {
console.log('listening on *:8080');
});
Вот код, который я использовал для отображения диаграммы:
/*globals io */
var FusionCharts = require("fusioncharts");
require("fusioncharts/fusioncharts.charts")(FusionCharts);
require("fusioncharts/fusioncharts.widgets")(FusionCharts);
var socket = io();
var transactionChart = new FusionCharts({
id: "mychart",
type: 'realtimecolumn',
width: '700',
height: '350',
dataFormat: 'json',
dataSource: {
"chart": {
"caption": "Streaming Data Visualization",
"subCaption": "Testing",
"yaxismaxvalue": "10",
"numdisplaysets": "10",
"yAxisName":"Quantity",
"labeldisplay": "rotate",
"showLegend":"0",
"showValues": "0",
"numbersuffix": "Kg",
"showlabels": "1",
/*This parameter lets you set whether you want the latest value (received from server) to be displayed on the chart or not*/
"showRealTimeValue": "0",
/*For this parameter, you can specify the number of seconds after which the chart will look for new data. This process will happen continuously - i.e., if you specify 5 seconds here, the chart will look for new data every 5 seconds*/
"refreshInterval":".1",
/*If you want the chart to keep polling for new data every x seconds and queue it, you can specify that x seconds as updateInterval. This helps you poll at different intervals and then draw at another interval (specified as refreshInterval)*/
"updateInterval":".1",
"yAxisNamePadding":"10",
//Cosmetics
"paletteColors" : "#0075c2,#1aaf5d",
"baseFontColor" : "#333333",
"baseFont" : "Helvetica Neue,Arial",
"captionFontSize" : "14",
"subcaptionFontSize" : "14",
"subcaptionFontBold" : "0",
"showBorder" : "0",
"bgColor" : "#ffffff",
"showShadow" : "0",
"canvasBgColor" : "#ffffff",
"canvasBorderAlpha" : "0",
"divlineAlpha" : "100",
"divlineColor" : "#999999",
"divlineThickness" : "1",
"divLineIsDashed" : "1",
"divLineDashLen" : "1",
"divLineGapLen" : "1",
"usePlotGradientColor" : "0",
"showplotborder" : "0",
"valueFontColor" : "#ffffff",
"placeValuesInside" : "1",
"rotateValues" : "1",
"showXAxisLine" : "1",
"xAxisLineThickness" : "1",
"xAxisLineColor" : "#999999",
"showAlternateHGridColor" : "0",
"legendBgAlpha" : "0",
"legendBorderAlpha" : "0",
"legendShadow" : "0",
"legendItemFontSize" : "10",
"legendItemFontColor" : "#666666"
},
"categories": [
{
"category": [
{ "label": "Start" }
]
}
],
"dataset": [
{
"seriesname": "",
"alpha": "100",
"data": [
{ "value": "3" }
]
}
]
}
}).render("chart-container");
//On connection with socket, will start receiving the data
socket.connect('http://localhost:8080/');
socket.on('news', function (data) {
function updateData() {
//Converting the fetched data in FusionCharts format
var strData = "&label=" + data.label + "&value=" + data.value;
//feeding the data to the real time chart
FusionCharts.items.mychart.feedData(strData);
}
//calling the update method
updateData();
});
А вот мой код index.html:
<!DOCTYPE html>
<html>
<head>
<title>Hello world</title>
<script src="/socket.io/socket.io.js"></script>
</head>
<body>
<div id="chart-container">FusionCharts will render here</div>
<script src="bundle.js"></script>
</body>
</html>
Я все еще новичок в Javascript и никогда раньше не работал над веб-приложением. Там могут быть важные знания, которые я упустил о том, как все работает. Но у меня есть некоторые подозрения, хотя я и не уверен.
Возможно ли это из-за того, что каждый раз, когда я обновляю страницу, создается новое подключение к сокету, а отсутствующие сообщения фактически получены предыдущим подключением (поэтому оно не отображается)?
Несколько решений, которые я пробовал, и все еще не работали: Node.js Страница Socket.io обновляет несколько подключений
Кто-нибудь может помочь мне с этим?