Skip to content

Commit

Permalink
Merge pull request #27 from pbk20191/develop
Browse files Browse the repository at this point in the history
prepare for xcode 15 and swift 5.9
  • Loading branch information
pbk20191 authored Jun 17, 2023
2 parents 13e7573 + 34ed4e3 commit a1ce051
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Sources/Tetra/AsyncScope/InvalidTaskScope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct InvalidTaskScope: TaskScopeProtocol {
var isCancelled:Bool { true }

@usableFromInline
func launch(operation: @escaping Job) -> Bool {
func launch(operation: @escaping PendingWork) -> Bool {
print("InvalidTaskScope never launch")
return false
}
Expand Down
13 changes: 4 additions & 9 deletions Sources/Tetra/AsyncScope/JobSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Foundation

@usableFromInline
struct JobSequence: Sendable, AsyncSequence {

@usableFromInline
func makeAsyncIterator() -> AsyncIterator {
stream.makeAsyncIterator()
Expand All @@ -25,14 +25,9 @@ struct JobSequence: Sendable, AsyncSequence {
private let continuation:AsyncStream<Element>.Continuation

init() {
var reference: AsyncStream<Element>.Continuation? = nil
let semaphore = DispatchSemaphore(value: 0)
stream = .init{ continuation in
reference = continuation
semaphore.signal()
}
semaphore.wait()
continuation = reference.unsafelyUnwrapped
let (source, ref) = AsyncStream<Element>.makeStream()
stream = source
continuation = ref
}

func append(job: __owned @escaping Element) -> Bool {
Expand Down
152 changes: 87 additions & 65 deletions Sources/Tetra/AsyncScope/StandaloneTaskScope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation
import os

import _Concurrency

/**
Wrapper object which act like CoroutineScope in Coroutine.
Expand Down Expand Up @@ -94,58 +94,42 @@ public struct StandaloneTaskScope: TaskScopeProtocol {
}
}
task = creator(priority) {
// TODO: use withDiscardingTaskGroup
await withTaskGroup(of: Void.self) { group in
group.addTask(priority: .background) {
let lock = createCheckedStateLock(checkedState: TaskCancellationState.waiting)
await withTaskCancellationHandler {
await withUnsafeContinuation{ continuation in

let snapShot = lock.withLock{
let oldValue = $0
switch oldValue {
case .cancelled:
break
case .suspending:
assertionFailure("unexpected state")
fallthrough
case .waiting:
$0 = .suspending(continuation)

}
return oldValue
}
switch snapShot {
case .cancelled:
continuation.resume()
case .waiting:
break
case .suspending(let unsafeContinuation):
unsafeContinuation.resume()
}
}
} onCancel: {
lock.withLock{
$0.take()
}?.resume()
if #available(macOS 13.3, iOS 16.4, watchOS 9.4, tvOS 16.4, *) {
await withDiscardingTaskGroup { group in
group.addTask(priority: .background) {
await suspendUntilCancelled()
}
for await operation in sequence {
group.addTask(operation: operation)
}
}

var childIterator = group.makeAsyncIterator()
let stream = AsyncStream<Void> {
await childIterator.next()
} onCancel: {
sequence.finish()
}
// consume finished child Task to be released
async let iteratingTask: Void = await {
for await _ in stream { }
}()

for await operation in sequence {
group.addTask(operation: operation)
} else {
await withTaskGroup(of: Void.self) { group in
group.addTask(priority: .background) {
await suspendUntilCancelled()
}

var childIterator = group.makeAsyncIterator()
let stream = AsyncStream<Void> {
await childIterator.next()
} onCancel: {
sequence.finish()
}
// consume finished child Task to be released
async let iteratingTask: Void = await {
for await _ in stream {
print("release")
}
}()

for await operation in sequence {
group.addTask(operation: operation)
// await Task.yield()
}
await iteratingTask
// release all the childTask which is not release from `stream` caused by early Task cancellation
//for await _ in group { print("release2") }
}
await iteratingTask
}

}
Expand All @@ -154,27 +138,65 @@ public struct StandaloneTaskScope: TaskScopeProtocol {
cancellable = nil
}

public func launch(operation: __owned @escaping Job) -> Bool {
public func launch(operation: __owned @escaping PendingWork) -> Bool {
buffer.append(job: operation)
}


private enum TaskCancellationState: Sendable {

case waiting
case suspending(UnsafeContinuation<Void,Never>)
case cancelled

mutating func take() -> UnsafeContinuation<Void,Never>? {
if case let .suspending(continuation) = self {
self = .cancelled
return continuation
} else {
self = .cancelled
return nil


}

private enum TaskCancellationState: Sendable {

case waiting
case suspending(UnsafeContinuation<Void,Never>)
case cancelled

mutating func take() -> UnsafeContinuation<Void,Never>? {
if case let .suspending(continuation) = self {
self = .cancelled
return continuation
} else {
self = .cancelled
return nil
}
}

}

private func suspendUntilCancelled() async {
let lock = createCheckedStateLock(checkedState: TaskCancellationState.waiting)
await withTaskCancellationHandler {
await withUnsafeContinuation{ continuation in

let snapShot = lock.withLock{
let oldValue = $0
switch oldValue {
case .cancelled:
break
case .suspending:
assertionFailure("unexpected state")
fallthrough
case .waiting:
$0 = .suspending(continuation)

}
return oldValue
}
switch snapShot {
case .cancelled:
continuation.resume()
case .waiting:
break
case .suspending(let unsafeContinuation):
unsafeContinuation.resume()
}
}

} onCancel: {
lock.withLock{
$0.take()
}?.resume()
}
}

5 changes: 4 additions & 1 deletion Sources/Tetra/AsyncScope/TaskScopeProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import Foundation

public protocol TaskScopeProtocol: Sendable, Hashable {

typealias Job = @Sendable () async -> Void
typealias PendingWork = @Sendable () async -> Void

@available(*, deprecated, renamed: "PendingJob", message: "Job is deprecated since Swift has own type named Job")
typealias Job = PendingWork

/**
submit the Job to this `TaskScope`. Job will be executed on the next possible opportunity unless it's cancelled.
Expand Down
2 changes: 1 addition & 1 deletion Sources/Tetra/AsyncScope/ViewBoundTaskScope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public struct ViewBoundTaskScope: TaskScopeProtocol {

@discardableResult
@inlinable
public func launch(operation: @escaping Job) -> Bool {
public func launch(operation: @escaping PendingWork) -> Bool {
switch scope {
case .invalid:
return InvalidTaskScope().launch(operation: operation)
Expand Down

0 comments on commit a1ce051

Please sign in to comment.