Существуют ли в Java эквиваленты C ++ для функций ввода-вывода с разделителями протокольных буферов?

Я пытаюсь прочитать / записать несколько сообщений протокольных буферов из файлов как на C ++, так и на Java. Google предлагает писать префиксы длины перед сообщениями, но по умолчанию это невозможно сделать (что я мог видеть).

Однако Java API в версии 2.1.0 получил набор функций ввода-вывода с разделителями, которые, по-видимому, выполняют эту работу:

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

Есть ли эквиваленты C ++? А если нет, то каков формат проводов для префиксов размера, которые прикрепляет Java API, чтобы я мог анализировать эти сообщения на C ++?


Обновлять:

Теперь они существуют в google/protobuf/util/delimited_message_util.h с v3.3.0.


person tzaman    schedule 26.02.2010    source источник
comment
Я не знаю, но он с открытым исходным кодом, так что вы можете узнать из источника.   -  person Douglas Leeder    schedule 26.02.2010
comment
Да, это то, чем я закончил. :) Смотрите мой ответ ниже.   -  person tzaman    schedule 26.02.2010
comment
Начиная с v3.3.0 google :: protobuf :: util предлагает методы сообщений с разделителями для MessageLite.   -  person Kenji Noguchi    schedule 20.12.2018
comment
@KenjiNoguchi Спасибо за подсказку! Я обновил вопрос, включив это.   -  person tzaman    schedule 21.12.2018


Ответы (11)


Я немного опоздал на вечеринку, но приведенные ниже реализации включают некоторые оптимизации, отсутствующие в других ответах, и не потерпят неудачу после ввода 64 МБ (хотя он по-прежнему применяет ограничение в 64 МБ для каждого отдельного сообщения, но не для всего потока) .

(Я являюсь автором библиотек protobuf C ++ и Java, но больше не работаю в Google. Извините, что этот код так и не попал в официальную библиотеку. Вот как бы он выглядел, если бы был.)

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}
person Kenton Varda    schedule 08.04.2014
comment
Эй, спасибо за ответ, Кентон! Я переключусь с принятого на этот вместо своего. Хотя я подозреваю, что лучший ответ на данный момент - использовать вместо этого Cap'n Proto? :) - person tzaman; 09.04.2014
comment
Также - почему бы не объединить это с официальной библиотекой protobuf на code.google? - person tzaman; 09.04.2014
comment
В качестве официального патча потребуется дополнительная работа, например, модульные тесты, может быть, улучшенная история восстановления ошибок и т. Д., А между Cap'n Proto и Sandstorm.io у меня просто нет времени. : / Если кто-то хочет заявить об этом как о своем собственном и продвигать его вверх по течению, не стесняйтесь. Обе функции, вероятно, должны стать методами на MessageLite. Вам, вероятно, следует обсудить с текущими сопровождающими, прежде чем выполнять какую-либо работу, поскольку у них могут быть свои собственные планы. - person Kenton Varda; 09.04.2014
comment
как насчет создания OstreamOutputStream каждый раз (обертывание std :: ostream arg), это тоже быстро? - person mukunda; 13.11.2014
comment
@mukunda - я думаю, что OstreamOutputStream - это экземпляр CopyingOutputStream, который выделяет буфер, поэтому его создание и уничтожение происходит медленно. Кроме того, при уничтожении любые непрочитанные данные, находящиеся в данный момент в буфере, просто отбрасываются (не перематываются), поэтому вы можете остаться с неопределенным смещением в потоке. - person Kenton Varda; 14.11.2014
comment
Я только что заметил, что вторая часть моего предыдущего комментария неверна. Беспокойство о непрочитанных данных в буфере относится только к входным потокам (например, IstreamInputStream), но не к выходным потокам. Впрочем, первая часть комментария все еще применима. - person Kenton Varda; 24.02.2015
comment
Обратной стороной использования заголовка varint является то, что его сложно указать асинхронному API (например, ASIO), чтобы он уведомлял вас, когда был прочитан весь заголовок. Использование целых чисел фиксированного размера тривиально: вы просто просите подождать, пока не будут получены 4 байта (с asio :: transfer_at_least). Для varints вы хотите оптимизировать для общего случая, когда весь заголовок читается сразу, избегая при этом квадратичного поведения, если кто-то отправляет бесконечный поток байтов с установленным старшим битом. Кроме того, мне кажется, что вся эта логика слишком высока для кода чтения сокета. - person Jim Oldfield; 13.11.2015
comment
Не имеет отношения к моему последнему комментарию: если бы кто-то интегрировал это в основной проект protobuf, было бы здорово, если бы поддержка была также включена в переключатели protoc --encode / decode (с несколькими сообщениями) для чтения и записи двоичных файлов protobuf. - person Jim Oldfield; 13.11.2015
comment
О да, мне, наверное, следует упомянуть, что я отправил в protobuf запрос на добавление этих функций три месяца назад ... хотя еще не принят: github.com/google/protobuf/pull/710 - person Kenton Varda; 15.11.2015
comment
@KentonVarda Что было бы эквивалентом этих функций в Python? - person fireboot; 17.12.2015
comment
@fireboot Извините, я не писал библиотеку Python, поэтому я не так хорошо с ней знаком. Мне пришлось бы немного покопаться, чтобы понять это, и, к сожалению, у меня нет времени. : / Однако я, вероятно, мог бы проверить код, созданный кем-то другим. - person Kenton Varda; 17.12.2015
comment
@KentonVarda Я попробую в ближайшие дни и опубликую здесь - person fireboot; 18.12.2015
comment
@KentonVarda Я только что разместил свою версию в этой ветке. Комментарии приветствуются :) - person fireboot; 31.12.2015
comment
@kentonVarda, я ищу способ написать, например, несколько пользователей, используя пользовательское сообщение, а затем получить одного конкретного пользователя, используя байтовую позицию этого пользователя, это решение, которое мне нужно? - person Stellan; 28.06.2017
comment
@JimOldfield - полностью согласен с варинтами. Если вы контролируете обе стороны соединения, использование длины в 4 байта значительно упрощает работу. - person aggieNick02; 27.03.2018
comment
@KentonVarda, этот код отлично выглядит! Но я не могу заставить его работать. Я передаю новый :: google :: protobuf :: io :: OstreamOutputStream (& file) как rawInput, с сообщением длиной 9458 байт, он генерирует файл длиной 8192 байта ... Конечно, десериализация не удалась, сообщение усечено . Какие-то мысли? - person Alejandro Silvestri; 07.12.2018
comment
@AlejandroSilvestri Похоже, некоторые буферы не были очищены. Я считаю, что данные буфера OstreamOutputStream и std :: ostream. Они сбросят эти данные, когда вы уничтожите объекты. Я думаю, что у них также могут быть явные методы очистки, которые вы можете вызвать, если вы еще не хотите уничтожать объекты. - person Kenton Varda; 07.12.2018
comment
Подтвержденный. Как сказал @KentonVarda, мне пришлось уничтожить OstreamOutputStream перед закрытием файла ofstream. - person Alejandro Silvestri; 19.01.2019
comment
Работает безупречно! :) Я использовал Parse / SerializePartialFromZeroCopyStream, казалось, что с записью все в порядке (большой файл), но я смог разобрать только последнее сообщение. Это проблема файлового курсора? Я не понимаю различий между Parse / Serialize .. от read / writeDelimited ... кроме двух последних, записывающих размер сообщения - person Maskim; 21.03.2019
comment
Пришлось прочитать это дважды bc. Я имел в виду, что разделитель означает, что вы должны написать некоторый разделитель после сообщения. Вместо этого вы просто сначала напишите этот размер ... - person Nils; 19.07.2019
comment
К вашему сведению, упомянутый выше PR был объединен, и теперь функции, указанные в этом ответе, доступны в delimited_message_util.h. - person Lukas; 17.12.2019
comment
Помните, что в последней версии protobuf google :: protobuf :: io :: CodedInputStream input (rawInput); немедленно буферизует / читает много байтов, поэтому не полагайтесь на объект потока IstreamInputStream, чтобы узнать фактическое смещение текущего прочитанного сообщения. Это после прочтения первого сообщения, stream.tell () может быть в конце потока, а не в конце первого сообщения в потоке! - person Niki; 14.05.2021
comment
@Niki Это всегда было правдой. CodedInputStream немедленно запрашивает у базового ZeroCopyInputStream весь буфер данных. Деструктор CodedInputStream возвращает ту часть буфера, которая не использовалась. Итак, если CodedInputStream все еще активен, то базовый ZeroCopyInputStream будет в неопределенном состоянии. Вот как это работало, когда я писал это в 2006 году ... - person Kenton Varda; 15.05.2021
comment
Вы правы @KentonVarda. Я подтвердил, что это было то же самое в ранней версии, это было изменение способа использования, которое вызвало у нас проблему. Спасибо за исправление! - person Niki; 17.05.2021

Хорошо, поэтому мне не удалось найти функции C ++ верхнего уровня, реализующие то, что мне нужно, но некоторые подробности в справочнике по Java API обнаружили следующее внутри MessageLite:

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

Таким образом, префикс размера Java - это переменная (буферы протокола)!

Вооружившись этой информацией, я покопался в C ++ API и нашел CodedStream, в котором есть:

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

Используя их, я смогу развернуть свои собственные функции C ++, которые выполняют эту работу.

Однако им действительно стоит добавить это в основной API сообщений; ему не хватает функциональности, учитывая, что она есть в Java, как и отличный порт protobuf-net C # Марка Гравелла (через SerializeWithLengthPrefix и DeserializeWithLengthPrefix).

person tzaman    schedule 26.02.2010
comment
да. Вот так я решил эту проблему. Я добавил еще один ответ с образцом псевдокода для написания сообщения. - person Yukiko; 26.02.2010

Я решил ту же проблему, используя CodedOutputStream / ArrayOutputStream для записи сообщения (с размером) и CodedInputStream / ArrayInputStream для чтения сообщения (с размером).

Например, следующий псевдокод записывает размер сообщения, следующий за сообщением:

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

При написании вы также должны убедиться, что ваш буфер достаточно велик, чтобы поместиться в сообщение (включая размер). И при чтении вы должны убедиться, что ваш буфер содержит все сообщение (включая размер).

Определенно было бы удобно, если бы они добавили удобные методы в C ++ API, аналогичные тем, которые предоставляет Java API.

person Yukiko    schedule 26.02.2010
comment
Я буду использовать базовый OstreamOutputStream, поэтому проверка длины не потребуется, но спасибо за ответ. :) В вашем случае, я бы, вероятно, пошел с установкой bufLength на protoMessage.ByteSize() плюс немного дополнительных для префикса размера. - person tzaman; 28.02.2010

IsteamInputStream очень уязвим для eofs и других ошибок, которые легко возникают при использовании вместе с std :: istream. После этого потоки protobuf навсегда повреждаются, а все уже использованные данные буфера уничтожаются. В protobuf есть правильная поддержка чтения из традиционных потоков.

Внедрите google::protobuf::io::CopyingInputStream и используйте это вместе с CopyingInputStreamAdaptor" rel="noreferrer"> CopyingInputStreamAdaptor"> CopyingInputStreamAdaptor"> / а>. Сделайте то же самое для вариантов вывода.

На практике вызов синтаксического анализа заканчивается в google::protobuf::io::CopyingInputStream::Read(void* buffer, int size), где задан буфер. Осталось только как-то прочесть это.

Вот пример для использования с синхронизированными потоками Asio (SyncReadStream / SyncWriteStream) :

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

Использование:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
 }

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);
person Community    schedule 15.11.2012
comment
Это было огромным подспорьем. Я пробовал делать protobuf через сокеты, используя интерфейс asio istream / ostream и обертывая их в IStreamInputStream / OStreamOutputStream, и не смог заставить его работать. Спасибо, что разместили это. С его помощью и функциями Кентона вы можете довольно легко создать клиент / сервер для работы с protobuf на C ++ с помощью asio. - person aggieNick02; 21.06.2016

Ну вот:

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        unsigned __int32 size;

        bool ret;
        if ( ret = _CodedInputStream->ReadVarint32(&size) )
        {   
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
            {
                _CodedInputStream->PopLimit(msgLimit);      
                std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
            }
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};
person jaybny    schedule 02.10.2012

Я столкнулся с одной и той же проблемой как в C ++, так и в Python.

Для версии C ++ я использовал смесь кода, который Кентон Варда опубликовал в этом потоке, и кода из запроса на перенос, который он отправил команде protobuf (потому что версия, опубликованная здесь, не обрабатывает EOF, а та, которую он отправил в github, выполняет ).

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

А вот моя реализация на python2:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

Возможно, это не самый лучший код, и я уверен, что его можно немного реорганизовать, но, по крайней мере, это должно показать вам один из способов сделать это.

Теперь большая проблема: это МЕДЛЕННО.

Даже при использовании C ++ реализации python-protobuf он на порядок медленнее, чем в чистом C ++. У меня есть тест, в котором я читаю из файла 10M сообщений protobuf по ~ 30 байт каждое. Это занимает ~ 0,9 секунды в C ++ и 35 секунд в Python.

Один из способов сделать это немного быстрее - это повторно реализовать декодер varint, чтобы он читал из файла и декодировал за один раз, вместо чтения из файла, а затем декодирования, как этот код в настоящее время делает. (профилирование показывает, что значительное количество времени тратится на кодировщик / декодер варинтов). Но само собой разумеется, что этого недостаточно, чтобы сократить разрыв между версией на Python и версией C ++.

Любая идея сделать это быстрее приветствуется :)

person fireboot    schedule 31.12.2015
comment
Возникает общий вопрос, почему существуют разные реализации кодирования / декодирования в Java / Python / C ++. Я не понимаю, почему на C ++ нет базовой реализации, которая просто вызывается на Java / Python ... - person abergmeier; 09.01.2016
comment
Ваш код Python не работает при использовании Python3. Для работы decoder вам нужно будет читать байты вместо строк. - person abergmeier; 10.08.2016
comment
Да, этот код был написан для python 2, но его должно быть довольно легко адаптировать и заставить работать для python 3. Я отредактировал свой пост, чтобы указать, что этот код нацелен на python 2. - person fireboot; 10.08.2016
comment
Не могли бы вы подтвердить, что поток имеет тип StringIO в python - person py_newbie; 26.06.2018

Для полноты картины я публикую здесь последнюю версию, которая работает с основной версией protobuf и Python3.

Для версии C ++ достаточно использовать утилиты в delimited_message_utils.h, здесь MWE

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>

#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

template <typename T>
bool writeManyToFile(std::deque<T> messages, std::string filename) {
    int outfd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC);
    google::protobuf::io::FileOutputStream fout(outfd);

    bool success;
    for (auto msg: messages) {
        success = google::protobuf::util::SerializeDelimitedToZeroCopyStream(
            msg, &fout);
        if (! success) {
            std::cout << "Writing Failed" << std::endl;
            break;
        }
    }
    fout.Close();
    close(outfd);
    return success;
}

template <typename T>
std::deque<T> readManyFromFile(std::string filename) {
    int infd = open(filename.c_str(), O_RDONLY);

    google::protobuf::io::FileInputStream fin(infd);
    bool keep = true;
    bool clean_eof = true;
    std::deque<T> out;

    while (keep) {
        T msg;
        keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
            &msg, &fin, nullptr);
        if (keep)
            out.push_back(msg);
    }
    fin.Close();
    close(infd);
    return out;
}

Для версии Python3, основанной на ответе @fireboot, единственное, что потребовало модификации, - это декодирование raw_varint32

def getSize(raw_varint32):
    result = 0
    shift = 0
    b = six.indexbytes(raw_varint32, 0)
    result |= ((ord(b) & 0x7f) << shift)
    return result

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size = getSize(raw_varint32)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message
person mariob6    schedule 27.11.2019
comment
Это решение заслуживает гораздо более высокой оценки. Более старые ответы не отражают, как библиотека protobuf C ++ развивалась для устранения очевидного недостатка. - person dag; 02.07.2021

Также искал решение для этого. Вот ядро ​​нашего решения, предполагая, что какой-то java-код записал много сообщений MyRecord с writeDelimitedTo в файл. Откройте файл и выполните цикл, выполнив:

if(someCodedInputStream->ReadVarint32(&bytes)) {
  CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes);
  if(myRecord->ParseFromCodedStream(someCodedInputStream)) {
    //do your stuff with the parsed MyRecord instance
  } else {
    //handle parse error
  }
  someCodedInputStream->PopLimit(msgLimit);
} else {
  //maybe end of file
}

Надеюсь, поможет.

person Kim Laurio    schedule 30.06.2011

Работая с версией буферов протокола objective-c, я столкнулся именно с этой проблемой. При отправке от клиента iOS на сервер на основе Java, который использует parseDelimitedFrom, который ожидает длину в качестве первого байта, мне нужно было сначала вызвать writeRawByte для CodedOutputStream. Публикуйте здесь, чтобы надеяться помочь другим, столкнувшимся с этой проблемой. Работая над этой проблемой, можно подумать, что прото-буферы Google будут иметь простой флаг, который сделает это за вас ...

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}
person gp-coder    schedule 13.12.2013

Поскольку мне не разрешено писать это как комментарий к ответу Кентона Варды выше; Я считаю, что в опубликованном им коде (а также в других предоставленных ответах) есть ошибка. Следующий код:

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

устанавливает неверный предел, потому что он не принимает во внимание размер varint32, который уже был прочитан из ввода. Это может привести к потере / повреждению данных, поскольку из потока считываются дополнительные байты, которые могут быть частью следующего сообщения. Обычный способ справиться с этим правильно - удалить CodedInputStream, используемый для чтения размера, и создать новый для чтения полезной нагрузки:

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...
person ciphersimian    schedule 05.04.2016
comment
Это было бы верно только в том случае, если бы префикс размера включал свой собственный размер, чего нет. Если вы сделаете это, вы в конечном итоге не прочитаете все сообщение. - person tzaman; 05.04.2016
comment
Проблема заключается именно в том, что префикс размера не включает собственный размер. - person ciphersimian; 06.04.2016
comment
Префикс размера содержит в точности размер сообщения, которое следует за ним. Затем код переходит к чтению того количества байтов, которые содержат все сообщение. Где проблема? - person tzaman; 06.04.2016
comment
И исходный код, и версия, которую я опубликовал, работают нормально, и оказалось, что это не моя проблема. Моя проблема заключалась в том, что CodedInputStream неожиданно потреблял все данные из исходного буфера, даже если был установлен предел. Я пытался определить, сколько данных осталось, и CodedInputStream очень усложняет это. На C # мне помог этот вопрос: stackoverflow.com/questions/33733913/ - person ciphersimian; 08.04.2016

Вы можете использовать getline для чтения строки из потока, используя указанный разделитель:

istream& getline ( istream& is, string& str, char delim );

(определено в шапке)

person Jan    schedule 26.02.2010
comment
Не то же самое; буферы протокола - это двоичный формат, функции с разделителями фактически просто добавляют размер. Мне нужно знать формат префикса размера. - person tzaman; 26.02.2010