Skip to content

Commit

Permalink
Add EventGroupSelectorManager instead of ActorSelectorManager
Browse files Browse the repository at this point in the history
  • Loading branch information
marychatte committed Jun 4, 2024
1 parent 1eeb7c1 commit 25355ef
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import kotlinx.coroutines.suspendCancellableCoroutine
import java.nio.channels.SelectionKey

internal inline val SelectionKey.attachment get() = attachment() as Attachment

/**
* Attachment for SelectionKey
* It contains task for each interest and allows to run them and resume the continuation
*/
internal class Attachment {
private var acceptTask: Task<Any?>? = null
private var readTask: Task<Any?>? = null
private var writeTask: Task<Any?>? = null
private var connectTask: Task<Any?>? = null

suspend fun <T> runTask(interest: Int, task: suspend () -> T): T {
return suspendCancellableCoroutine {
@Suppress("UNCHECKED_CAST")
setContinuationByInterest(interest, Task(it.toResumableCancellable(), task) as Task<Any?>)
}
}

suspend fun runTaskAndResumeContinuation(key: SelectionKey) {
when {
key.isAcceptable -> acceptTask.runAndResume(SelectionKey.OP_ACCEPT)
key.isReadable -> readTask.runAndResume(SelectionKey.OP_READ)
key.isWritable -> writeTask.runAndResume(SelectionKey.OP_WRITE)
key.isConnectable -> connectTask.runAndResume(SelectionKey.OP_CONNECT)
}
}

private suspend fun Task<Any?>?.runAndResume(interest: Int) {
val task = this ?: return
setContinuationByInterest(interest, null)
task.runAndResume()
}

private fun setContinuationByInterest(interest: Int, task: Task<Any?>?) {
when (interest) {
SelectionKey.OP_ACCEPT -> acceptTask = task
SelectionKey.OP_READ -> readTask = task
SelectionKey.OP_WRITE -> writeTask = task
SelectionKey.OP_CONNECT -> connectTask = task
}
}

fun cancel(cause: Throwable? = null) {
acceptTask.cancel(cause)
readTask.cancel(cause)
writeTask.cancel(cause)
connectTask.cancel(cause)
}

private fun Task<*>?.cancel(cause: Throwable? = null) {
this?.continuation?.cancel(cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import java.nio.channels.SocketChannel

/**
* Allows to perform read and write operations on the socket channel,
* which will be submitted as tasks to the event loop and will be suspended until
* they will be executed in the context of the event loop
*/
internal interface Connection {
val channel: SocketChannel

suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T

suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T

fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.utils.io.*
import kotlin.coroutines.*

private val MAX_THREADS by lazy {
Runtime.getRuntime().availableProcessors()
.minus(2)
.coerceAtLeast(1)
}

@InternalAPI
public class EventGroupContext(
public val parallelism: Int,
) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> = Key

public companion object Key : CoroutineContext.Key<EventGroupContext>
}

@InternalAPI
internal fun CoroutineContext.eventGroupParallelism(): Int {
return get(EventGroupContext.Key)?.parallelism ?: MAX_THREADS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.network.selector.*
import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.nio.channels.*
import java.nio.channels.spi.*
import kotlin.coroutines.*

@OptIn(InternalAPI::class)
public class EventGroupSelectorManager(context: CoroutineContext) : SelectorManager {
private val group = EventGroup(context.eventGroupParallelism())

override val coroutineContext: CoroutineContext = context + CoroutineName("eventgroup")

override val provider: SelectorProvider = SelectorProvider.provider()

override fun notifyClosed(selectable: Selectable) {
// whatever
}

override suspend fun select(selectable: Selectable, interest: SelectInterest) {
error("no select in eventgroup")
}

override fun close() {
group.close()
}
}

public class EventGroup(private val maxLoops: Int) {
private val acceptLoop = Eventloop()
private val loopIndex = atomic(0)
private val loops = mutableListOf<Eventloop>()

init {
acceptLoop.run()

repeat(maxLoops - 1) {
val next = Eventloop().apply { run() }
loops.add(next)
}
}

private fun registerAcceptKey(channel: AbstractSelectableChannel) = acceptLoop.runOnLoop {
acceptLoop.addInterest(channel, SelectionKey.OP_ACCEPT)
}

internal fun registerChannel(channel: ServerSocketChannel): RegisteredServerChannel {
val key = registerAcceptKey(channel)

return RegisteredServerChannelImpl(channel, key)
}

private inner class RegisteredServerChannelImpl(
override val channel: ServerSocketChannel,
private val key: CompletableDeferred<SelectionKey>,
) : RegisteredServerChannel {
override suspend fun acceptConnection(configure: (SocketChannel) -> Unit): ConnectionImpl {
val result = key.await().attachment.runTask(SelectionKey.OP_ACCEPT) {
channel.accept().apply {
configureBlocking(false)
configure(this)
}
}

val nextLoopIndex = loopIndex.getAndIncrement() % (maxLoops - 1)

return ConnectionImpl(result, loops[nextLoopIndex])
}
}

private class ConnectionImpl(
override val channel: SocketChannel,
val loop: Eventloop,
) : Connection {
override suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T {
return runTask(SelectionKey.OP_READ) { body(channel) }
}

override suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T {
return runTask(SelectionKey.OP_WRITE) { body(channel) }
}

override fun close() {
channel.close()
}

private suspend fun <T> runTask(interest: Int, body: suspend () -> T): T {
val key = loop.addInterest(channel, interest)
return key.attachment.runTask(interest, body).also {
loop.deleteInterest(key, interest)
}
}
}

public fun close() {
acceptLoop.close(null)
loops.forEach { it.close(null) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import kotlinx.coroutines.*
import java.nio.channels.*

internal class Eventloop {
val scope = newThreadContext(nThreads = 1).wrapInScope()

fun run(): Job {
return scope.launch { runLoop() }
}

private val taskQueue = ArrayDeque<Task<*>>()

private val selector = Selector.open()

fun close(cause: Throwable?) {
taskQueue.forEach { it.continuation.cancel(cause) }
selector.close()
}

private suspend fun runLoop() {
while (true) {
runAllPendingTasks()

val n = selector.select(SELECTOR_TIMEOUT_MILLIS)
yield()

if (n == 0) {
continue
}

val selectionKeys = selector.selectedKeys().iterator()
while (selectionKeys.hasNext()) {
val key = selectionKeys.next()
selectionKeys.remove()

try {
if (!key.isValid) continue
key.attachment.runTaskAndResumeContinuation(key)
} catch (e: Throwable) {
key.channel().close()
key.attachment.cancel(e)
}
}
}
}

private suspend fun runAllPendingTasks() {
repeat(taskQueue.size) {
taskQueue.removeFirst().runAndResume()
}
}

internal fun <T> runOnLoop(body: suspend () -> T): CompletableDeferred<T> {
val result = CompletableDeferred<T>()
taskQueue.addLast(Task(result.toResumableCancellable(), body))
return result
}

internal fun addInterest(channel: SelectableChannel, interest: Int): SelectionKey {
val result = channel.keyFor(selector)?.apply {
interestOpsOr(interest)

} ?: let {
channel.register(selector, interest, Attachment())
}
return result
}

internal fun deleteInterest(selectionKey: SelectionKey, interest: Int) {
selectionKey.interestOpsAnd(interest.inv())
}

companion object {
private const val SELECTOR_TIMEOUT_MILLIS = 20L
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import java.net.Socket
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel

/**
* Represents a server channel registered to an event loop with OP_ACCEPT interest
*/
internal interface RegisteredServerChannel {
val channel: ServerSocketChannel

/**
* Allows to accept connections on the server socket channel
*/
suspend fun acceptConnection(configure: (SocketChannel) -> Unit = {}): Connection
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import java.nio.channels.*

internal class ServerConnectionBasedSocket(
connection: Connection,
selector: SelectorManager,
socketOptions: SocketOptions.TCPClientSocketOptions? = null
) : NIOSocketImpl<SocketChannel>(connection, connection.channel, selector, pool = null, socketOptions = socketOptions),
Socket {
init {
require(!channel.isBlocking) { "Channel need to be configured as non-blocking." }
}

override val localAddress: SocketAddress
get() {
val localAddress = if (java7NetworkApisAvailable) {
channel.localAddress
} else {
channel.socket().localSocketAddress
}
return localAddress?.toSocketAddress()
?: throw IllegalStateException("Channel is not yet bound")
}

override val remoteAddress: SocketAddress
get() {
val remoteAddress = if (java7NetworkApisAvailable) {
channel.remoteAddress
} else {
channel.socket().remoteSocketAddress
}
return remoteAddress?.toSocketAddress()
?: throw IllegalStateException("Channel is not yet connected")
}
}
Loading

0 comments on commit 25355ef

Please sign in to comment.