Поток данных задачи, можно ли изменить блок данных из состояния завершения?

Я хотел бы знать, можно ли изменить состояние завершения блоков данных?

Например, я пометил блок данных var block = new BufferBlock<int>(); в комплекте с block.Complete(). Блок связан с другими блоками данных. Я хотел бы знать, могу ли я снова запустить block, изменив его состояние завершения на исходное состояние !complete.

Если это невозможно, как я могу выполнить несколько прогонов, включая завершение, без необходимости а) разъединять все блоки, б) повторно создавать все блоки и в) повторно связывать все блоки снова?

Любые идеи, которые могут упростить задачу запуска циклов потока данных, включая завершение каждого блока в цепочке без необходимости воссоздавать всю структуру?

Спасибо


person Matt    schedule 12.04.2013    source источник
comment
Зачем вообще нужны отдельные прогоны? Кроме того, почему вы не хотите воссоздать сеть потока данных? Я сомневаюсь, что это повлияет на производительность (если вы не будете делать это много раз в секунду), и это также не должно сильно усложнять ваш код.   -  person svick    schedule 12.04.2013
comment
Отдельные прогоны являются частью требования. Повторное создание всей сети полностью сведет на нет эффективность, достигнутую за счет выбора потока данных tpl. Основная головная боль — это отвязка и пересвязывание, не столько пересоздание каждого блока потока данных. Если изменение состояния завершения невозможно, как иначе я мог бы сигнализировать о завершении, не влияя на состояние завершения каждого блока потока данных? Я не могу просто отправить атомарный флаг, потому что типы объектов в цепочке не идентичны (есть задействованные блоки преобразования).   -  person Matt    schedule 12.04.2013


Ответы (1)


Нет, вы не можете «отменить завершение» завершенного блока потока данных. Я думаю, что вы должны сделать, это добавить флаг к каждому сообщению, который говорит, является ли это последним сообщением в прогоне. Чтобы упростить работу с ним, вы можете создать набор вспомогательных методов, таких как:

public static TransformBlock<Tuple<TInput, bool>, Tuple<TOutput, bool>>
    CreateEnhancedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, bool>, Tuple<TOutput, bool>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}

Таким образом, вы вводите делегата transform, который имеет дело только с TInput и TOuput, и флаг передается вместе с каждым сообщением.

person svick    schedule 13.04.2013
comment
Интересная идея, не могли бы вы прокомментировать вычислительные затраты во время выполнения, выбрав этот путь? Пока я вижу только дополнительную проверку флага завершения блока получателя для каждого входящего сообщения. Что-то еще я опускаю? Было бы дороже отправить enum, а не bool? Таким образом, я мог сообщать о других состояниях, а не только о завершении. - person Matt; 14.04.2013
comment
@Freddy Могут быть небольшие накладные расходы, поскольку есть два вызова делегата вместо одного, но это должно быть действительно незначительным. И я думаю, что замена bool перечислением также не должна повлиять на производительность. - person svick; 14.04.2013
comment
определенно хорошая идея решить эту проблему, но в конце концов я последовал вашему первому совету: я просматриваю функцию аккуратного завершения блоков данных и воссоздаю весь конвейер. Я предоставляю обратные вызовы восходящему блоку данных, содержащему модули, и после завершения каждого блока данных он воссоздает то же самое и передает его через обратный вызов нисходящему потоку, где два блока повторно связаны. - person Matt; 15.04.2013
comment
Я также считаю, что добавление системы индексации полезно для идентификации прогонов или пакетов данных. Это делает так, что прогоны не должны быть полностью изолированы. - person VoteCoffee; 04.02.2014