Используйте устойчивый распределенный набор данных (RDD) Apache Spark с Databricks

Из-за физических ограничений индивидуальный компьютерный процессор в значительной степени достиг верхнего предела скорости с текущими конструкциями. Поэтому производители оборудования добавили на материнскую плату больше процессоров (параллельных ядер ЦП, работающих с одинаковой скоростью).

Но… большинство программных приложений, написанных за последние несколько десятилетий, не были написаны для параллельной обработки.

Кроме того, сбор данных экспоненциально увеличился из-за дешевых устройств, которые могут собирать определенные данные (например, температуру, звук, скорость…).

Для более эффективной обработки этих данных потребовались новые методы программирования.

Кластер вычислительных процессов похож на группу рабочих. Команда может работать лучше и эффективнее, чем один работник. Они объединяют ресурсы. Это означает, что они обмениваются информацией, разбивают задачи и собирают обновления и результаты, чтобы получить единый набор результатов.

Подобно тому, как фермеры перешли от работы на одном поле к работе с комбайнами и тракторами для эффективного производства продуктов питания на более крупных и большем количестве ферм, а сельскохозяйственные кооперативы упростили переработку, кластер работает вместе, чтобы справиться со сбором и обработкой более крупных и сложных данных.

Кластерные вычисления и параллельная обработка были ответом, и сегодня у нас есть фреймворк Apache Spark. Databricks - это унифицированная аналитическая платформа, используемая для запуска кластерных вычислений Spark простым и легким способом.

Что такое Spark?

Apache Spark - это молниеносная унифицированная аналитическая система для больших данных и машинного обучения. Первоначально он был разработан в Калифорнийском университете в Беркли.

Искра быстрая. Он использует преимущества вычислений в памяти и других оптимизаций. В настоящее время он является рекордсменом по масштабной сортировке на диске.

Spark использует устойчивые распределенные наборы данных (RDD) для выполнения параллельной обработки в кластере или процессорах компьютеров.

Он имеет простые в использовании API-интерфейсы для работы с большими наборами данных на различных языках программирования. Он также имеет API-интерфейсы для преобразования данных и знакомые API-интерфейсы фреймов данных для управления полуструктурированными данными.

По сути, Spark использует диспетчер кластера для координации работы в кластере компьютеров. Кластер - это группа компьютеров, которые связаны и координируют друг друга для обработки данных и вычислений.

Приложения Spark состоят из процесса-драйвера и процессов-исполнителей.

Короче говоря, процесс драйвера выполняет основную функцию, анализирует и распределяет работу между исполнителями. На самом деле исполнители выполняют поставленные задачи - выполняют код и отчитываются перед узлом драйвера.

В реальных бизнес-приложениях и в развивающемся программировании на основе искусственного интеллекта параллельная обработка становится необходимостью для повышения эффективности, скорости и сложности.

Отлично - так что же такое Databricks?

Databricks - это единая аналитическая платформа от создателей Apache Spark. Это позволяет легко запускать кластеры Spark, оптимизированные для облака, за считанные минуты.

Думайте об этом как о универсальном пакете для написания вашего кода. Вы можете использовать Spark (не беспокоясь о деталях) и получать результаты.

Он также включает записные книжки Jupyter, которые можно использовать совместно, а также обеспечивает интеграцию с GitHub, подключение ко многим широко используемым инструментам и автоматизацию мониторинга, планирования и отладки. Смотрите здесь для более подробной информации.

Вы можете бесплатно зарегистрироваться в версии для сообщества. Это позволит вам поэкспериментировать с кластерами Spark. Другие преимущества, в зависимости от плана, включают:

  • Устанавливайте и запускайте кластеры за секунды как на AWS, так и на экземплярах ЦП и графических процессоров Azure для максимальной гибкости.
  • Начните быстро работать с готовой интеграцией TensorFlow, Keras и их зависимостей от кластеров Databricks.

Давайте начнем. Если вы уже использовали Databricks раньше, переходите к следующей части. В противном случае вы можете зарегистрироваться здесь и выбрать Community Edition, чтобы попробовать ее бесплатно.

Следуйте инструкциям там. Они четкие, лаконичные и простые:

  • Создать кластер
  • Подключите записную книжку к кластеру и выполните команды в записной книжке кластера.
  • Управляйте данными и создавайте график
  • Операции с Python DataFrame API; создать DataFrame из набора данных Databricks
  • Управляйте данными и отображайте результаты

Теперь, когда вы создали программу обработки данных в кластере, давайте перейдем к другому набору данных с большим количеством операций, чтобы у вас было больше данных.

Набор данных представляет собой Отчет о мировом счастье за ​​2017 год по странам, основанный на различных факторах, таких как ВВП, щедрость, доверие, семья и другие. Поля и их описания перечислены ниже в статье.

Я ранее загрузил набор данных, а затем переместил его в DBFS (файловую систему DataBricks) Databricks, просто перетащив его в окно Databricks.

Или вы можете щелкнуть «Данные» на левой панели навигации, щелкнуть «Добавить данные», затем либо перетащить, либо просмотреть и добавить.

# File location and type
#this file was dragged and dropped into Databricks from stored #location; https://www.kaggle.com/unsdsn/world-happiness#2017.csv 
file_location = "/FileStore/tables/2017.csv"
file_type = "csv"
# CSV options
# The applied options are for CSV files. For other file types, these 
# will be ignored: Schema is inferred; first row is header - I 
# deleted header row in editor and intentionally left it 'false' to #contrast with later rdd parsing, #delimiter # separated, #file_location; if you don't delete header row, instead of reading #C0, C1, it would read "country", "dystopia" etc.

infer_schema = "true"
first_row_is_header = "false"
delimiter = ","
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
display(df)

Теперь давайте загрузим файл в Spark Resilient Distributed Dataset (RDD), упомянутый ранее. RDD выполняет параллельную обработку в кластере или процессорах компьютеров и делает операции с данными более быстрыми и эффективными.

#load the file into Spark's Resilient Distributed Dataset(RDD)
data_file = "/FileStore/tables/2017.csv"
raw_rdd = sc.textFile(data_file).cache()
#show the top 5 lines of the file
raw_rdd.take(5)

Обратите внимание на «Задания Spark» ниже, прямо над выходными данными. Нажмите «Просмотр», чтобы просмотреть подробности, как показано во вложенном окне справа.

Databricks и Sparks отлично визуализируют процессы.

В Spark задание связано с цепочкой зависимостей RDD, организованной в прямой ациклический граф (DAG). В группе доступности базы данных ветви направляются от одного узла к другому без обратных петель. Задачи отправляются планировщику, который выполняет их с помощью конвейерной обработки для оптимизации работы и преобразования в минимальные этапы.

Не волнуйтесь, если перечисленные выше пункты покажутся вам сложными. Есть визуальные снимки процессов, происходящих на конкретном этапе, для которого вы нажали кнопку просмотра Spark Job. Эта информация может вам понадобиться, а может и не понадобиться - она ​​есть, если вам нужна.

Записи RDD разделяются запятыми, которые нам нужно разделить перед синтаксическим анализом и построением фрейма данных. Затем мы возьмем определенные столбцы из набора данных для использования.

#split RDD before parsing and building dataframe
csv_rdd = raw_rdd.map(lambda row: row.split(","))
#print 2 rows
print(csv_rdd.take(2))
#print types
print(type(csv_rdd))
print('potential # of columns: ', len(csv_rdd.take(1)[0]))
#use specific columns from dataset
from pyspark.sql import Row
parsed_rdd = csv_rdd.map(lambda r: Row(
    country = r[0],   #country, position 1, type=string
    happiness_rank = r[1],
    happiness_score = r[2],
    gdp_per_capita = r[5],
    family = r[6],
    health = r[7],
    freedom = r[8],
    generosity = r[9],
    trust = r[10],
    dystopia = r[11],
    label = r[-1]
    )
)
parsed_rdd.take(5)

Вот столбцы и определения для набора данных Happiness:

Столбцы и определения набора данных о счастье

Страна - Название страны.

Регион - регион, к которому принадлежит страна.

Рейтинг счастья - рейтинг страны на основе балла счастья.

Оценка счастья - показатель, измеряемый в 2015 году путем задания выбранным людям вопроса: «Как бы вы оценили свое счастье по шкале от 0 до 10, где 10 - самое счастливое».

Экономика (ВВП на душу населения) - степень, в которой ВВП (валовой внутренний продукт) влияет на расчет показателя счастья.

Семья - степень, в которой семья участвует в расчете показателя счастья.

Здоровье - (Ожидаемая продолжительность жизни) Степень, в которой ожидаемая продолжительность жизни способствовала расчету показателя счастья.

Свобода - степень, в которой свобода способствовала подсчету показателя счастья.

Доверие - (Коррупция правительства) Степень, в которой восприятие коррупции влияет на показатель счастья.

Щедрость - степень, в которой щедрость способствовала расчету показателя счастья.

Остаточная антиутопия - степень, в которой остаточная антиутопия способствовала расчету балла счастья (антиутопия = воображаемое место или состояние, в котором все неприятно или плохо, как правило, тоталитарное или экологически деградированное. Остаточное - то, что остается или остается после всего, остается другим учитывается или забирается).

# Create a view or table
temp_table_name = "2017_csv"
df.createOrReplaceTempView(temp_table_name)
#build dataframe from RDD created earlier
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10)
#view the dataframe's schema
df.printSchema()

#build temporary table to run SQL commands
#table only alive for the session
#table scoped to the cluster; highly optimized
df.registerTempTable("happiness")
#display happiness_score counts using dataframe syntax
display(df.groupBy('happiness_score')
          .count()
          .orderBy('count', ascending=False)
       )
df.registerTempTable("happiness")
#display happiness_score counts using dataframe syntax
display(df.groupBy('happiness_score')
          .count()
          .orderBy('count', ascending=False)
       )

Теперь давайте воспользуемся SQL для выполнения запроса, чтобы сделать то же самое. Цель состоит в том, чтобы показать вам различные способы обработки данных и сравнить методы.

#use SQL to run query to do same thing as previously done with dataframe (count by happiness_score)
happ_query = sqlContext.sql("""
                        SELECT happiness_score, count(*) as freq
                        FROM happiness
                        GROUP BY happiness_score
                        ORDER BY 2 DESC
                        """)
display(happ_query)

Еще один SQL-запрос, чтобы попрактиковаться в обработке данных:

#another sql query
happ_stats = sqlContext.sql("""
                            SELECT
                              country,
                              happiness_rank,
                              dystopia
                            FROM happiness
                            WHERE happiness_rank > 20
                            """)
display(happ_stats)

Там! Вы сделали это - создали кластер на базе Spark и выполнили процесс запроса набора данных с использованием этого кластера. Вы можете использовать это со своими собственными наборами данных для обработки и вывода проектов больших данных.

Вы также можете поэкспериментировать с диаграммами - щелкните значок диаграммы / графика внизу любого вывода, укажите значения и тип диаграммы и посмотрите, что произойдет. Это весело.

Код размещен в записной книжке здесь на публичном форуме Databricks и будет доступен в течение примерно 6 месяцев согласно Databricks.

Спасибо за чтение! Я надеюсь, что у вас есть интересные программы с Databricks, и они вам понравятся так же, как и мне. Пожалуйста, хлопайте в ладоши, если вы нашли это интересным или полезным.

Полный список моих статей смотрите здесь.