Сохранение одного файла json из концентратора azure iot в datalake2

Я добавил iot hub и устройства. Все данные из iot-хаба сохраняются в озере данных 2 в формате json. Работает нормально, но если с устройства одновременно поступает несколько сообщений, оно сохраняется в одном json. Неприятности вызывает ... Есть ли способ сохранить каждое сообщение-событие в отдельном json? Я просмотрел настройки iot hub, но ничего не нашел.


person user3287801    schedule 05.10.2020    source источник


Ответы (1)


В механизме маршрутизации Центра Интернета вещей нет таких настроек, как постоянная пересылка одного сообщения в хранилище. В основном это требование может быть реализовано с помощью функции azure либо в потребителе конвейера потока (IoTHubTrigger), либо в подписчике сетки событий (EventGridTrigger).

Обновление:

Ниже приведен пример функции IoTHubTrigger с привязкой выходного большого двоичного объекта к контейнеру Data Lake Storage 2-го поколения:

run.csx:

#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{   
    //log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");  

    var msg = new { 
        EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
        Properties = ed.Properties,
        SystemProperties = new {
          connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"], 
          connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
          connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
          enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]   
        },
        Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
    };

    byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
    await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));

    await Task.CompletedTask;
}

function.json:

{
  "bindings": [
    {
      "name": "ed",
      "connection": "rk2020iot_IOTHUB",
      "eventHubName": "rk2020iot_IOTHUBNAME",
      "consumerGroup": "function",
      "cardinality": "one",
      "direction": "in",
      "type": "eventHubTrigger"
    },
    {
      "name": "outputBlob",
      "path": "iot/rk2020iot/{DateTime}.json",
      "connection": "rk2020datalake2_STORAGE",
      "direction": "out",
      "type": "blob"
    }
  ]
}
person Roman Kiss    schedule 05.10.2020
comment
Не могли бы вы предоставить примеры или ссылки? - person user3287801; 05.10.2020
comment
Я добавил пример функции IoTHubTrigger для хранения всех данных телеметрии в файле большого двоичного объекта контейнера хранилища озера данных gen2. - person Roman Kiss; 06.10.2020