Я добавил iot hub и устройства. Все данные из iot-хаба сохраняются в озере данных 2 в формате json. Работает нормально, но если с устройства одновременно поступает несколько сообщений, оно сохраняется в одном json. Неприятности вызывает ... Есть ли способ сохранить каждое сообщение-событие в отдельном json? Я просмотрел настройки iot hub, но ничего не нашел.
Сохранение одного файла json из концентратора azure iot в datalake2
Ответы (1)
В механизме маршрутизации Центра Интернета вещей нет таких настроек, как постоянная пересылка одного сообщения в хранилище. В основном это требование может быть реализовано с помощью функции azure либо в потребителе конвейера потока (IoTHubTrigger), либо в подписчике сетки событий (EventGridTrigger).
Обновление:
Ниже приведен пример функции IoTHubTrigger с привязкой выходного большого двоичного объекта к контейнеру Data Lake Storage 2-го поколения:
run.csx:
#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{
//log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");
var msg = new {
EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
Properties = ed.Properties,
SystemProperties = new {
connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"],
connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]
},
Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
};
byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));
await Task.CompletedTask;
}
function.json:
{
"bindings": [
{
"name": "ed",
"connection": "rk2020iot_IOTHUB",
"eventHubName": "rk2020iot_IOTHUBNAME",
"consumerGroup": "function",
"cardinality": "one",
"direction": "in",
"type": "eventHubTrigger"
},
{
"name": "outputBlob",
"path": "iot/rk2020iot/{DateTime}.json",
"connection": "rk2020datalake2_STORAGE",
"direction": "out",
"type": "blob"
}
]
}
person
Roman Kiss
schedule
05.10.2020
Не могли бы вы предоставить примеры или ссылки?
- person user3287801; 05.10.2020
Я добавил пример функции IoTHubTrigger для хранения всех данных телеметрии в файле большого двоичного объекта контейнера хранилища озера данных gen2.
- person Roman Kiss; 06.10.2020