Как условно буферизовать значения RACSignal?

Я работаю над кодом, который взаимодействует с удаленным API через веб-сокеты. Мой уровень данных отвечает за установление и мониторинг соединения через веб-сокет. Он также содержит методы, которые могут использоваться приложением для постановки в очередь сообщений веб-сокета для отправки. Код приложения не должен нести ответственность за проверку состояния подключения к веб-сокету, также известного как «выстрелил-забыл».

В идеале я хотел бы, чтобы уровень данных функционировал следующим образом:

  • Когда уровень данных не имеет подключения к конечной точке веб-сокета (self.isConnected == NO), сообщения буферизуются внутри.
  • Когда соединение становится доступным (self.isConnected == YES), буферизованные сообщения отправляются немедленно, и все последующие сообщения отправляются немедленно.

Вот что мне удалось придумать:

#import "RACSignal+Buffering.h"

@implementation RACSignal (Buffering)

- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {

        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        NSMutableArray* bufferedValues = [[NSMutableArray alloc] init];
        __block BOOL buffering = NO;

        void (^bufferHandler)() = ^{
            if (!buffering)
            {
                for (id val in bufferedValues)
                {
                    [subscriber sendNext:val];
                }

                [bufferedValues removeAllObjects];
            }
        };

        RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* shouldBuffer) {

            buffering = shouldBuffer.boolValue;
            bufferHandler();

        }];

        if (bufferDisposable)
        {
            [disposable addDisposable:bufferDisposable];
        }

        RACDisposable* valueDisposable = [self subscribeNext:^(id x) {

            [bufferedValues addObject:x];
            bufferHandler();

        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];

        if (valueDisposable)
        {
            [disposable addDisposable:valueDisposable];
        }

        return disposable;
    }];
}

@end

Наконец, это псевдокод того, как он будет использоваться:

@interface APIManager ()

@property (nonatomic) RACSubject* requests;

@end

@implementation WebsocketDataLayer

- (id)init
{
    self = [super init];

    if (self) {

        RACSignal* connectedSignal = RACObserve(self, connected);

        self.requests = [[RACSubject alloc] init];

        RACSignal* bufferedApiRequests = [self.requests bufferWithSignal:connectedSignal];

        [self rac_liftSelector:@selector(sendRequest:) withSignalsFromArray:@[bufferedApiRequests]];
    }
    return self;
}

- (void)enqueueRequest:(NSString*)request
{
    [self.requests sendNext:request];
}

- (void)sendRequest:(NSString*)request
{
    DebugLog(@"Making websocket request: %@", request);
}

@end

Мой вопрос: правильный ли это подход для буферизации значений? Есть ли более идиоматический способ RAC справиться с этим?


person Matt Hupman    schedule 22.10.2013    source источник


Ответы (2)


Буферизацию можно рассматривать как то, что применяется к отдельным запросам, что приводит к естественной реализации с использованием -flattenMap: и RACObserve:

@weakify(self);
RACSignal *bufferedRequests = [self.requests flattenMap:^(NSString *request) {
    @strongify(self);

    // Waits for self.connected to be YES, or checks that it already is,
    // then forwards the request.
    return [[[[RACObserve(self, connected)
        ignore:@NO]
        take:1]
        // Replace the property value with our request.
        mapReplace:request];
}];

Если порядок важен, вы можете заменить -flattenMap: на -map: плюс -concat. Эти реализации позволяют избежать необходимости в каких-либо пользовательских операторах и работают без ручных подписок (которые, как известно, запутаны).

person Justin Spahr-Summers    schedule 23.10.2013
comment
Вариант map + concat будет примерно таким, если я не ошибаюсь: rel="nofollow noreferrer">github.com/ReactiveCocoa/ReactiveCocoa/blob/ - person allprog; 24.10.2013
comment
Отличный ответ. Просто из любопытства: не слишком ли это расточительно для буферизации с более высокой пропускной способностью? Может ли это быть причиной того, что bufferWithTime: не был реализован с точки зрения этого подхода? - person allprog; 24.10.2013
comment
@allprog Это может быть немного медленнее, чем такая реализация, как -bufferWithTime:, но это не должно быть заметно. Его можно оптимизировать, вытащив RACObserve и используя -replayLast на нем. Также обратите внимание, что -bufferWithTime: гарантирует отправку полного буфера по истечении периода (внося дополнительную сложность, особенно вокруг завершения), тогда как здесь это не очень важно. - person Justin Spahr-Summers; 24.10.2013
comment
Очень круто! У меня было ощущение, что должен быть способ, которым RAC справляется с этим. На первый взгляд это казалось расточительным из-за многократного создания self.connected RACSignal. Однако я предполагаю, что семантика RACDisposable take: справляется с очисткой? - person Matt Hupman; 24.10.2013
comment
@MattHupman Я бы начал с того, что не беспокоился об этом. Однако, если вы делаете начинаете замечать проблемы с производительностью, вы можете сделать, как я упоминал выше: переместить RACObserve из блока и вызвать на нем -replayLast. Это гарантирует, что наблюдение KVO произойдет только один раз, и подписчики будут делиться всеми значениями этого наблюдения. - person Justin Spahr-Summers; 24.10.2013

Вы делаете почти то же самое, что реализовано в bufferWithTime:, и я не могу придумать ни одной из существующих операций, которые реализовали бы ее более идиоматически. (Возможно, именно поэтому bufferWithTime был реализован таким образом.) Анализ вашего кода с использованием этой реализации может выявить некоторые ошибки, о которых вы не подумали.

Но, честно говоря, это не должно быть так сложно. Должна существовать операция буферизации, которая буферизует вывод и извергает содержимое при срабатывании триггерного сигнала. Вероятно, большая часть буферизации может быть реализована с точки зрения этой функциональности, поэтому ее наличие добавило бы ценности фреймворку.

person allprog    schedule 23.10.2013