Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
Merge pull request #27 from UrbanCompass/threads
Browse files Browse the repository at this point in the history
Add ability to specify callback thread
  • Loading branch information
russellbstephens authored Dec 13, 2016
2 parents 6dc3364 + 09ed80a commit 3011719
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 16 deletions.
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,35 @@ let control = UIControl()
control.controlEvent(.touchUpInside).subscribe(
onNext: { ... } // do something with thing
)

let button = UIButton()
button.tap.subscribe(
onNext: { ... } // do something with thing
)
```

# Queues

You can specify which queue an observables will be notified on by using `.subscribe(queue: <desired queue>)`. If you don't specify, then the observable will be notified on the same queue that the observable published on.

There are 3 scenarios:

1. You don't specify the queue. Your observer will be notified on the same thread as the observable published on.

2. You specified `main` queue AND the observable published on the `main` queue. Your observer will be notified synchronously on the `main` queue.

3. You specified a queue. Your observer will be notified async on the specified queue.

### Examples

Subscribing on `DispatchQueue.main`

```swift
observable.subscribe(queue: .main,
onNext: { thing in ... }
)

observable.subscribe(queue: .main) { event in
...
}
```
4 changes: 2 additions & 2 deletions Snail/Fail.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ public class Fail<T>: Observable<T> {
self.error = error
}

public override func subscribe(_ handler: @escaping (Event<T>) -> Void) {
public override func subscribe(queue: DispatchQueue? = nil, _ handler: @escaping (Event<T>) -> Void) {
handler(.error(error))
}

public override func subscribe(onNext: ((T) -> Void)?, onError: ((Error) -> Void)?, onDone: (() -> Void)?) {
public override func subscribe(queue: DispatchQueue? = nil, onNext: ((T) -> Void)?, onError: ((Error) -> Void)?, onDone: (() -> Void)?) {
onError?(error)
}
}
4 changes: 2 additions & 2 deletions Snail/Just.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ public class Just<T>: Observable<T> {
super.init()
}

public override func subscribe(_ handler: @escaping (Event<T>) -> Void) {
public override func subscribe(queue: DispatchQueue? = nil, _ handler: @escaping (Event<T>) -> Void) {
handler(.next(value))
handler(.done)
}

public override func subscribe(onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
public override func subscribe(queue: DispatchQueue? = nil, onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
onNext?(value)
onDone?()
}
Expand Down
29 changes: 22 additions & 7 deletions Snail/Observable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
public class Observable<T> : ObservableType {
public typealias E = T
private var isStopped: Int32 = 0
var eventHandlers: [(Event<E>) -> Void] = []
var eventHandlers: [(queue: DispatchQueue?, handler: (Event<E>) -> Void)] = []

public init() {}

public func subscribe(_ handler: @escaping (Event<E>) -> Void) {
eventHandlers.append(handler)
public func subscribe(queue: DispatchQueue? = nil, _ handler: @escaping (Event<E>) -> Void) {
eventHandlers.append((queue, handler))
}

func createHandler(onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) -> (Event<E>) -> Void {
Expand All @@ -21,8 +21,8 @@ public class Observable<T> : ObservableType {
}
}

public func subscribe(onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
eventHandlers.append(createHandler(onNext: onNext, onError: onError, onDone: onDone))
public func subscribe(queue: DispatchQueue? = nil, onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
eventHandlers.append((queue, createHandler(onNext: onNext, onError: onError, onDone: onDone)))
}

public func on(_ event: Event<E>) {
Expand All @@ -31,10 +31,25 @@ public class Observable<T> : ObservableType {
guard isStopped == 0 else {
return
}
eventHandlers.forEach { $0(event) }
eventHandlers.forEach { (queue, handler) in fire(queue: queue, handler: handler, event: event) }
case .error, .done:
if OSAtomicCompareAndSwap32Barrier(0, 1, &isStopped) {
eventHandlers.forEach { $0(event) }
eventHandlers.forEach { (queue, handler) in fire(queue: queue, handler: handler, event: event) }
}
}
}

private func fire(queue: DispatchQueue?, handler: @escaping (Event<E>) -> Void, event: Event<E>) {
guard let queue = queue else {
handler(event)
return
}

if queue == DispatchQueue.main && Thread.isMainThread {
handler(event)
} else {
queue.async {
handler(event)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions Snail/ObservableType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public protocol ObservableType {
associatedtype E
func subscribe(_ handler: @escaping (Event<E>) -> Void)
func subscribe(onNext: ((Self.E) -> Void)?, onError: ((Error) -> Void)?, onDone: (() -> Void)?)
func subscribe(queue: DispatchQueue?, _ handler: @escaping (Event<E>) -> Void)
func subscribe(queue: DispatchQueue?, onNext: ((Self.E) -> Void)?, onError: ((Error) -> Void)?, onDone: (() -> Void)?)
func on(_ event: Event<E>)
}
4 changes: 2 additions & 2 deletions Snail/Replay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ public class Replay<T>: Observable<T> {
self.threshold = threshold
}

public override func subscribe(_ handler: @escaping (Event<E>) -> Void) {
public override func subscribe(queue: DispatchQueue? = nil, _ handler: @escaping (Event<E>) -> Void) {
super.subscribe(handler)
replay(handler)
}

public override func subscribe(onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
public override func subscribe(queue: DispatchQueue? = nil, onNext: ((T) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) {
super.subscribe(onNext: onNext, onError: onError, onDone: onDone)
replay(createHandler(onNext: onNext, onError: onError, onDone: onDone))
}
Expand Down
58 changes: 58 additions & 0 deletions SnailTests/ObservableTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,62 @@ class ObservableTests: XCTestCase {
subject?.on(.next("1"))
XCTAssert(strings?.first == more.first)
}

func testSubscribeOnMainThread() {
var isMainQueue = false
let exp = expectation(description: "queue")

DispatchQueue.global().async {
self.subject?.subscribe(queue: .main, onNext: { string in
exp.fulfill()
isMainQueue = Thread.isMainThread
})
self.subject?.on(.next("1"))
}

waitForExpectations(timeout: 2) { error in
XCTAssertNil(error)
XCTAssert(isMainQueue)
}
}

func testSubscribeOnMainThreadNotifiedOnMain() {
var isMainQueue = false
let exp = expectation(description: "queue")

DispatchQueue.global().async {
self.subject?.subscribe(queue: .main, onNext: { string in
exp.fulfill()
isMainQueue = Thread.isMainThread
})
DispatchQueue.main.async {
self.subject?.on(.next("1"))
}
}

waitForExpectations(timeout: 2) { error in
XCTAssertNil(error)
XCTAssert(isMainQueue)
}
}

func testSubscribeOnMainUsingEvent() {
var isMainQueue = false
let exp = expectation(description: "queue")

DispatchQueue.global().async {
self.subject?.subscribe(queue: .main) { event in
exp.fulfill()
isMainQueue = Thread.isMainThread
}
DispatchQueue.main.async {
self.subject?.on(.next("1"))
}
}

waitForExpectations(timeout: 2) { error in
XCTAssertNil(error)
XCTAssert(isMainQueue)
}
}
}

0 comments on commit 3011719

Please sign in to comment.