Входы подавления дребезга Rx

Мне нужно устранить дребезг входного потока.

При первом появлении состояния 1 мне нужно подождать 5 секунд и проверить, было ли последнее состояние также 1. Только тогда у меня есть стабильный сигнал.

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-1
(result)                   -> 1

Вот пример нестабильного сигнала.

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-0
(result)                   -> 0

Я пытался использовать буфер, но буфер имеет фиксированную начальную точку, и мне нужно подождать 5 секунд, начиная с моего первого события.


person Louis Haußknecht    schedule 20.03.2013    source источник
comment
Какой бы вы хотели получить результат, если бы следующие состояния в примере 2 были 0-1?   -  person Matthew Finlay    schedule 21.03.2013
comment
Производится ли вывод каждую секунду? Я согласен с JerKilmball ниже, было бы полезно получить дополнительную информацию о вашем варианте использования.   -  person AlexFoxGill    schedule 21.03.2013


Ответы (2)


Воспринимать ваши требования буквально

При первом появлении состояния 1 мне нужно подождать 5 секунд и проверить, было ли последнее состояние также 1. Только тогда у меня есть стабильный сигнал.

Я могу предложить несколько способов решения этой проблемы. Чтобы прояснить мои предположения, вы просто хотите подтолкнуть последнее значение, полученное через 5 секунд после первого появления 1. Это приведет к тому, что последовательность с одним значением выдаст либо 0, либо 1 (т.е. независимо от любых дальнейших значений, созданных за 5 секунд). секунд от исходной последовательности)

Здесь я воссоздаю вам последовательность с некоторой ерундой.

var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1))
    .Take(10)
    .Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1;
    //.Select(i=>{if(i==5 || i==7 ){return 1;}else{return 0;}});    //Should produce 0;

Все варианты ниже выглядят так, чтобы поделиться последовательностью. Чтобы безопасно поделиться последовательностью в Rx, мы Publish() и подключаем ее. Я использую автоматическое подключение через оператор RefCount().

var sharedSource = source.Publish().RefCount();

1) В этом решении мы берем первое значение 1, а затем буферизуем выбранные значения последовательности до размеров буфера 5 секунд. Мы берем только первый из этих буферов. Как только мы получим этот буфер, мы получим последнее значение и отправим его. Если буфер пуст, я предполагаю, что мы нажимаем единицу, поскольку последним значением была «1», которая запускала буфер.

sharedSource.Where(state=>state==1)
            .Take(1)
            .SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1))
            .Select(buffer=>
            {   
                if(buffer.Any())
                {
                    return buffer.Last();
                }
                else{
                    return 1;
                }
            })
            .Dump();

2) В этом решении я использую подход, чтобы начать слушать только после того, как мы получим действительное значение (1), а затем принять все значения, пока таймер не вызовет завершение. Отсюда мы берем последнее полученное значение.

var fromFirstValid = sharedSource.SkipWhile(state=>state==0);
fromFirstValid 
    .TakeUntil(
        fromFirstValid.Take(1)
                    .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5))))
    .TakeLast(1)
    .Dump();

3) В этом решении я использую оператор окна для создания одного окна, которое открывается, когда происходит первое значение «1», а затем закрывается по истечении 5 секунд. Снова мы просто берем последнее значение

sharedSource.Window(
                sharedSource.Where(state=>state==1),
                _=>Observable.Timer(TimeSpan.FromSeconds(5)))
            .SelectMany(window=>window.TakeLast(1))
            .Take(1)
            .Dump();

Так много разных способов содрать шкуру с кошки.

person Lee Campbell    schedule 22.03.2013
comment
+1 - Хороший разброс ответов! Знаешь, я вставляю ссылку на твой пост в Window/Join/etc примерно три раза в гребаную неделю... :) - person JerKimball; 22.03.2013
comment
Уууу!! Я очень рад, что это приносит вам пользу. - person Lee Campbell; 22.03.2013
comment
Большой! Я использовал решение 2. Если вы не используете Rx каждый день, трудно переключить свой мозг в режим Rx. Спасибо! - person Louis Haußknecht; 08.04.2013

Звучит (на первый взгляд), как будто вы хотите Throttle, а не Buffer, хотя дополнительная информация о ваших вариантах использования поможет определить это — во всяком случае, вот как вы можете Throttle свой поток:

void Main()
{
    var subject = new Subject<int>();
    var source = subject.Publish().RefCount();

    var query = source
        // Start counting on a 1, wait 5 seconds, and take the last value
        .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5)));

    using(query.Subscribe(Console.WriteLine))
    {
        // This sequence should produce a one
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(1);
        Console.ReadLine();
        // This sequence should produce a zero
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        Console.ReadLine();
    }
}
person JerKimball    schedule 21.03.2013