Cassandra write дает очень медленную производительность с использованием Spark

У меня есть таблица cassandra с примерно 500+ миллионами записей (в 6 узлах), теперь я пытаюсь вставить данные с помощью spark-cassandra-connector в Amazon EMR

Структура таблицы

  CREATE TABLE dmp.dmp_user_profiles_latest (
        pid text PRIMARY KEY,
        xnid int,
        day_count map<text, int>,
        first_seen map<text, timestamp>,
        last_seen map<text, timestamp>,
        usage_count map<text, int>,
        city text,
        country text,
        lid set<text>,

    )WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 172800
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.1
    AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count);
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country);

Ниже приведены мои варианты отправки искры.

--class com.mobi.vserv.driver.Query5kPids1
--conf spark.dynamicAllocation.enabled=true  
--conf spark.yarn.executor.memoryOverhead=1024    
--conf spark.yarn.driver.memoryOverhead=1024 
--executor-memory 1g
--executor-cores 2
--driver-memory 4g

Но в журналах, которые я видел, запись в Кассандру занимает около 4-5 минут для загрузки 2 лакхов (200000) записей (в то время как общее время выполнения составляет 6+ минут)

Я также добавил следующее в Spark conf

conf.set("spark.cassandra.output.batch.size.rows", "auto");
conf.set("spark.cassandra.output.concurrent.writes", "500");
conf.set("spark.cassandra.output.batch.size.bytes", "100000");
conf.set("spark.cassandra.output.throughput_mb_per_sec","1");

Но все равно повышения производительности нет, также увеличение количества ядер в Amazon EMR не помогает.

Обратите внимание, что в моей таблице Cassandra мы не использовали столбец секционирования / кластеризации, так что это могло быть причиной такой низкой производительности.

Обратите внимание: скорость сети составляет 30 МБ PS, первичный ключ - это буквенно-цифровые значения, например - a9be3eb4-751f-48ee-b593-b3f89e18622d.

Cassandra.yaml

cluster_name: 'dmp Cluster'
num_tokens: 100
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
     - /data/cassandra/data
disk_failure_policy: stop
commit_failure_policy: stop

key_cache_size_in_mb:

key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
 - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
 - seeds: "10.142.76.97,10.182.19.301"

concurrent_reads: 256
concurrent_writes: 128
concurrent_counter_writes: 32

memtable_allocation_type: heap_buffers
memtable_flush_writers: 8
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: 10.142.76.97
start_rpc: true
rpc_address: 10.23.244.172
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
batch_size_warn_threshold_in_kb: 5
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 64
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 500000

range_request_timeout_in_ms: 1000000

write_request_timeout_in_ms: 200000

counter_write_request_timeout_in_ms: 500000

cas_contention_timeout_in_ms: 100000

endpoint_snitch: Ec2Snitch

dynamic_snitch_update_interval_in_ms: 100

dynamic_snitch_reset_interval_in_ms: 600000

dynamic_snitch_badness_threshold: 0.1

request_scheduler: org.apache.cassandra.scheduler.NoScheduler

server_encryption_options:
    internode_encryption: none
    keystore: conf/.keystore
    keystore_password: cassandra
    truststore: conf/.truststore
    truststore_password: cassandra

client_encryption_options:
    enabled: false
    keystore: conf/.keystore
    keystore_password: cassandra

internode_compression: all

inter_dc_tcp_nodelay: false

person Rahul Koshaley    schedule 05.04.2016    source источник
comment
можем ли мы получить структуру вашей базы данных?   -  person Whitefret    schedule 05.04.2016
comment
У вас есть доступ к узлу? Чтобы увидеть, как ваша база данных разбросана по кластеру? Возможно, все ваши записи попадают в один и тот же узел (таким образом, увеличивать количество узлов бесполезно)   -  person Whitefret    schedule 05.04.2016
comment
Да, у меня есть доступ, как мне это проверить? Поскольку статус nodetool показывает 6 работающих узлов, каждый из которых имеет 100 токенов.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Я действительно не знаю, я не использую Amazon EMR: / Я просто хотел знать, попадают ли случайно все ваши хэши pid в один и тот же диапазон (что было бы очень неудачно)   -  person Whitefret    schedule 05.04.2016
comment
Amazon EMR я использую для обработки, а Cassandra находится в EC2 (6 узлов)   -  person Rahul Koshaley    schedule 05.04.2016
comment
У вас есть что-то вроде OpsCenter?   -  person Whitefret    schedule 05.04.2016
comment
Опять же, как мне проверить хеш-код PID, попадает ли он в тот же диапазон? Вы имеете в виду хеш-функцию Java, извините за вопрос, но что вы имеете в виду под OpsCenter?   -  person Rahul Koshaley    schedule 05.04.2016
comment
Я бы просто посмотрел, сколько данных Cassandra принимает на каждом узле. Не могли бы вы дать некоторую конфигурацию на Кассандре, например, разделитель и стратегию размещения?   -  person Whitefret    schedule 05.04.2016
comment
Итак, несколько замечаний: 1. Ваши рабочие Spark размещены на тех же машинах, что и процесс Cassandra? Если нет, у вас нет локальности данных, и все рабочие потоковые данные в сети, чтобы достичь узлов Cassandra, так что это очень дорого 2. Каковы аппаратные характеристики 6 узлов Cassandra? ПРОЦЕССОР ? Тип диска (спиннинг, SSD)? Объем памяти ? 3. Какая у вас конфигурация Cassandra? Можете ли вы дать ссылку на $CASSANDRA_HOME/conf/cassandra.yaml или /etc/cassandra/cassandra.yaml где-нибудь? Обычно сценарии массивной вставки нагружают больше ЦП, чем диск.   -  person doanduyhai    schedule 05.04.2016
comment
Привет, я использую c3.2xlarge с 8 ядрами, 15 ГБ памяти и 2 x 80 SSD, а также рабочие Cassandra и Spark на разных машинах, также я поделюсь параметрами cassandra.yaml здесь   -  person Rahul Koshaley    schedule 05.04.2016
comment
Характеристики оборудования очень приличные, установлены ли у вас рабочие Spark на тех же машинах, что и процесс Cassandra?   -  person doanduyhai    schedule 05.04.2016
comment
Хорошо, вот упражнение: передача данных по сети стоит дорого, независимо от того, идет ли речь об облаке или нет. Итак, это первое узкое место. Второе узкое место, память вашего драйвера - 4G, а ваши исполнители - 1G, как какой объем данных? Допустим, 20G, я думаю! Вы видите картинку сейчас?   -  person eliasah    schedule 05.04.2016
comment
Нет они на разных машинах   -  person Rahul Koshaley    schedule 05.04.2016
comment
Привет, elishah, я тестирую только файл размером 15 МБ (т.е. 2 лакх строк данных), так что я думаю, что памяти исполнителя достаточно.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Так что ваша проблема, вероятно, может быть там. Вы генерируете много данных от рабочих Spark, и им необходимо пересечь сеть, чтобы добраться до машины Cassandra. Чтобы исключить аппаратную проблему Cassandra, установите dstat и выполните dstat в течение 30 секунд во время вставки с любого узла Cassandra. Сделайте снимок экрана с дисплеем dstat и разместите его здесь, пожалуйста   -  person doanduyhai    schedule 05.04.2016
comment
Whitefret, я отредактировал сообщение с точной структурой таблицы, так как я не знаю, как проверить стратегию разделения.   -  person Rahul Koshaley    schedule 05.04.2016
comment
15 МБ, разделенные на 6 узлов, могут быть излишними, попробуйте добавить больше данных, сравните и обновите свой вопрос   -  person eliasah    schedule 05.04.2016
comment
Я пробовал с 500 Мбайт, производительность такая же, т.е. работает более 5 часов.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Также скорость сети составляет 30 МБ PS   -  person Rahul Koshaley    schedule 05.04.2016
comment
Whitefret Я проверил файл cassandra.yaml, в котором мы используем этот разделитель: org.apache.cassandra.dht.Murmur3Partitioner   -  person Rahul Koshaley    schedule 05.04.2016
comment
ну тут ничего плохого. Следующее, что я мог подумать, это индекс, у которого day_count много разных значений?   -  person Whitefret    schedule 05.04.2016
comment
Да, для каждой записи есть свое значение.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Я разместил cassandra.yaml в исходном посте   -  person Rahul Koshaley    schedule 05.04.2016
comment
хорошо, вот в чем проблема. каждый раз, когда вы что-то добавляете, он должен вычислять новый индекс, что занимает много времени. Индекс предназначен для значений, которые не сильно меняются и имеют ограниченное количество значений, таких как страны, категории, жанры ...   -  person Whitefret    schedule 05.04.2016
comment
Так что для обновления он тоже изменит индекс?   -  person Rahul Koshaley    schedule 05.04.2016
comment
см. эту ссылку, которая объясняет это лучше, чем я;) docs. datastax.com/en/cql/3.1/cql/ddl/ddl_when_use_index_c.html   -  person Whitefret    schedule 05.04.2016
comment
Спасибо за подсказку, Уайтфрет, да, это похоже на проблему, поскольку day_count - это карта, и гении здесь сделали ее индексом, так что это огромный удар. Не могли бы вы подсказать, как сбросить эти индексы. Поскольку это производственная среда.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Это поможет? docs.datastax.com/en/cql/3.1/cql/ cql_reference / однако я бы подумал о том, чтобы посмотреть, почему это индекс, прежде чем отбрасывать его в prod   -  person Whitefret    schedule 05.04.2016
comment
Точно я говорю с человеком, который ее разработал, но все же я чувствую, что создания карты индекса, кажется (что тоже со всеми разными значениями) следует избегать.   -  person Rahul Koshaley    schedule 05.04.2016
comment
Хорошо, предупредите меня, если это не решило вашу проблему   -  person Whitefret    schedule 05.04.2016
comment
РАХУЛ, можно попробовать сбросить все индексы и заново вставить, чтобы посмотреть, улучшится ли скорость. Если да, то виновником является вторичный индекс, если нет, проблема в другом месте.   -  person doanduyhai    schedule 05.04.2016
comment
@doanduyhai он не может этого сделать в прод   -  person Whitefret    schedule 05.04.2016
comment
Ах, не видел, что это в производстве, извините ...   -  person doanduyhai    schedule 05.04.2016


Ответы (1)


Как говорилось в комментарии, похоже, ваша проблема связана с вашим индексом на day_count.

Как видно на этой странице, индекс не будет эффективен, если вы должны обновлять их все время, и это происходит, когда вы вставляете другое значение в day_count (что, возможно, каждый раз).

Вам необходимо переработать вашу базу данных, но поскольку это ваша производственная среда, вы не можете просто DROP INDEX IF EXISTS keyspace.index_name, если этот индекс необходим, но вы можете создать вторичную базу данных, используя day_count в качестве первичного ключа, или использовать day_count в качестве индекса упорядочивания.

person Whitefret    schedule 05.04.2016
comment
Между прочим, ЕСЛИ проблема исходит из вторичного индекса (у нас нет абсолютной уверенности, если не измерить скорость вставки без этих индексов, измерять, не догадываться). Одним из решений может быть удаление индекса, чтобы вставить все данные в Cassandra, а затем воссоздать индекс. Это означало бы, что приложение не может запрашивать по индексу, пока вставка не будет завершена. - person doanduyhai; 05.04.2016
comment
Привет, как обновление, мы не используем этот индекс для запросов. Можно, я его уроню? Также вы сказали, что это неэффективно, если его обновление каждый раз, и это происходит, когда мы вставляем другое значение, в моем случае мы обновляем значения карты (индекс) каждый раз, а в некоторых случаях мы вставляем новое значение в карту. - person Rahul Koshaley; 05.04.2016
comment
Пожалуйста, предложите. Поскольку мне нужно удалить index. - person Rahul Koshaley; 05.04.2016
comment
@RahulKoshaley Я думаю, ты мог бы бросить это, но трудно сказать. лучше сначала попробовать в тестовой среде. - person Whitefret; 05.04.2016
comment
Если вы уверены, что не используете его, бросьте его, если он выдает ошибку, верните его снова - person Whitefret; 05.04.2016
comment
Привет, Уайтфрет, спасибо за предложения, но в настоящее время я думаю о реструктуризации всей структуры таблиц и проверке производительности, так как я более чем уверен, что удаление индекса на day_Count приведет к производительности, но все еще не хочу касаться существующей таблицы как его в прод. - person Rahul Koshaley; 05.04.2016
comment
@RahulKoshaley Тогда, возможно, вы можете рассмотреть возможность создания для него дополнительной таблицы в качестве исправления при работе над вашей новой моделью. Это улучшит вашу производительность, но вы должны быть уверены, что ваш запрос является атомарным. - person Whitefret; 05.04.2016
comment
Привет Whitefret, Вы имеете в виду вторичную таблицу для коллекции day_count? , если да, можете ли вы предложить структуру из своего опыта, также для PID (PK) может быть несколько записей в карте. Так что вы предлагаете? - person Rahul Koshaley; 05.04.2016
comment
Извините, что обманул вас, но я не эксперт в проектировании баз данных. Я могу только дать вам несколько советов: - прочтите документ, найденный в datastax, который дает некоторое представление о том, как выбирать переменные в ключе раздела и ключах кластеризации - внимательно выбирайте, какой индекс вы хотите - при возникновении таких проблем, когда вам нужно индекс для постоянно меняющейся переменной, не бойтесь создать другую таблицу, поскольку репликация стоит недорого и повысит производительность чтения. - person Whitefret; 05.04.2016
comment
Хорошо, спасибо, последний вопрос, неужто иметь ключ кластеризации / разделения, а также индексы? - person Rahul Koshaley; 05.04.2016
comment
@RahulKoshaley Partition Key - это то, как ваши данные будут разбросаны по кластеру. Если вы хотите иметь быстрое чтение, вам нужно сгруппировать элементы, к которым вы хотите получить доступ, вместе, потому что вы читаете быстрее, чем меньше вы читаете разные разделы. Например, в моей базе данных ключ раздела - это код процесса и страна, поэтому все данные, связанные с процессом и страной, будут храниться в одном разделе. - person Whitefret; 05.04.2016
comment
@RahulKoshaley Ключ кластеризации полезен для упорядочивания данных и для запросов. Вы не можете запрашивать с помощью предложения Where, если поле не входит в первичный ключ или не индексируется. Таким образом, либо ваше поле имеет низкую мощность, и это может быть индекс, либо, как я, с именем файла, вам нужно заказать его, чтобы он имел быстрый доступ, если вы ищете файл в предложении where - person Whitefret; 05.04.2016
comment
По сути, это зависит от запроса, вам нужно построить свою модель данных с запросом, который вы ожидаете, а не с данными, которые у вас есть. Одно из моих требований - сделать запрос по дате, чего я не могу с моей моделью, вот где я сейчас, и я колеблюсь между изменением модели или созданием нескольких таблиц. - person Whitefret; 05.04.2016
comment
@RahulKoshaley только что подумал об этом: с Кассандрой, что вы получаете от чтения, вы теряете при записи. Вторичные индексы - хороший пример, если - person Whitefret; 05.04.2016