/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #import "GRXConcurrentWriteable.h" #import <RxLibrary/GRXWriteable.h> @interface GRXConcurrentWriteable () // This is atomic so that cancellation can nillify it from any thread. @property(atomic, strong) id<GRXWriteable> writeable; @end @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; // This ensures that writesFinishedWithError: is only sent once to the writeable. BOOL _alreadyFinished; } - (instancetype)init { return [self initWithWriteable:nil]; } // Designated initializer - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable dispatchQueue:(dispatch_queue_t)queue { if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; } return self; } - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()]; } - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ // We're racing a possible cancellation performed by another thread. To turn all already- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it // before it's nil, we won the race. id<GRXWriteable> writeable = self.writeable; if (writeable) { [writeable writeValue:value]; handler(); } }); } - (void)enqueueSuccessfulCompletion { __weak typeof(self) weakSelf = self; dispatch_async(_writeableQueue, ^{ typeof(self) strongSelf = weakSelf; if (strongSelf) { BOOL finished = NO; @synchronized(strongSelf) { if (!strongSelf->_alreadyFinished) { strongSelf->_alreadyFinished = YES; } else { finished = YES; } } if (!finished) { // Cancellation is now impossible. None of the other three blocks can run concurrently with // this one. [strongSelf.writeable writesFinishedWithError:nil]; // Skip any possible message to the wrapped writeable enqueued after this one. strongSelf.writeable = nil; } } }); } - (void)cancelWithError:(NSError *)error { NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); BOOL finished = NO; @synchronized(self) { if (!_alreadyFinished) { _alreadyFinished = YES; } else { finished = YES; } } if (!finished) { // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to // nillify writeable because we might be running concurrently with the blocks in // _writeableQueue, and assignment with ARC isn't atomic. id<GRXWriteable> writeable = self.writeable; self.writeable = nil; dispatch_async(_writeableQueue, ^{ [writeable writesFinishedWithError:error]; }); } } - (void)cancelSilently { BOOL finished = NO; @synchronized(self) { if (!_alreadyFinished) { _alreadyFinished = YES; } else { finished = YES; } } if (!finished) { // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to // nillify writeable because we might be running concurrently with the blocks in // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; } } @end