Commit 5c68fc53 authored by Him188's avatar Him188

Fix channel

parent 0c6526af
package net.mamoe.mirai.utils.io package net.mamoe.mirai.utils.io
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.io.ByteReadChannel import kotlinx.coroutines.withContext
import kotlinx.coroutines.io.ByteWriteChannel
import kotlinx.coroutines.io.readAvailable
import kotlinx.io.core.ByteReadPacket import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Closeable import kotlinx.io.core.Closeable
import kotlinx.io.pool.useInstance import kotlinx.io.core.ExperimentalIoApi
import kotlinx.io.streams.readPacketAtMost
import kotlinx.io.streams.writePacket
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.MiraiInternalAPI
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.net.Socket
/** /**
* 多平台适配的 TCP Socket. * 多平台适配的 TCP Socket.
*/ */
@MiraiInternalAPI @MiraiInternalAPI
actual class PlatformSocket : Closeable { actual class PlatformSocket : Closeable {
@UseExperimental(KtorExperimentalAPI::class) private lateinit var socket: Socket
lateinit var socket: Socket
@UseExperimental(KtorExperimentalAPI::class)
actual val isOpen: Boolean actual val isOpen: Boolean
get() = socket.socketContext.isActive get() = socket.isConnected
override fun close() = socket.dispose() override fun close() = socket.close()
@PublishedApi @PublishedApi
internal lateinit var writeChannel: ByteWriteChannel internal lateinit var writeChannel: BufferedOutputStream
@PublishedApi @PublishedApi
internal lateinit var readChannel: ByteReadChannel internal lateinit var readChannel: BufferedInputStream
actual suspend inline fun send(packet: ByteArray, offset: Int, length: Int) { actual suspend inline fun send(packet: ByteArray, offset: Int, length: Int) {
try { withContext(Dispatchers.IO) {
writeChannel.writeFully(packet, offset, length) writeChannel.write(packet, offset, length)
} catch (e: Exception) { writeChannel.flush()
throw SendPacketInternalException(e)
} }
} }
...@@ -46,10 +40,9 @@ actual class PlatformSocket : Closeable { ...@@ -46,10 +40,9 @@ actual class PlatformSocket : Closeable {
* @throws SendPacketInternalException * @throws SendPacketInternalException
*/ */
actual suspend inline fun send(packet: ByteReadPacket) { actual suspend inline fun send(packet: ByteReadPacket) {
try { withContext(Dispatchers.IO) {
writeChannel.writePacket(packet) writeChannel.writePacket(packet)
} catch (e: Exception) { writeChannel.flush()
throw SendPacketInternalException(e)
} }
} }
...@@ -57,21 +50,17 @@ actual class PlatformSocket : Closeable { ...@@ -57,21 +50,17 @@ actual class PlatformSocket : Closeable {
* @throws ReadPacketInternalException * @throws ReadPacketInternalException
*/ */
actual suspend inline fun read(): ByteReadPacket { actual suspend inline fun read(): ByteReadPacket {
// do not use readChannel.readRemaining() !!! this function never returns return withContext(Dispatchers.IO) {
ByteArrayPool.useInstance { buffer -> readChannel.readPacketAtMost(Long.MAX_VALUE)
val count = try {
readChannel.readAvailable(buffer)
} catch (e: Exception) {
throw ReadPacketInternalException(e)
}
return buffer.toReadPacket(0, count)
} }
} }
@UseExperimental(KtorExperimentalAPI::class) @UseExperimental(ExperimentalIoApi::class)
actual suspend fun connect(serverHost: String, serverPort: Int) { actual suspend fun connect(serverHost: String, serverPort: Int) {
socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(serverHost, serverPort) withContext(Dispatchers.IO) {
writeChannel = socket.openWriteChannel(true) socket = Socket(serverHost, serverPort)
readChannel = socket.openReadChannel() readChannel = socket.getInputStream().buffered()
writeChannel = socket.getOutputStream().buffered()
}
} }
} }
\ No newline at end of file
package net.mamoe.mirai.utils.io package net.mamoe.mirai.utils.io
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.io.ByteReadChannel import kotlinx.coroutines.withContext
import kotlinx.coroutines.io.ByteWriteChannel
import kotlinx.coroutines.io.readAvailable
import kotlinx.io.core.ByteReadPacket import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Closeable import kotlinx.io.core.Closeable
import kotlinx.io.pool.useInstance import kotlinx.io.core.ExperimentalIoApi
import kotlinx.io.streams.readPacketAtMost
import kotlinx.io.streams.writePacket
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.MiraiInternalAPI
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.net.Socket
/** /**
* 多平台适配的 TCP Socket. * 多平台适配的 TCP Socket.
*/ */
@MiraiInternalAPI @MiraiInternalAPI
actual class PlatformSocket : Closeable { actual class PlatformSocket : Closeable {
@UseExperimental(KtorExperimentalAPI::class) private lateinit var socket: Socket
lateinit var socket: Socket
@UseExperimental(KtorExperimentalAPI::class)
actual val isOpen: Boolean actual val isOpen: Boolean
get() = socket.socketContext.isActive get() = socket.isConnected
override fun close() = socket.dispose() override fun close() = socket.close()
@PublishedApi @PublishedApi
internal lateinit var writeChannel: ByteWriteChannel internal lateinit var writeChannel: BufferedOutputStream
@PublishedApi @PublishedApi
internal lateinit var readChannel: ByteReadChannel internal lateinit var readChannel: BufferedInputStream
actual suspend inline fun send(packet: ByteArray, offset: Int, length: Int) { actual suspend inline fun send(packet: ByteArray, offset: Int, length: Int) {
try { withContext(Dispatchers.IO) {
writeChannel.writeFully(packet, offset, length) writeChannel.write(packet, offset, length)
} catch (e: Exception) { writeChannel.flush()
throw SendPacketInternalException(e)
} }
} }
...@@ -46,10 +40,9 @@ actual class PlatformSocket : Closeable { ...@@ -46,10 +40,9 @@ actual class PlatformSocket : Closeable {
* @throws SendPacketInternalException * @throws SendPacketInternalException
*/ */
actual suspend inline fun send(packet: ByteReadPacket) { actual suspend inline fun send(packet: ByteReadPacket) {
try { withContext(Dispatchers.IO) {
writeChannel.writePacket(packet) writeChannel.writePacket(packet)
} catch (e: Exception) { writeChannel.flush()
throw SendPacketInternalException(e)
} }
} }
...@@ -57,21 +50,17 @@ actual class PlatformSocket : Closeable { ...@@ -57,21 +50,17 @@ actual class PlatformSocket : Closeable {
* @throws ReadPacketInternalException * @throws ReadPacketInternalException
*/ */
actual suspend inline fun read(): ByteReadPacket { actual suspend inline fun read(): ByteReadPacket {
// do not use readChannel.readRemaining() !!! this function never returns return withContext(Dispatchers.IO) {
ByteArrayPool.useInstance { buffer -> readChannel.readPacketAtMost(Long.MAX_VALUE)
val count = try {
readChannel.readAvailable(buffer)
} catch (e: Exception) {
throw ReadPacketInternalException(e)
}
return buffer.toReadPacket(0, count)
} }
} }
@UseExperimental(KtorExperimentalAPI::class) @UseExperimental(ExperimentalIoApi::class)
actual suspend fun connect(serverHost: String, serverPort: Int) { actual suspend fun connect(serverHost: String, serverPort: Int) {
socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(serverHost, serverPort) withContext(Dispatchers.IO) {
writeChannel = socket.openWriteChannel(true) socket = Socket(serverHost, serverPort)
readChannel = socket.openReadChannel() readChannel = socket.getInputStream().buffered()
writeChannel = socket.getOutputStream().buffered()
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment