Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] pipeline gets storage #487

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Sources/Verge/Derived/Derived.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ public class Derived<Value: Equatable>: Store<Value, Never>, DerivedType, @unche
retainsUpstream: Any?
) where Pipeline.Input == UpstreamState, Value == Pipeline.Output {

let pipelineStorage: Pipeline.Storage = pipeline.makeStorage()

weak var indirectSelf: Derived<Value>?

let s = subscribeUpstreamState { value in
let update = pipeline.yieldContinuously(value)
let update = pipeline.yieldContinuously(value, storage: pipelineStorage)
switch update {
case .noUpdates:
break
Expand All @@ -135,7 +137,7 @@ public class Derived<Value: Equatable>: Store<Value, Never>, DerivedType, @unche
self._set = set
super.init(
name: name,
initialState: pipeline.yield(initialUpstreamState),
initialState: pipeline.yield(initialUpstreamState, storage: pipelineStorage),
logger: nil,
sanitizer: nil
)
Expand Down
198 changes: 146 additions & 52 deletions Sources/Verge/Store/Pipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,22 @@ public protocol PipelineType<Input, Output> {

associatedtype Input
associatedtype Output

associatedtype Storage: Sendable = Void

func makeStorage() -> Storage

/// Yields the output from the input.
func yield(_ input: Input) -> Output
func yield(_ input: Input, storage: Storage) -> Output

/// Yields the output from the input if it's needed
func yieldContinuously(_ input: Input) -> ContinuousResult<Output>

func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output>

}

extension PipelineType where Storage == Void {
public func makeStorage() -> Void {
()
}
}

/**
Expand All @@ -57,6 +66,8 @@ public enum Pipelines {
/// KeyPath based pipeline, light weight operation just take value from source.
public struct ChangesSelectPassthroughPipeline<Source: Equatable, Output: Equatable>: PipelineType {

public typealias Storage = Void

public typealias Input = Changes<Source>

public let selector: (borrowing Input.Value) -> Output
Expand All @@ -67,23 +78,25 @@ public enum Pipelines {
self.selector = selector
}

public func yieldContinuously(_ input: Input) -> ContinuousResult<Output> {
public func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output> {

let target = input._read(perform: selector)

return .new(consume target)

}

public func yield(_ input: Input) -> Output {
public func yield(_ input: Input, storage: Storage) -> Output {
input._read(perform: selector)
}

}

/// KeyPath based pipeline, light weight operation just take value from source.
public struct ChangesSelectPipeline<Source: Equatable, Output: Equatable>: PipelineType {


public typealias Storage = Void

public typealias Input = Changes<Source>

public let selector: (borrowing Input.Value) -> Output
Expand All @@ -97,8 +110,8 @@ public enum Pipelines {
self.additionalDropCondition = additionalDropCondition
}

public func yieldContinuously(_ input: Input) -> ContinuousResult<Output> {
public func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output> {

guard let previous = input.previous else {
return .new(input._read(perform: selector))
}
Expand All @@ -120,7 +133,7 @@ public enum Pipelines {

}

public func yield(_ input: Input) -> Output {
public func yield(_ input: Input, storage: Storage) -> Output {
input._read(perform: selector)
}

Expand All @@ -140,6 +153,8 @@ public enum Pipelines {
/// Closure based pipeline,
public struct ChangesMapPipeline<Source: Equatable, Intermediate, Output: Equatable>: PipelineType {

public typealias Storage = Void

public typealias Input = Changes<Source>

// MARK: - Properties
Expand All @@ -160,10 +175,10 @@ public enum Pipelines {

// MARK: - Functions

public func yieldContinuously(_ input: Input) -> ContinuousResult<Output> {
public func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output> {

guard let previous = input.previous else {
return .new(yield(input))
return .new(yield(input, storage: storage))
}

guard previous.primitive == input.primitive else {
Expand Down Expand Up @@ -195,7 +210,7 @@ public enum Pipelines {
return .noUpdates
}

public func yield(_ input: Input) -> Output {
public func yield(_ input: Input, storage: Storage) -> Output {
transform(intermediate(input.primitive).value)
}

Expand All @@ -211,49 +226,128 @@ public enum Pipelines {
)
}
}

public struct BasicMapPipeline<Input: Equatable, Output: Equatable>: PipelineType {

// MARK: - Properties

public let map: (Input) -> Output
public let additionalDropCondition: ((Input) -> Bool)?

public init(
map: @escaping (Input) -> Output,
additionalDropCondition: ((Input) -> Bool)?
) {

public struct UniqueFilterEquatable<Map: MapFunction>: PipelineType where Map.Output : Equatable {

public typealias Input = Map.Input
public typealias Output = Map.Output

private let map: Map

public init(map: Map) {
self.map = map
self.additionalDropCondition = additionalDropCondition
}

// MARK: - Functions

public func yieldContinuously(_ input: Input) -> ContinuousResult<Output> {

guard let additionalDropCondition = additionalDropCondition, additionalDropCondition(input) else {
return .new(yield(input))

public func makeStorage() -> VergeConcurrency.UnfairLockAtomic<Output?> {
.init(nil)
}

public func yield(_ input: Input, storage: Storage) -> Output {
let result = map.perform(input)
storage.swap(result)
return result
}

public func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output> {

// not to check if input has changed because storing the input may cause performance issue by copying.

let result = map.perform(input)

return storage.modify { value in
if value != result {
value = result
return .new(result)
} else {
return .noUpdates
}
}

return .noUpdates

}

public func yield(_ input: Input) -> Output {
map(input)

}

public struct UniqueFilter<Map: MapFunction, OutputComparator: Comparison>: PipelineType where OutputComparator.Input == Map.Output? {

public typealias Input = Map.Input
public typealias Output = Map.Output

private let map: Map
private let outputComparator: OutputComparator

public init(map: Map, outputComparator: OutputComparator) {
self.map = map
self.outputComparator = outputComparator
}

public func drop(while predicate: @escaping (Input) -> Bool) -> Self {
return .init(
map: map,
additionalDropCondition: additionalDropCondition.map { currentCondition in
{ input in
currentCondition(input) || predicate(input)
}
} ?? predicate
)

public func makeStorage() -> VergeConcurrency.UnfairLockAtomic<Output?> {
.init(nil)
}


public func yield(_ input: Input, storage: Storage) -> Output {
let result = map.perform(input)
storage.swap(result)
return result
}

public func yieldContinuously(_ input: Input, storage: Storage) -> ContinuousResult<Output> {

// not to check if input has changed because storing the input may cause performance issue by copying.

let result = map.perform(input)

return storage.modify { value in
if !outputComparator(value, result) {
value = result
return .new(result)
} else {
return .noUpdates
}
}
}

}

}

public protocol MapFunction: Sendable {
associatedtype Input
associatedtype Output
func perform(_ input: Input) -> Output
}

public struct AnyMapFunction<Input, Output>: MapFunction {

private let _perform: @Sendable (Input) -> Output

public init(_ perform: @escaping @Sendable (Input) -> Output) {
self._perform = perform
}

public func perform(_ input: Input) -> Output {
_perform(input)
}
}

extension PipelineType {

public static func uniqueMap<Map: MapFunction>(_ mapFunction: Map) -> Self
where Map.Output: Equatable, Self == Pipelines.UniqueFilterEquatable<Map> {
return .init(map: mapFunction)
}

public static func uniqueMap<Input, Output: Equatable>(_ map: @escaping @Sendable (Input) -> Output) -> Self
where Output: Equatable, Self == Pipelines.UniqueFilterEquatable<AnyMapFunction<Input, Output>> {
return uniqueMap(.init(map))
}

public static func uniqueMap<Map: MapFunction, OutputComparator: Comparison>(_ mapFunction: Map, _ outputComparator: OutputComparator) -> Self
where Self == Pipelines.UniqueFilter<Map, OutputComparator> {
return .init(map: mapFunction, outputComparator: outputComparator)
}

public static func uniqueMap<Input, Output, OutputComparator: Comparison>(_ map: @escaping @Sendable (Input) -> Output, _ outputComparator: OutputComparator) -> Self
where Self == Pipelines.UniqueFilter<AnyMapFunction<Input, Output>, OutputComparator> {
return .init(map: .init(map), outputComparator: outputComparator)
}

}
Expand Down
12 changes: 8 additions & 4 deletions Sources/Verge/Store/StoreType+BindingDerived.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ private struct BindingDerivedPipeline<Source: Equatable, Output: Equatable, Back
self.backingPipeline = backingPipeline
}

func yield(_ input: Changes<Source>) -> Output {
backingPipeline.yield(input)
func makeStorage() -> BackingPipeline.Storage {
backingPipeline.makeStorage()
}

func yieldContinuously(_ input: Changes<Source>) -> ContinuousResult<Output> {
func yield(_ input: Changes<Source>, storage: Storage) -> Output {
backingPipeline.yield(input, storage: storage)
}

func yieldContinuously(_ input: Changes<Source>, storage: Storage) -> ContinuousResult<Output> {
if input._transaction.isFromBindingDerived {
return .noUpdates
}
return backingPipeline.yieldContinuously(input)
return backingPipeline.yieldContinuously(input, storage: storage)
}

}
Expand Down
Loading
Loading