Как реализовать ленивый перечислитель фрагментов потока?

Я пытаюсь разбить поток байтов на куски увеличивающегося размера.

Исходный поток содержит неизвестное количество байтов и дорого читается. На выходе перечислителя должны быть массивы байтов увеличивающегося размера, начиная с 8 КБ до 1 МБ.

Это очень просто сделать, просто прочитав весь поток, сохранив его в массиве и извлекая соответствующие фрагменты. Однако, поскольку поток может быть очень большим, чтение его сразу невозможно. Кроме того, хотя производительность не является главной проблемой, важно поддерживать очень низкую нагрузку на систему.

При реализации этого я заметил, что относительно сложно сделать код коротким и удобным для сопровождения. Также следует помнить о нескольких проблемах, связанных с потоком (например, Stream.Read может не заполнить буфер, даже если он был выполнен успешно).

Я не нашел ни одного из существующих классов, которые помогли бы в моем случае, и не смог найти что-то близкое в сети. Как бы вы реализовали такой класс?


person mafu    schedule 06.02.2012    source источник
comment
Будете ли вы повторять его более одного раза? Может ли итератор возвращать потоки или ему нужно возвращать byte[]s? Если это потоки, будут ли они полностью прочитаны, прежде чем вы пойдете дальше? Если это так, было бы довольно легко написать итератор для разбиения исходного потока на более мелкие потоки, используя только небольшой буфер. В качестве альтернативы, знаете ли вы размер фрагмента в начале, то есть можете ли вы выделить буфер и прочитать весь фрагмент за один раз, или вам нужно просмотреть его, например, найти конечный маркер?   -  person Rup    schedule 06.02.2012
comment
@Rup: повторяется только один раз. Должен быть байт[]. Весь исходный поток слишком велик для полного чтения, но отдельные фрагменты поместятся в памяти. Все размеры и позиции фрагментов известны, но фактическая длина исходного потока неизвестна (конечный фрагмент, вероятно, будет меньше предполагаемого размера).   -  person mafu    schedule 06.02.2012


Ответы (2)


public IEnumerable<BufferWrapper> getBytes(Stream stream)
{
    List<int> bufferSizes = new List<int>() { 8192, 65536, 220160, 1048576 };
    int count = 0;
    int bufferSizePostion = 0;
    byte[] buffer = new byte[bufferSizes[0]];
    bool done = false;
    while (!done)
    {
        BufferWrapper nextResult = new BufferWrapper();
        nextResult.bytesRead = stream.Read(buffer, 0, buffer.Length);
        nextResult.buffer = buffer;
        done = nextResult.bytesRead == 0;
        if (!done)
        {
            yield return nextResult;
            count++;
            if (count > 10 && bufferSizePostion < bufferSizes.Count)
            {
                count = 0;
                bufferSizePostion++;
                buffer = new byte[bufferSizes[bufferSizePostion]];
            }
        }
    }
}

public class BufferWrapper
{
    public byte[] buffer { get; set; }
    public int bytesRead { get; set; }
}

Очевидно, что логика того, когда увеличить размер буфера и как выбрать этот размер, может быть изменена.

Кто-то, вероятно, также мог бы найти лучший способ обработки последнего отправляемого буфера, так как это не самый эффективный способ.

person Servy    schedule 06.02.2012
comment
Мне просто пришло в голову, что вы можете создать свою собственную оболочку для массива байтов, которая хранит как массив байтов, так и фактическую длину (реальных данных), и возвращать ее каждый раз. Это означало бы, что вам не нужно будет обрабатывать копию массива для последнего частичного буфера. - person Servy; 06.02.2012
comment
Существует ошибка в потоке.Чтение: только для чтения сигнализирует EOF, возвращая 0 (не bytesRead ‹ buffer.Length). - person mafu; 06.02.2012
comment
+1, а зачем копировать буферы? почему бы просто не вернуть буфер, в который вы его прочитали, и всегда выделять новый? - person Rup; 06.02.2012
comment
Но это достаточно легко исправить — повторите попытку чтения, чтобы заполнить оставшуюся часть буфера, пока либо буфер не будет заполнен, либо не будет возвращено 0 байтов. - person Rup; 06.02.2012
comment
@Rup: потому что количество прочитанных байтов может быть меньше, поэтому требуется меньший буфер. - person mafu; 06.02.2012
comment
@Rup: Верно. Я галлюцинировал, что нужно читать лишние байты, но вы можете очень легко ограничить это (размер буфера минус существующий размер) - person mafu; 06.02.2012
comment
Я исправил несколько вещей; включил мой собственный комментарий и тот факт, что частичный буфер не означает EOF. Было также несколько других ошибок, которые я заметил и устранил. - person Servy; 06.02.2012

Для справки, реализация, которую я сейчас использую, уже с улучшениями согласно ответу @Servy

private const int InitialBlockSize = 8 * 1024;
private const int MaximumBlockSize = 1024 * 1024;

private Stream _Stream;
private int _Size = InitialBlockSize;

public byte[] Current
{
    get;
    private set;
}

public bool MoveNext ()
{
    if (_Size < 0) {
        return false;
    }

    var buf = new byte[_Size];
    int count = 0;

    while (count < _Size) {
        int read = _Stream.Read (buf, count, _Size - count);

        if (read == 0) {
            break;
        }

        count += read;
    }

    if (count == _Size) {
        Current = buf;
        if (_Size <= MaximumBlockSize / 2) {
            _Size *= 2;
        }
    }
    else {
        Current = new byte[count];
        Array.Copy (buf, Current, count);
        _Size = -1;
    }

    return true;
}
person mafu    schedule 06.02.2012