Запустите Apache Flink с Amazon S3

Кому-нибудь удается использовать Apache Flink 0.9 для обработки данных, хранящихся на AWS S3? Я обнаружил, что они используют собственную S3FileSystem вместо системы от Hadoop ... и похоже, что это не работает. Я поместил следующий путь s3: //bucket.s3.amazonaws.com/folder, он не удался со следующим исключением:

java.io.IOException: не удается установить соединение с Amazon S3: com.amazonaws.services.s3.model.AmazonS3Exception: рассчитанная нами подпись запроса не соответствует предоставленной вами подписи. Проверьте свой ключ и метод подписи. (Сервис: Amazon S3; Код состояния: 403;


person Konstantin Kudryavtsev    schedule 06.10.2015    source источник
comment
Похоже, поддержка собственной файловой системы S3 в Flink не работает. Похоже, что другой коммиттер Apache Flink также увидел и подтвердил проблему: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/   -  person Robert Metzger    schedule 06.10.2015
comment
да это. К сожалению, конец истории не ясен из ветки ... так что я надеюсь, что это было исправлено ...   -  person Konstantin Kudryavtsev    schedule 06.10.2015
comment
Я продолжу обсуждение в списке рассылки Flink.   -  person Robert Metzger    schedule 06.10.2015


Ответы (2)


Обновление, май 2016 г.. В документации Flink теперь есть страница на как использовать Flink с AWS


Этот вопрос также был задан в списке рассылки пользователей Flink, и я ответил на него там: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3-data-with-Apache-Flink-td3046.html

tl;dr:

Программа Flink

public class S3FileSystem {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
      DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
      myLines.print();
   }
}

Добавьте следующее в core-site.xml и сделайте его доступным для Flink:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>

<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
<property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
person Robert Metzger    schedule 06.10.2015

вы можете получить артефакты из корзины S3, указанной в разделе вывода шаблона CloudFormation. т.е. после того, как среда выполнения Flink запущена, программа процессора потока такси может быть отправлена ​​в среду выполнения Flink, чтобы начать анализ событий поездки в потоке Amazon Kinesis в реальном времени.

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Обе приведенные выше команды используют Amazon S3 в качестве источника, вам необходимо указать имя артефакта соответственно.

Примечание: вы можете перейти по ссылке ниже и создать конвейер с использованием ведер EMR и S3.

https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/

person hanif s    schedule 25.07.2017