Commit f5a84202 authored by Him188's avatar Him188

Speed up image uploading

parent fe764d1f
...@@ -40,6 +40,9 @@ import kotlin.contracts.ExperimentalContracts ...@@ -40,6 +40,9 @@ import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.contract import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.jvm.JvmSynthetic import kotlin.jvm.JvmSynthetic
import kotlin.math.roundToInt
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
internal fun GroupImpl.Companion.checkIsInstance(instance: Group) { internal fun GroupImpl.Companion.checkIsInstance(instance: Group) {
...@@ -344,6 +347,7 @@ internal class GroupImpl( ...@@ -344,6 +347,7 @@ internal class GroupImpl(
return MessageReceipt(source, this, botAsMember) return MessageReceipt(source, this, botAsMember)
} }
@OptIn(ExperimentalTime::class)
@JvmSynthetic @JvmSynthetic
override suspend fun uploadImage(image: ExternalImage): OfflineGroupImage = try { override suspend fun uploadImage(image: ExternalImage): OfflineGroupImage = try {
if (BeforeImageUploadEvent(this, image).broadcast().isCancelled) { if (BeforeImageUploadEvent(this, image).broadcast().isCancelled) {
...@@ -391,18 +395,21 @@ internal class GroupImpl( ...@@ -391,18 +395,21 @@ internal class GroupImpl(
// 每 10KB 等 1 秒, 最少等待 5 秒 // 每 10KB 等 1 秒, 最少等待 5 秒
val success = response.uploadIpList.zip(response.uploadPortList).any { (ip, port) -> val success = response.uploadIpList.zip(response.uploadPortList).any { (ip, port) ->
withTimeoutOrNull((image.inputSize * 1000 / 1024 / 10).coerceAtLeast(5000)) { withTimeoutOrNull((image.inputSize * 1000 / 1024 / 10).coerceAtLeast(5000)) {
bot.network.logger.verbose { "[Highway] Uploading group image to ${ip.toIpV4AddressString()}:$port: size=${image.inputSize / 1024} KiB" } bot.network.logger.verbose { "[Highway] Uploading group image to ${ip.toIpV4AddressString()}:$port, size=${image.inputSize / 1024} KiB" }
HighwayHelper.uploadImage(
client = bot.client, val time = measureTime {
serverIp = ip.toIpV4AddressString(), HighwayHelper.uploadImage(
serverPort = port, client = bot.client,
imageInput = image.input, serverIp = ip.toIpV4AddressString(),
inputSize = image.inputSize.toInt(), serverPort = port,
fileMd5 = image.md5, imageInput = image.input,
ticket = response.uKey, inputSize = image.inputSize.toInt(),
commandId = 2 fileMd5 = image.md5,
) ticket = response.uKey,
bot.network.logger.verbose { "[Highway] Uploading group image: succeed" } commandId = 2
)
}
bot.network.logger.verbose { "[Highway] Uploading group image: succeed at ${(image.inputSize.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s" }
true true
} ?: kotlin.run { } ?: kotlin.run {
bot.network.logger.verbose { "[Highway] Uploading group image: timeout, retrying next server" } bot.network.logger.verbose { "[Highway] Uploading group image: timeout, retrying next server" }
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
package net.mamoe.mirai.utils.internal package net.mamoe.mirai.utils.internal
import kotlinx.io.pool.DefaultPool
import kotlinx.io.pool.ObjectPool
expect abstract class InputStream { expect abstract class InputStream {
open fun available(): Int open fun available(): Int
open fun close() open fun close()
...@@ -21,9 +24,50 @@ internal fun ByteArray.checkOffsetAndLength(offset: Int, length: Int) { ...@@ -21,9 +24,50 @@ internal fun ByteArray.checkOffsetAndLength(offset: Int, length: Int) {
require(offset + length <= this.size) { "offset ($offset) + length ($length) > array.size (${this.size})" } require(offset + length <= this.size) { "offset ($offset) + length ($length) > array.size (${this.size})" }
} }
internal inline fun InputStream.readInSequence(block: (Int) -> Unit) {
internal inline fun InputStream.readInSequence(block: (ByteArray, len: Int) -> Unit) {
var read: Int var read: Int
while (this.read().also { read = it } != -1) { ByteArrayPool.useInstance { buf ->
block(read) while (this.read(buf).also { read = it } != -1) {
block(buf, read)
}
} }
} }
/**
* 缓存 [ByteArray] 实例的 [ObjectPool]
*/
internal object ByteArrayPool : DefaultPool<ByteArray>(32) {
/**
* 每一个 [ByteArray] 的大小
*/
const val BUFFER_SIZE: Int = 8192
override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE)
override fun clearInstance(instance: ByteArray): ByteArray = instance
fun checkBufferSize(size: Int) {
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
}
fun checkBufferSize(size: Long) {
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
}
/**
* 请求一个大小至少为 [requestedSize] 的 [ByteArray] 实例.
*/ // 不要写为扩展函数. 它需要优先于 kotlinx.io 的扩展函数 resolve
inline fun <R> useInstance(requestedSize: Int = 0, block: (ByteArray) -> R): R {
if (requestedSize > BUFFER_SIZE) {
return ByteArray(requestedSize).run(block)
}
val instance = borrow()
try {
return block(instance)
} finally {
recycle(instance)
}
}
}
\ No newline at end of file
...@@ -13,8 +13,8 @@ internal actual fun ByteArray.md5(offset: Int, length: Int): ByteArray { ...@@ -13,8 +13,8 @@ internal actual fun ByteArray.md5(offset: Int, length: Int): ByteArray {
internal actual fun InputStream.md5(): ByteArray { internal actual fun InputStream.md5(): ByteArray {
val digest = MessageDigest.getInstance("md5") val digest = MessageDigest.getInstance("md5")
digest.reset() digest.reset()
this.readInSequence { this.readInSequence { buf, len ->
digest.update(it.toByte()) digest.update(buf, 0, len)
} }
return digest.digest() return digest.digest()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment