Как анализировать данные json, которые поступают из темы kafka в классе схемы Storm?

Я получаю данные json из темы кафки. Как я могу применить синтаксический анализ json, чтобы получить все поля для всех объектов в классе схемы шторма, который использует метод десериализации, после чего я возвращаю значения в новые возвращаемые значения(). (Метод класса backtype.storm.tuple.Values ) ? То есть, если у меня есть 2 объекта json в моей теме, я зацикливаю их, чтобы получить все поля, наконец, я должен вернуть все значения методу возврата. мой возврат должен содержать все поля двух объектов json.

Моя проблема: в методе возврата возвращаются только 2 данных obj json. я думаю, что все поля 2-го объекта переопределяют поля 1-го объекта. Наконец, поля второго объекта возвращаются в конце.

может ли кто-нибудь из вас дать мне идею вернуть все поля объектов (1,2 поля объектов)....

Заранее спасибо

public class MainParserSpout implements Scheme{
  String tweet_created_at;
  String tweet_id;
  String tweet_id_str;
  String tweet_text;
  String tweet_source;`    
@Override

try{

public List<Object> deserialize(byte[] bytes){
  String twitterEvent = new String(bytes, "UTF-8");
   JSONArray JSON = new JSONArray(twitterEvent);
      for(int i=0;i<JSON.length();i++) {
        JSONObject object_tweet=JSON.getJSONObject(i);
//Tweet status                  
          try{
            this.tweet_created_at=object_tweet.getString("created_at");
            this.tweet_id=object_tweet.getString("id");
            this.tweet_id_str=object_tweet.getString("id_str");
            this.tweet_text=object_tweet.getString("text");
            this.tweet_source=object_tweet.getString("source");
          }catch(Exception e){}
    } //array for close
}catch(Exception e){}
} //JSON array close
  return new Values(tweet_created_at,tweet_id,tweet_id_str,tweet_text,tweet_source);
} //deserialize method close
public Fields getOutputFields() {
    return newFields("tweet_created_at","tweet_id","tweet_id_str","tweet_text","tweet_source");
} //getOutputFields method close
} //class close

person Dilip Bobby    schedule 30.12.2015    source источник
comment
Я не уверен, что вы хотите сделать... Можете ли вы привести небольшой пример, показывающий два объекта JSON и ожидаемые выходные кортежи, которые вы хотите получить?   -  person Matthias J. Sax    schedule 02.01.2016
comment
Я добавил пример кода. Если мой объект твита содержит два твита, только 2-е поля твита i., e: tweet_created_at, id.text, источник второго твита возвращается наконец. Пожалуйста, поделитесь идеей, как вернуть значения для каждого итерация @Matthias J. Sax   -  person Dilip Bobby    schedule 04.01.2016
comment
Ваш пример кода кажется неполным... Кроме того, deserialize должен возвращать один кортеж. Таким образом, все данные из вашего JSON должны быть собраны в одно возвращаемое значение. Вы не можете вернуть несколько кортежей из одного твита.   -  person Matthias J. Sax    schedule 04.01.2016
comment
Да ! мы не можем попытаться получить несколько кортежей. Мне нужен способ получить несколько значений кортежей, я могу назвать любой другой класс в пакете storm, который может помочь мне решить эту проблему. Я читаю данные из темы kafak. Поэтому я использовал метод десериализации. @ Matthias J. Sax   -  person Dilip Bobby    schedule 05.01.2016
comment
Не уверен, что вы подразумеваете под несколькими значениями кортежей - кортеж имеет несколько значений... Использование deserialize - правильный путь; однако вы не можете получить несколько кортежей за один вызов. Однако вы можете отправить два твита, удвоив свой кортеж, т. е. дважды указав каждое значение/поле/атрибут. После этого вы можете использовать болт, который принимает двойные твиты, разделяет этот кортеж и выдает два одиночных твита-кортежа.   -  person Matthias J. Sax    schedule 05.01.2016
comment
да хорошая идея спасибо! У меня есть идея использовать добавление с запятой для каждой итерации и сохранять значения каждой итерации в одном объекте кортежа, пока я его сохраняю. Я буду использовать регулярное выражение, я разделю их. @ Матиас Дж. Сакс   -  person Dilip Bobby    schedule 05.01.2016


Ответы (2)


Вы не можете получить несколько кортежей за один вызов deserialize. Однако вы можете отправить два твита, «удвоив» свой кортеж, т. е. дважды указав каждое значение/поле/атрибут. После этого вы можете использовать болт, который принимает «двойные твиты», разделяет этот кортеж и выдает два одиночных твита-кортежа.

Что-то вроде (я не знаком с форматом JSON Tweet, так что это больше предположение в отношении примера кода из вопроса):

@Override
public List<Object> deserialize(byte[] bytes){
  List<String> doubleTweet = new ArrayList<String>();

  try{
    String twitterEvent = new String(bytes, "UTF-8");
    JSONArray JSON = new JSONArray(twitterEvent);


    for(int i=0;i<JSON.length();i++) {
      JSONObject object_tweet=JSON.getJSONObject(i);
      for(int j=0;j<object_tweet.length();j++){
        //Tweet status                  
        try{
          doubleTweet.add(object_tweet.getString("created_at"));
          doubleTweet.add(object_tweet.getString("id"));
          doubleTweet.add(object_tweet.getString("id_str"));
          doubleTweet.add(object_tweet.getString("text"));
          doubleTweet.add(object_tweet.getString("source"));
        }catch(Exception e){}
      }
    }
  }catch(Exception e){}

  return doubleTweet;
}

doubleTweet содержит каждое поле дважды (поля 0-4 для первого твита и поля 5-9 для второго твита). Таким образом, последовательный болт может просто извлечь эти поля и создать кортеж из 5 полей для каждого твита).

В качестве альтернативы вы также можете использовать RawScheme и выполнить синтаксический анализ JSON в последующем болте. В этом болте вы можете создавать несколько кортежей (т. е. по одному для каждого твита): https://github.com/apache/storm/tree/master/external/storm-kafka#multischeme

Если вы используете RawScheme, носителем выдается кортеж с одним полем byte[]. Таким образом, вы можете сделать парсон JSON только в Bolt.execute() и вызывать Collector.emit() для каждого твита.

person Matthias J. Sax    schedule 06.01.2016
comment
можете ли вы привести пример кода для удвоения и RawScheme? @Matthias J. Sax - person Dilip Bobby; 06.01.2016
comment
мне нужно вернуть объект списка doubleTweet, чтобы вернуть newFields ()? сделал небольшую поправку в вопросе, пожалуйста, примите во внимание @Matthias J. Sax - person Dilip Bobby; 21.01.2016
comment
когда я вернул объект списка в новый feilds(); Я получаю следующую ошибку: Ошибка: java.lang.IllegalArgumentException: Кортеж создан с неправильным количеством полей. Ожидалось 1 поле, но получено 5 полей. @Matthias J. Sax - person Dilip Bobby; 21.01.2016
comment
Я никогда не работаю с Kafka и Schema в одиночку. Не уверен, почему он ожидает одно Поле вместо пяти. Можете ли вы добавить Stacktrace к своему вопросу? А код, который собирает топологию? - person Matthias J. Sax; 21.01.2016

Я упустил момент, что kafka - это система обмена сообщениями с публикацией и подпиской. когда я пытался отправить данные производителю, я отправлял 20 объектов Json chuck в виде одного сообщения, но моя схема работает только для одного Json chuck. Поэтому я разделил эти 20 объектов Json chuck на 20 json chuck и отправил каждый Чак продюсеру Json.

person Dilip Bobby    schedule 28.01.2016