Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
Shamrock: typo BaseTransmitServlet.kt
Browse files Browse the repository at this point in the history
Signed-off-by: 白池 <[email protected]>
  • Loading branch information
whitechi73 committed Feb 19, 2024
1 parent 1c65aab commit c70f3ea
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,33 @@ import moe.fuqiuluo.shamrock.remote.action.handlers.QuickOperation.quicklyReply
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter

internal object HttpService: HttpTransmitServlet() {
private val jobList = arrayListOf<Job>()
private val subscribes = arrayListOf<Job>()

override fun submitFlowJob(job: Job) {
// HTTP 回调不会触发断连,无需释放之前的JOB
jobList.add(job)
override fun subscribe(job: Job) {
subscribes.add(job)
}

override fun cancelFlowJobs() {
jobList.removeIf {
override fun unsubscribe() {
subscribes.removeIf {
it.cancel()
return@removeIf true
}
}

override fun initTransmitter() {
if (jobList.isNotEmpty()) return
submitFlowJob(GlobalScope.launch {
override fun init() {
if (subscribes.isNotEmpty()) return
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (record, event) ->
val respond = pushTo(event) ?: return@onMessageEvent
handleQuicklyReply(record, event.messageId, respond.bodyAsText())
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event)
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent {
pushTo(it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,37 @@ internal class WebSocketClientService(
heartbeatInterval: Long,
wsHeaders: Map<String, String>
) : WebSocketClientServlet(address, heartbeatInterval, wsHeaders) {
private val eventJobList = mutableSetOf<Job>()
private val subscribes = mutableSetOf<Job>()

init {
startHeartbeatTimer()
}

override fun submitFlowJob(job: Job) {
eventJobList.add(job)
override fun subscribe(job: Job) {
subscribes.add(job)
}

override fun initTransmitter() {
submitFlowJob(GlobalScope.launch {
override fun init() {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event)
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event)
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event ->
pushTo(event)
}
})
LogCenter.log("WebSocketClientService: 初始化服务", Level.WARN)
}

override fun cancelFlowJobs() {
eventJobList.removeIf { job ->
override fun unsubscribe() {
subscribes.removeIf { job ->
job.cancel()
return@removeIf true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,33 @@ internal class WebSocketService(
port: Int,
heartbeatInterval: Long,
): WebSocketTransmitServlet(host, port, heartbeatInterval) {
private val eventJobList = mutableSetOf<Job>()
private val subscribes = mutableSetOf<Job>()

override fun submitFlowJob(job: Job) {
eventJobList.add(job)
override fun subscribe(job: Job) {
subscribes.add(job)
}

override fun initTransmitter() {
submitFlowJob(GlobalScope.launch {
override fun init() {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event)
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event)
}
})
submitFlowJob(GlobalScope.launch {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event ->
pushTo(event)
}
})
LogCenter.log("WebSocketService: 初始化服务", Level.WARN)
}

override fun cancelFlowJobs() {
eventJobList.removeIf { job ->
override fun unsubscribe() {
subscribes.removeIf { job ->
job.cancel()
return@removeIf true
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
package moe.fuqiuluo.shamrock.remote.service.api

import com.tencent.mobileqq.app.QQAppInterface
import com.tencent.qqnt.kernel.nativeinterface.MsgElement
import com.tencent.qqnt.kernel.nativeinterface.MsgRecord
import kotlinx.coroutines.Job
import moe.fuqiuluo.shamrock.remote.service.data.push.NoticeSubType
import moe.fuqiuluo.shamrock.remote.service.data.push.NoticeType
import moe.fuqiuluo.shamrock.xposed.helper.AppRuntimeFetcher
import oicq.wlogin_sdk.tools.MD5

internal interface BaseTransmitServlet {
val address: String

fun allowTransmit(): Boolean
fun transmitAccess(): Boolean

fun submitFlowJob(job: Job)
fun subscribe(job: Job)

fun cancelFlowJobs()
fun unsubscribe()

fun initTransmitter()
fun init()

val app: QQAppInterface
get() = AppRuntimeFetcher.appRuntime as QQAppInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.http.ContentType
import io.ktor.http.contentType
import kotlinx.coroutines.Job
import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig
import moe.fuqiuluo.shamrock.tools.GlobalClient
import moe.fuqiuluo.shamrock.helper.Level
Expand All @@ -22,12 +21,12 @@ import java.net.SocketException
internal abstract class HttpTransmitServlet : BaseTransmitServlet {
override val address: String by lazy { ShamrockConfig.getWebHookAddress() }

override fun allowTransmit(): Boolean {
override fun transmitAccess(): Boolean {
return ShamrockConfig.allowWebHook()
}

protected suspend inline fun <reified T> pushTo(body: T): HttpResponse? {
if (!allowTransmit()) return null
if (!transmitAccess()) return null
try {
if (address.startsWith("http://") || address.startsWith("https://")) {
val response = GlobalClient.post(address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal abstract class WebSocketClientServlet(
private var firstOpen = true
private val sendLock = Mutex()

override fun allowTransmit(): Boolean {
override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocketClient()
}

Expand Down Expand Up @@ -90,9 +90,9 @@ internal abstract class WebSocketClientServlet(
if (firstOpen) {
firstOpen = false
} else {
cancelFlowJobs()
unsubscribe()
}
initTransmitter()
init()
}

override fun onClose(code: Int, reason: String?, remote: Boolean) {
Expand All @@ -105,18 +105,18 @@ internal abstract class WebSocketClientServlet(
}
}
LogCenter.log("WebSocketClient onClose: $code, $reason, $remote")
cancelFlowJobs()
unsubscribe()
connectedClients.remove(url)
}

override fun onError(ex: Exception?) {
LogCenter.log("WebSocketClient onError: ${ex?.message}")
cancelFlowJobs()
unsubscribe()
connectedClients.remove(url)
}

protected suspend inline fun <reified T> pushTo(body: T) {
if (!allowTransmit() || isClosed || isClosing) return
if (!transmitAccess() || isClosed || isClosing) return
try {
sendLock.withLock {
send(GlobalJson.encodeToString(body))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal abstract class WebSocketTransmitServlet(
override val address: String
get() = "-"

override fun allowTransmit(): Boolean {
override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocket()
}

Expand Down Expand Up @@ -129,16 +129,16 @@ internal abstract class WebSocketTransmitServlet(

override fun onError(conn: WebSocket, ex: Exception?) {
LogCenter.log("WSServer Error: " + ex?.stackTraceToString(), Level.ERROR)
cancelFlowJobs()
unsubscribe()
}

override fun onStart() {
LogCenter.log("WSServer start running on ws://${getAddress()}!")
initTransmitter()
init()
}

protected suspend inline fun <reified T> pushTo(body: T) {
if(!allowTransmit()) return
if(!transmitAccess()) return
try {
sendLock.withLock {
broadcastTextEvent(GlobalJson.encodeToString(body))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal class InitRemoteService : IAction {


if (ShamrockConfig.allowWebHook()) {
HttpService.initTransmitter()
HttpService.init()
}

val runtime = AppRuntimeFetcher.appRuntime
Expand Down

0 comments on commit c70f3ea

Please sign in to comment.