Достижение ускорения вычислений для TempusML

Акшай Метре, разработчик TempusML

TempusML и Apache Ignite

Во время работы над Tempus ML, новым ускорителем студии машинного обучения в Hashmap, мне понадобилась платформа, которая позволила бы мне эффективно управлять данными между элементами вычислений.

В частности, я хотел избежать записи на диск или записи в базу данных между дискретными вычислительными шагами. Я искал что-то, что позволило бы мне легко передавать результаты вычислений между слабосвязанными вычислительными модулями.

Войдите в Apache Ignite.

Apache Ignite - ускорение вычислений за счет кэширования данных в памяти

Apache Ignite - это чрезвычайно богатая, ориентированная на память, распределенная платформа со значительным количеством многофункциональных возможностей, включая совместную обработку, распределенный SQL и распределенное значение ключа, долговечность в памяти, а также масштабируемость, доступность и согласованность.

Если вы потратите время на просмотр списка компонентов, вы скоро поймете, что это не просто хранилище кеша, но также предоставляет несколько мощных API для обработки данных. Обязательно ознакомьтесь с примерами проверенных вариантов использования Apache Ignite.

Использование распределенного кэша Ignite с вычислительными механизмами Spark и Flink

В этом посте я рассмотрю предстоящие API-интерфейсы библиотеки Ignite из 2.5.0-SNAPSHOT, уделяя особое внимание кеш-хранилищу для вычислительных механизмов Spark и Flink. Я покажу вам, как мы можем эффективно обмениваться данными между заданиями Spark и даже между гетерогенными вычислительными средами.

Примечание: чтобы использовать версию ignite 2.5.0-SNAPSHOT, клонируйте ignite из GitHub и следуйте документации для сборки.

Запись фреймов данных Spark в Ignite

Вы можете напрямую записать Spark DataFrames в Ignite, установив параметры, связанные с Ignite, как показано в приведенном ниже фрагменте кода:

импорт org.apache.ignite.spark.IgniteDataFrameSettings. _

// Сохраняем данные с именем таблицы
val
tableName = «iot_data»

// Файл конфигурации кеширования
val
CONFIG = getClass.getResource («/ cache.xml»). getPath

df .write
.format (FORMAT_IGNITE)
.option (OPTION_CONFIG_FILE, CONFIG )
.option (OPTION_TABLE, tableName)
.option (OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, «device_id»)
.option (OPTION_CREATE_TABLE_PARAMETERS, «шаблон = реплицированный»)
.save ()

Это создаст кеш с именем «SQL_PUBLIC _» + tableName и сохранит заданный фрейм данных.

Вы можете предоставить список ключей, разделенных запятыми, для параметра OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS.

Чтобы создать столбцы таблицы в соответствии с DataFrame, используйте «template = replicated».

Чтение кадров данных Spark из Ignite

Вот фрагмент кода для чтения кэшированных данных в Spark DataFrames:

val df1 = spark .read
.format (FORMAT_IGNITE) // Тип источника данных. < br /> .option (OPTION_TABLE, tableName) // Таблица для чтения.
.option (OPTION_CONFIG_FILE, CONFIG ) // Зажигаем конфиг.
.load ()

Прямое чтение из кеша Ignite

После сохранения данных в Ignite вам не нужны Spark API для просмотра (или даже обработки данных). Вы можете использовать SQL API для запроса сохраненных данных, как показано ниже:

импорт scala.collection.JavaConversions._

val CACHE_NAME = «tempCache»
val
ccfg = new CacheConfiguration [Any, Any] (CACHE_NAME)
val cache = ignite .getOrCreateCache ( ccfg)

cursor = cache .query (new SqlFieldsQuery (s ”SELECT * FROM $ tableName limit 20 '))
val clCnt = cursor .getColumnsCount ()
val fieldNames = (от 0 до clCnt) .map (cursor .getFieldName (_))
val data = cusrsor .getAll

println («Чтение данных из кеша:»)
println (fieldNames .mkString («[«, «,«, «]»))
data .foreach {row ⇒ println (row.mkString («[«, «,«, «]»))}
println («Конец»)

Чтобы читать данные напрямую из Ignite, вам понадобится временный экземпляр кеша. Этот кеш можно создать с помощью API getOrCreateCache и предоставив CacheConfiguratio n.

Как только вы получите экземпляр кеша, используйте SQL API для запроса и изменения данных.

Чтение наборов данных Flink из Ignite

Вы также можете прочитать кэшированные данные в формате Flink DataSet, как показано ниже:

импорт scala.collection.JavaConversions._

val data InScalaCollection: List [List [String]] = data .toList.map (_. map (_. toString) .toList)

val env = ExecutionEnvironment. getExecutionEnvironment
неявный val typeInfo = TypeInformation . из (classOf [List [String]])

val набор данных: DataSet [List [String]] = env .fromCollection (data InScalaCollection)

println («Чтение данных из кеша:»)
println (fieldNames .mkString («[«, «,«, « ] »))
dataset.print ()
println (« Конец »)

Вы можете использовать метод fromCollection для создания наборов данных Flink из коллекции на основе Scala. Для этого метода также требуется тип коллекции (неявно). Если элементы внутри коллекции не относятся к базовому типу данных, вам нужно будет предоставить экземпляр TypeInformation, как показано выше.

Полный исходный код приведенных выше фрагментов находится на github: https://github.com/hashmapinc/Ignite2.5.0Examples

Ваши отзывы всегда приветствуются!

Если вы хотите присоединиться к нам в этом путешествии по Tempus, не стесняйтесь отправлять запросы функций, отчеты об ошибках или отправлять запросы на вытягивание на github, и если вам нужна дополнительная информация о том, как Hashmap, Apache Ignite , и TempusML может помочь вашей организации ускорить разработку программ обработки данных и искусственного интеллекта / машинного обучения, свяжитесь с нами.

Не стесняйтесь делиться на других каналах и будьте в курсе всего нового контента с Hashmap на https://medium.com/hashmapinc.

Акшай Метре - разработчик TempusML в Hashmap, работающий в команде инженеров в разных отраслях с группой инновационных технологов и экспертов в предметной области, ускоряющих получение ценных бизнес-результатов для наших клиентов.