Skip to content

Commit

Permalink
test: Add kuksa.val.v2 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Jan 10, 2025
1 parent 24ac58a commit c986372
Show file tree
Hide file tree
Showing 9 changed files with 788 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package org.eclipse.kuksa.connectivity.databroker.v2

import io.grpc.ConnectivityState
import io.grpc.ManagedChannel
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.flow.Flow
import org.eclipse.kuksa.connectivity.authentication.JsonWebToken
import org.eclipse.kuksa.connectivity.databroker.DataBrokerException
import org.eclipse.kuksa.connectivity.databroker.DisconnectListener
import org.eclipse.kuksa.connectivity.databroker.v2.request.ActuateRequestV2
import org.eclipse.kuksa.connectivity.databroker.v2.request.BatchActuateRequestV2
Expand All @@ -38,6 +40,11 @@ import org.eclipse.kuksa.proto.v2.KuksaValV2
import java.util.logging.Logger
import kotlin.properties.Delegates

/**
* The DataBrokerConnection holds an active connection to the DataBroker. The Connection can be use to interact with the
* DataBroker.
*/
@Suppress("TooManyFunctions") // most methods are simply exposed from transporter layer
class DataBrokerConnectionV2 internal constructor(
private val managedChannel: ManagedChannel,
private val dataBrokerTransporter: DataBrokerTransporterV2 = DataBrokerTransporterV2(managedChannel),
Expand Down Expand Up @@ -77,6 +84,8 @@ class DataBrokerConnectionV2 internal constructor(
* The server might respond with the following GRPC error codes:
* NOT_FOUND if the requested signal doesn't exist
* PERMISSION_DENIED if access is denied
*
* @throws DataBrokerException when an error occurs
*/
suspend fun fetchValue(request: FetchValueRequestV2): KuksaValV2.GetValueResponse {
return dataBrokerTransporter.fetchValue(request.signalId)
Expand All @@ -89,6 +98,9 @@ class DataBrokerConnectionV2 internal constructor(
* The server might respond with the following GRPC error codes:
* NOT_FOUND if any of the requested signals doesn't exist.
* PERMISSION_DENIED if access is denied for any of the requested signals.
*
* @throws DataBrokerException when an error occurs
*
*/
suspend fun fetchValues(request: FetchValuesRequestV2): KuksaValV2.GetValuesResponse {
return dataBrokerTransporter.fetchValues(request.signalIds)
Expand All @@ -99,6 +111,8 @@ class DataBrokerConnectionV2 internal constructor(
* Returns (GRPC error code):
* NOT_FOUND if any of the signals are non-existent.
* PERMISSION_DENIED if access is denied for any of the signals.
*
* @throws DataBrokerException when an error occurs
*/
fun subscribeById(
request: SubscribeByIdRequestV2,
Expand All @@ -115,6 +129,8 @@ class DataBrokerConnectionV2 internal constructor(
* When subscribing the Broker shall immediately return the value for all
* subscribed entries. If no value is available when subscribing a DataPoint
* with value None shall be returned.
*
* @throws DataBrokerException when an error occurs
*/
fun subscribe(
request: SubscribeRequestV2,
Expand All @@ -132,6 +148,8 @@ class DataBrokerConnectionV2 internal constructor(
* INVALID_ARGUMENT
* - if the data type used in the request does not match the data type of the addressed signal
* - if the requested value is not accepted, e.g. if sending an unsupported enum value
*
* @throws DataBrokerException when an error occurs
*/
suspend fun actuate(request: ActuateRequestV2): KuksaValV2.ActuateResponse {
return dataBrokerTransporter.actuate(request.signalId, request.value)
Expand All @@ -150,6 +168,7 @@ class DataBrokerConnectionV2 internal constructor(
* - if the data type used in the request does not match the data type of the addressed signal
* - if the requested value is not accepted, e.g. if sending an unsupported enum value
*
* @throws DataBrokerException when an error occurs
*/
suspend fun batchActuate(request: BatchActuateRequestV2): KuksaValV2.BatchActuateResponse {
return dataBrokerTransporter.batchActuate(request.signalIds, request.value)
Expand All @@ -162,6 +181,8 @@ class DataBrokerConnectionV2 internal constructor(
*
* The server might respond with the following GRPC error codes:
* NOT_FOUND if the specified root branch does not exist.
*
* @throws DataBrokerException when an error occurs
*/
suspend fun listMetadata(request: ListMetadataRequestV2): KuksaValV2.ListMetadataResponse {
return dataBrokerTransporter.listMetadata(request.root, request.filter)
Expand All @@ -178,6 +199,8 @@ class DataBrokerConnectionV2 internal constructor(
* INVALID_ARGUMENT
* - if the data type used in the request does not match the data type of the addressed signal
* - if the published value is not accepted e.g. if sending an unsupported enum value
*
* @throws DataBrokerException when an error occurs
*/
suspend fun publishValue(
request: PublishValueRequestV2,
Expand All @@ -193,17 +216,29 @@ class DataBrokerConnectionV2 internal constructor(
* The open stream is used for request / response type communication between the
* provider and server (where the initiator of a request can vary).
* Errors are communicated as messages in the stream.
*
* @throws DataBrokerException when an error occurs
*/
fun openProviderStream(
streamRequestFlow: Flow<KuksaValV2.OpenProviderStreamRequest>,
): Flow<KuksaValV2.OpenProviderStreamResponse> {
return dataBrokerTransporter.openProviderStream(streamRequestFlow)
responseStream: StreamObserver<KuksaValV2.OpenProviderStreamResponse>,
): StreamObserver<KuksaValV2.OpenProviderStreamRequest> {
return dataBrokerTransporter.openProviderStream(responseStream)
}

/**
* Gets the server information.
*
* @throws DataBrokerException when an error occurs
*/
suspend fun fetchServerInfo(): KuksaValV2.GetServerInfoResponse {
return dataBrokerTransporter.fetchServerInfo()
}

/**
* Disconnect from the DataBroker.
*/
fun disconnect() {
logger.finer("disconnect() called")
managedChannel.shutdownNow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ package org.eclipse.kuksa.connectivity.databroker.v2
import io.grpc.ConnectivityState
import io.grpc.ManagedChannel
import io.grpc.StatusException
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.flow.Flow
import org.eclipse.kuksa.connectivity.authentication.JsonWebToken
import org.eclipse.kuksa.connectivity.databroker.DataBrokerException
import org.eclipse.kuksa.connectivity.databroker.v2.extension.withAuthenticationInterceptor
import org.eclipse.kuksa.proto.v2.KuksaValV2
import org.eclipse.kuksa.proto.v2.Types
import org.eclipse.kuksa.proto.v2.Types.Value
import org.eclipse.kuksa.proto.v2.VALGrpc
import org.eclipse.kuksa.proto.v2.VALGrpcKt
import org.eclipse.kuksa.proto.v2.actuateRequest
import org.eclipse.kuksa.proto.v2.batchActuateRequest
Expand Down Expand Up @@ -63,6 +65,7 @@ internal class DataBrokerTransporterV2(
var jsonWebToken: JsonWebToken? = null

private val coroutineStub: VALGrpcKt.VALCoroutineStub = VALGrpcKt.VALCoroutineStub(managedChannel)
private val asyncStub: VALGrpc.VALStub = VALGrpc.newStub(managedChannel)

/**
* Gets the latest value of a [signalId].
Expand Down Expand Up @@ -281,12 +284,12 @@ internal class DataBrokerTransporterV2(
* Errors are communicated as messages in the stream.
*/
fun openProviderStream(
streamRequestFlow: Flow<KuksaValV2.OpenProviderStreamRequest>,
): Flow<KuksaValV2.OpenProviderStreamResponse> {
responseStream: StreamObserver<KuksaValV2.OpenProviderStreamResponse>,
): StreamObserver<KuksaValV2.OpenProviderStreamRequest> {
return try {
coroutineStub
asyncStub
.withAuthenticationInterceptor(jsonWebToken)
.openProviderStream(streamRequestFlow)
.openProviderStream(responseStream)
} catch (e: StatusException) {
throw DataBrokerException(e.message, e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.grpc.ClientInterceptor
import io.grpc.Metadata
import io.grpc.stub.MetadataUtils
import org.eclipse.kuksa.connectivity.authentication.JsonWebToken
import org.eclipse.kuksa.proto.v2.VALGrpc
import org.eclipse.kuksa.proto.v2.VALGrpcKt

internal fun VALGrpcKt.VALCoroutineStub.withAuthenticationInterceptor(
Expand All @@ -35,6 +36,15 @@ internal fun VALGrpcKt.VALCoroutineStub.withAuthenticationInterceptor(
return withInterceptors(authenticationInterceptor)
}

internal fun VALGrpc.VALStub.withAuthenticationInterceptor(
jsonWebToken: JsonWebToken?,
): VALGrpc.VALStub {
if (jsonWebToken == null) return this

val authenticationInterceptor = clientInterceptor(jsonWebToken)
return withInterceptors(authenticationInterceptor)
}

private fun clientInterceptor(jsonWebToken: JsonWebToken): ClientInterceptor? {
val authorizationHeader = Metadata.Key.of(HttpHeaders.AUTHORIZATION, Metadata.ASCII_STRING_MARSHALLER)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2023 - 2025 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa.connectivity.databroker.provider

import io.grpc.ChannelCredentials
import io.grpc.Grpc
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.TlsChannelCredentials
import org.eclipse.kuksa.connectivity.authentication.JsonWebToken
import org.eclipse.kuksa.connectivity.databroker.DATABROKER_HOST
import org.eclipse.kuksa.connectivity.databroker.DATABROKER_TIMEOUT_SECONDS
import org.eclipse.kuksa.connectivity.databroker.DATABROKER_TIMEOUT_UNIT
import org.eclipse.kuksa.connectivity.databroker.docker.DEFAULT_PORT_INSECURE
import org.eclipse.kuksa.connectivity.databroker.docker.DEFAULT_PORT_SECURE
import org.eclipse.kuksa.connectivity.databroker.v2.DataBrokerConnectorV2
import org.eclipse.kuksa.mocking.JwtType
import org.eclipse.kuksa.model.TimeoutConfig
import org.eclipse.kuksa.test.TestResourceFile
import java.io.IOException
import java.io.InputStream

class DataBrokerConnectorV2Provider {
lateinit var managedChannel: ManagedChannel

fun createInsecure(
host: String = DATABROKER_HOST,
port: Int = DEFAULT_PORT_INSECURE,
jwtFileStream: InputStream? = null,
): DataBrokerConnectorV2 {
managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()

val jsonWebToken = jwtFileStream?.let {
val token = it.reader().readText()
JsonWebToken(token)
}

return DataBrokerConnectorV2(
managedChannel,
jsonWebToken,
).apply {
timeoutConfig = TimeoutConfig(DATABROKER_TIMEOUT_SECONDS, DATABROKER_TIMEOUT_UNIT)
}
}

fun createSecure(
host: String = DATABROKER_HOST,
port: Int = DEFAULT_PORT_SECURE,
overrideAuthority: String = "",
rootCertFileStream: InputStream = TestResourceFile("tls/CA.pem").inputStream(),
jwtFileStream: InputStream? = JwtType.READ_WRITE_ALL.asInputStream(),
): DataBrokerConnectorV2 {
val tlsCredentials: ChannelCredentials
try {
tlsCredentials = TlsChannelCredentials.newBuilder()
.trustManager(rootCertFileStream)
.build()
} catch (_: IOException) {
// Handle error
throw IOException("Could not create TLS credentials")
}

val channelBuilder = Grpc
.newChannelBuilderForAddress(host, port, tlsCredentials)

val hasOverrideAuthority = overrideAuthority.isNotEmpty()
if (hasOverrideAuthority) {
channelBuilder.overrideAuthority(overrideAuthority)
}

managedChannel = channelBuilder.build()

val jsonWebToken = jwtFileStream?.let {
val token = it.reader().readText()
JsonWebToken(token)
}

return DataBrokerConnectorV2(
managedChannel,
jsonWebToken,
).apply {
timeoutConfig = TimeoutConfig(DATABROKER_TIMEOUT_SECONDS, DATABROKER_TIMEOUT_UNIT)
}
}
}
Loading

0 comments on commit c986372

Please sign in to comment.