Как изменить файл конфигурации Apache flume через код Java?

В настоящее время я работаю над проектом больших данных для анализа настроений по актуальным темам в Твиттере. Я следовал руководству Cloudera и понял, как отправлять твиты в Hadoop через Flume.

http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/

Flume.conf:

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements. See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership. The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License. You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing,

# software distributed under the License is distributed on an

# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

# KIND, either express or implied. See the License for the

# specific language governing permissions and limitations

# under the License.



# The configuration file needs to define the sources, 

# the channels and the sinks.

# Sources, channels and sinks are defined per agent, 

# in this case called 'TwitterAgent'


TwitterAgent.sources = Twitter

TwitterAgent.channels = MemChannel

TwitterAgent.sinks = HDFS


TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource

TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = 

TwitterAgent.sources.Twitter.consumerSecret = 

TwitterAgent.sources.Twitter.accessToken =  

TwitterAgent.sources.Twitter.accessTokenSecret =  

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing


TwitterAgent.sinks.HDFS.channel = MemChannel

TwitterAgent.sinks.HDFS.type = hdfs

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream

TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text

TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000

TwitterAgent.sinks.HDFS.hdfs.rollSize = 0

TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000


TwitterAgent.channels.MemChannel.type = memory

TwitterAgent.channels.MemChannel.capacity = 10000

TwitterAgent.channels.MemChannel.transactionCapacity = 100

Теперь, чтобы распространить это на мое приложение, мне нужны разделы ключевых слов в файле конфигурации Flume, чтобы иметь актуальные темы, я понял код Java, чтобы получить трендовые темы, но у меня есть проблема, теперь я не знаю, как подключить этот код к флюму файл конфигурации или как создать новый файл с актуальными темами, добавленными в раздел ключевых слов. Я много искал в Интернете для этого, так как я новичок в этой области, было бы очень полезно, если бы вы предоставили некоторую информацию или хотя бы какую-то другую альтернативу для этого.


person Mohammed Zubair Khan    schedule 03.04.2018    source источник
comment
Вы не можете редактировать конфигурацию во время выполнения без перезапуска агента Flume.   -  person OneCricketeer    schedule 04.04.2018
comment
Ооо, тогда я постараюсь найти об этом. Спасибо @cricket_007   -  person Mohammed Zubair Khan    schedule 04.04.2018


Ответы (1)


Очень интересная задача..!

Я согласен с комментарием @cricket_007 - редактирование конфигурации без перезапуска агента Flume невозможно.

Я не смогу много сказать, так как не видел вашего Java-кода, чтобы получить ключевое слово для актуальных тем. Однако с предоставленной вами информацией есть одна альтернатива (или, скорее, обходной путь), о которой я мог подумать, но сам еще не пробовал.

Вы потенциально можете изменить файл TwitterSource.java следующим образом:

public void configure(Context context) {
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

//MODIFY THE FOLLOWING PORTION
String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
if (keywordString.trim().length() == 0) {
    keywords = new String[0];
} else {
  keywords = keywordString.split(",");
  for (int i = 0; i < keywords.length; i++) {
    keywords[i] = keywords[i].trim();
  }
}
//UNTIL THIS POINT

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setOAuthConsumerKey(consumerKey);
cb.setOAuthConsumerSecret(consumerSecret);
cb.setOAuthAccessToken(accessToken);
cb.setOAuthAccessTokenSecret(accessTokenSecret);
cb.setJSONStoreEnabled(true);
cb.setIncludeEntitiesEnabled(true);

twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); 
}

Я разместил комментарий выше, где вы инициализируете переменную ключевого слова - вы можете вызвать свой код Java (я предполагаю, что это метод, из которого вы можете вернуть строку ключевых слов, разделенных запятыми), вместо того, чтобы извлекать это из контекст доступен в flume.conf (просто удалите часть context.getString()).

Наряду с этим просто удалите следующий оператор из flume.conf:

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

Надеюсь, это поможет.

person Lalit    schedule 04.04.2018
comment
Я попробовал это, следуя этому коду, чтобы получить тенденции milindjagre.wordpress.com/2016/10/19/, но не удалось собрать пакет, так как API getTrends() может вызвать исключение TwitterException, которое необходимо обработать, но не может обрабатываться в функции configurable(), так как это переопределяющая функция. Чтобы получить тренды, мне нужно как-то обрабатывать исключения, но из-за переопределения я не могу. - person Mohammed Zubair Khan; 05.04.2018
comment
А, ну ладно. Я не думал об этом. Но будет ли ваша сборка ошибаться, даже если вы используете блок try-catch или просто при объявлении throws? Я только что провел эксперимент со своим собственным кодом и добавил фиктивный блок try-catch, и мне удалось собрать пакет. Просто чтобы добавить еще одно замечание, я использую метод doConfigure(), который находится в AbstractEventDrivenSource, в отличие от метода configure() в интерфейсе Configurable. - person Lalit; 05.04.2018
comment
Спасибо, это сработало, добавив блок try catch, и я смог получить от Джейсона твиты о популярных темах в файловой системе Hadoop. Теперь я хочу провести сентиментальный анализ этих твитов, у вас есть предложения по этому поводу. - person Mohammed Zubair Khan; 05.04.2018
comment
Я рад, что это сработало для вас. :) Однако в отношении анализа настроений я еще не пробовал это сам, но заметил, что на эту тему доступно довольно много ссылок. Поскольку вы делаете это на Java, вот один API — lexalytics.com/support, который предлагает Java SDK для этой цели. Итак, я бы порекомендовал вам просто начать и опубликовать еще один вопрос, если у вас возникнут какие-либо проблемы. Сообщество здесь очень полезное, и вам, безусловно, удастся пройти через это. - person Lalit; 06.04.2018