Skip to content

Optimize gzip handlemulti #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.time.DurationUnit
import kotlin.time.toDuration
Expand All @@ -46,13 +48,16 @@ class WebSocketConnection :

private var lastFrameTime = System.currentTimeMillis()

private val isDisconnecting = AtomicBoolean(false)

override val coroutineContext: CoroutineContext = Dispatchers.IO + job

override fun connect(endPoint: InetSocketAddress, timeout: Int) {
launch {
logger.debug("Trying connection to ${endPoint.hostName}:${endPoint.port}")

try {
isDisconnecting.set(false)
endpoint = endPoint

client = HttpClient(CIO) {
Expand Down Expand Up @@ -90,6 +95,8 @@ class WebSocketConnection :
is Frame.Text -> logger.debug("Received plain text ${frame.readText()}")
}
}
} catch (e: CancellationException) {
logger.debug("Frame listener was cancelled", e)
} catch (e: Exception) {
logger.error("An error occurred while receiving data", e)
disconnect(false)
Expand All @@ -106,7 +113,13 @@ class WebSocketConnection :
}

override fun disconnect(userInitiated: Boolean) {
if (!isDisconnecting.compareAndSet(false, true)) {
logger.error("Already disconnected or disconnecting")
return
}

logger.debug("Disconnect called: $userInitiated")

launch {
try {
session?.close()
Expand All @@ -123,6 +136,10 @@ class WebSocketConnection :
}

override fun send(data: ByteArray) {
if (isDisconnecting.get()) {
return
}

launch {
try {
val frame = Frame.Binary(true, data)
Expand All @@ -145,30 +162,34 @@ class WebSocketConnection :
*/
private fun startConnectionMonitoring() {
launch {
while (isActive) {
if (client?.isActive == false || session?.isActive == false) {
logger.error("Client or Session is no longer active")
disconnect(userInitiated = false)
}

val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime

// logger.debug("Watchdog status: $timeSinceLastFrame")
when {
timeSinceLastFrame > 30000 -> {
logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam")
try {
while (isActive && !isDisconnecting.get()) {
if (client?.isActive == false || session?.isActive == false) {
logger.error("Client or Session is no longer active")
disconnect(userInitiated = false)
break
}

timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds")
val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime

timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds")
// logger.debug("Watchdog status: $timeSinceLastFrame")
when {
timeSinceLastFrame > 30000 -> {
logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam")
disconnect(userInitiated = false)
break
}

timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds")
}
timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds")

delay(5000)
timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds")

timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds")
}

delay(5000)
}
} catch (e: CancellationException) {
logger.debug("Watchdog Cancelled.", e)
}
}
}
Expand Down
Loading