Everything-claude-code kotlin-ktor-patterns
Ktor 服务器模式,包括路由 DSL、插件、身份验证、Koin DI、kotlinx.serialization、WebSockets 和 testApplication 测试。
install
source · Clone the upstream repo
git clone https://github.com/affaan-m/everything-claude-code
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/affaan-m/everything-claude-code "$T" && mkdir -p ~/.claude/skills && cp -r "$T/docs/zh-CN/skills/kotlin-ktor-patterns" ~/.claude/skills/affaan-m-everything-claude-code-kotlin-ktor-patterns && rm -rf "$T"
manifest:
docs/zh-CN/skills/kotlin-ktor-patterns/SKILL.md · source content
Ktor 服务器模式
使用 Kotlin 协程构建健壮、可维护的 HTTP 服务器的综合 Ktor 模式。
何时启用
- 构建 Ktor HTTP 服务器
- 配置 Ktor 插件(Auth、CORS、ContentNegotiation、StatusPages)
- 使用 Ktor 实现 REST API
- 使用 Koin 设置依赖注入
- 使用 testApplication 编写 Ktor 集成测试
- 在 Ktor 中使用 WebSocket
应用程序结构
标准 Ktor 项目布局
src/main/kotlin/
├── com/example/
│   ├── Application.kt           # 入口点,模块配置
│   ├── plugins/
│   │   ├── Routing.kt           # 路由定义
│   │   ├── Serialization.kt     # 内容协商设置
│   │   ├── Authentication.kt    # 认证配置
│   │   ├── StatusPages.kt       # 错误处理
│   │   └── CORS.kt              # CORS 配置
│   ├── routes/
│   │   ├── UserRoutes.kt        # /users 端点
│   │   ├── AuthRoutes.kt        # /auth 端点
│   │   └── HealthRoutes.kt      # /health 端点
│   ├── models/
│   │   ├── User.kt              # 领域模型
│   │   └── ApiResponse.kt       # 响应封装
│   ├── services/
│   │   ├── UserService.kt       # 业务逻辑
│   │   └── AuthService.kt       # 认证逻辑
│   ├── repositories/
│   │   ├── UserRepository.kt    # 数据访问接口
│   │   └── ExposedUserRepository.kt
│   └── di/
│       └── AppModule.kt         # Koin 模块
src/test/kotlin/
├── com/example/
│   ├── routes/
│   │   └── UserRoutesTest.kt
│   └── services/
│       └── UserServiceTest.kt
应用程序入口点
// Application.kt

/** Process entry point: creates an embedded Netty server on port 8080 and blocks on it. */
fun main() {
    val server = embeddedServer(Netty, port = 8080, module = Application::module)
    server.start(wait = true)
}

/**
 * Root application module.
 *
 * Calls each `configure*` extension in a fixed order: serialization, authentication,
 * status pages, CORS, dependency injection and finally routing.
 */
fun Application.module() {
    configureSerialization()
    configureAuthentication()
    configureStatusPages()
    configureCORS()
    configureDI()
    configureRouting()
}
路由 DSL
基本路由
// plugins/Routing.kt

/** Mounts the user, auth and health route groups on the application's routing tree. */
fun Application.configureRouting() {
    routing {
        userRoutes()
        authRoutes()
        healthRoutes()
    }
}

// routes/UserRoutes.kt

/**
 * CRUD endpoints under `/users`, delegating all work to the injected [UserService].
 *
 * Handlers answer 400 with "Missing id" when the path parameter is absent and 404 when
 * the service reports that the user does not exist.
 */
fun Route.userRoutes() {
    val userService by inject<UserService>()

    route("/users") {
        // List all users.
        get {
            call.respond(userService.getAll())
        }

        // Fetch a single user by id.
        get("/{id}") {
            val id = call.parameters["id"]
            if (id == null) {
                call.respond(HttpStatusCode.BadRequest, "Missing id")
                return@get
            }
            val found = userService.getById(id)
            if (found == null) {
                call.respond(HttpStatusCode.NotFound)
                return@get
            }
            call.respond(found)
        }

        // Create a user from the request body.
        post {
            val payload = call.receive<CreateUserRequest>()
            val created = userService.create(payload)
            call.respond(HttpStatusCode.Created, created)
        }

        // Update an existing user; the id is checked before the body is read.
        put("/{id}") {
            val id = call.parameters["id"]
            if (id == null) {
                call.respond(HttpStatusCode.BadRequest, "Missing id")
                return@put
            }
            val payload = call.receive<UpdateUserRequest>()
            val updated = userService.update(id, payload)
            if (updated == null) {
                call.respond(HttpStatusCode.NotFound)
                return@put
            }
            call.respond(updated)
        }

        // Delete a user; 204 when something was deleted, 404 otherwise.
        delete("/{id}") {
            val id = call.parameters["id"]
            if (id == null) {
                call.respond(HttpStatusCode.BadRequest, "Missing id")
                return@delete
            }
            val status = if (userService.delete(id)) HttpStatusCode.NoContent else HttpStatusCode.NotFound
            call.respond(status)
        }
    }
}
使用认证路由组织路由
/**
 * Skeleton showing how to mix open and protected endpoints under one `/users` prefix:
 * reads stay public, while writes are nested inside `authenticate("jwt")`.
 */
fun Route.userRoutes() {
    route("/users") {
        // Open to everyone
        get {
            /* list users */
        }
        get("/{id}") {
            /* get user */
        }

        // Only reachable with a principal from the "jwt" provider
        authenticate("jwt") {
            post {
                /* create user - requires auth */
            }
            put("/{id}") {
                /* update user - requires auth */
            }
            delete("/{id}") {
                /* delete user - requires auth */
            }
        }
    }
}
内容协商与序列化
kotlinx.serialization 设置
// plugins/Serialization.kt

/** Installs ContentNegotiation backed by a kotlinx.serialization JSON format. */
fun Application.configureSerialization() {
    val jsonFormat = Json {
        prettyPrint = true        // indented output
        isLenient = false         // strict parsing
        ignoreUnknownKeys = true  // tolerate fields the models do not declare
        encodeDefaults = true     // write properties even when they hold their default
        explicitNulls = false     // leave null properties out of the output
    }

    install(ContentNegotiation) {
        json(jsonFormat)
    }
}
可序列化模型
/** User representation returned to API clients. */
@Serializable
data class UserResponse(
    val id: String,
    val name: String,
    val email: String,
    val role: Role,
    // Encoded through InstantSerializer rather than a default serializer.
    @Serializable(with = InstantSerializer::class)
    val createdAt: Instant,
)

/** Request body for creating a user; [role] falls back to [Role.USER] when omitted. */
@Serializable
data class CreateUserRequest(
    val name: String,
    val email: String,
    val role: Role = Role.USER,
)

/**
 * Uniform response envelope.
 *
 * Build instances with [ok] or [error]; each factory leaves the other field `null`.
 */
@Serializable
data class ApiResponse<T>(
    val success: Boolean,
    val data: T? = null,
    val error: String? = null,
) {
    companion object {
        /** Successful envelope wrapping [data]. */
        fun <T> ok(data: T): ApiResponse<T> {
            return ApiResponse(success = true, data = data, error = null)
        }

        /** Failed envelope carrying [message] and no data. */
        fun <T> error(message: String): ApiResponse<T> {
            return ApiResponse(success = false, data = null, error = message)
        }
    }
}

/** One page of results together with the paging values it was produced with. */
@Serializable
data class PaginatedResponse<T>(
    val data: List<T>,
    val total: Long,
    val page: Int,
    val limit: Int,
)
自定义序列化器
/**
 * Serializes an [Instant] as a single string: written with `toString()`,
 * read back with `Instant.parse`.
 */
object InstantSerializer : KSerializer<Instant> {
    override val descriptor =
        PrimitiveSerialDescriptor("Instant", PrimitiveKind.STRING)

    override fun serialize(encoder: Encoder, value: Instant) {
        val text = value.toString()
        encoder.encodeString(text)
    }

    override fun deserialize(decoder: Decoder): Instant {
        val text = decoder.decodeString()
        return Instant.parse(text)
    }
}
身份验证
JWT 身份验证
// plugins/Authentication.kt

/**
 * Installs a JWT authentication provider named "jwt".
 *
 * Secret, issuer, audience and realm come from the `jwt.*` configuration properties.
 * Tokens are verified with HMAC256 against the configured audience and issuer; a failed
 * challenge is answered with 401 and an [ApiResponse] error body.
 */
fun Application.configureAuthentication() {
    val settings = environment.config
    val jwtSecret = settings.property("jwt.secret").getString()
    val jwtIssuer = settings.property("jwt.issuer").getString()
    val jwtAudience = settings.property("jwt.audience").getString()
    val jwtRealm = settings.property("jwt.realm").getString()

    val tokenVerifier = JWT.require(Algorithm.HMAC256(jwtSecret))
        .withAudience(jwtAudience)
        .withIssuer(jwtIssuer)
        .build()

    install(Authentication) {
        jwt("jwt") {
            realm = jwtRealm
            verifier(tokenVerifier)

            validate { credential ->
                // Accept the token only when its audience list names this API.
                val audienceMatches = jwtAudience in credential.payload.audience
                if (audienceMatches) JWTPrincipal(credential.payload) else null
            }

            challenge { _, _ ->
                val body = ApiResponse.error<Unit>("Invalid or expired token")
                call.respond(HttpStatusCode.Unauthorized, body)
            }
        }
    }
}

// Extracting user from JWT

/**
 * Reads the `userId` claim of the authenticated JWT principal.
 *
 * @throws AuthenticationException when there is no JWT principal or the claim has no string value
 */
fun ApplicationCall.userId(): String {
    val claim = principal<JWTPrincipal>()?.payload?.getClaim("userId")
    return claim?.asString() ?: throw AuthenticationException("No userId in token")
}
认证路由
/**
 * Endpoints under `/auth`: login, registration, and the authenticated `/me` profile lookup.
 * All work is delegated to the injected [AuthService].
 */
fun Route.authRoutes() {
    val authService by inject<AuthService>()

    route("/auth") {
        // Exchange credentials for a token; a null token means the login was rejected.
        post("/login") {
            val credentials = call.receive<LoginRequest>()
            val token = authService.login(credentials.email, credentials.password)
            if (token == null) {
                call.respond(
                    HttpStatusCode.Unauthorized,
                    ApiResponse.error<Unit>("Invalid credentials"),
                )
                return@post
            }
            call.respond(ApiResponse.ok(TokenResponse(token)))
        }

        // Register a new account and answer 201 with the created user.
        post("/register") {
            val registration = call.receive<RegisterRequest>()
            val created = authService.register(registration)
            call.respond(HttpStatusCode.Created, ApiResponse.ok(created))
        }

        // Profile of the caller, identified by the userId claim of the JWT.
        authenticate("jwt") {
            get("/me") {
                val profile = authService.getProfile(call.userId())
                call.respond(ApiResponse.ok(profile))
            }
        }
    }
}
状态页(错误处理)
// plugins/StatusPages.kt

/**
 * Installs StatusPages so that exceptions thrown while handling a call are rendered as
 * [ApiResponse] error bodies with a matching HTTP status.
 *
 * Mapping:
 * - ContentTransformationException, BadRequestException, IllegalArgumentException -> 400
 * - AuthenticationException -> 401
 * - AuthorizationException -> 403
 * - NotFoundException -> 404
 * - any other Throwable -> 500 (logged; details are not sent to the client)
 *
 * A 404 status with no handler-provided body is rewritten to a "Route not found" envelope.
 */
fun Application.configureStatusPages() {
    install(StatusPages) {
        exception<ContentTransformationException> { call, cause ->
            call.respond(
                HttpStatusCode.BadRequest,
                ApiResponse.error<Unit>("Invalid request body: ${cause.message}"),
            )
        }
        // Ktor reports malformed request bodies and missing or unconvertible parameters
        // as BadRequestException. Without this handler the Throwable catch-all below
        // would turn those client errors into 500 responses.
        exception<io.ktor.server.plugins.BadRequestException> { call, cause ->
            call.respond(
                HttpStatusCode.BadRequest,
                ApiResponse.error<Unit>(cause.message ?: "Bad request"),
            )
        }
        // Raised by require(...) based request validation.
        exception<IllegalArgumentException> { call, cause ->
            call.respond(
                HttpStatusCode.BadRequest,
                ApiResponse.error<Unit>(cause.message ?: "Bad request"),
            )
        }
        exception<AuthenticationException> { call, _ ->
            call.respond(
                HttpStatusCode.Unauthorized,
                ApiResponse.error<Unit>("Authentication required"),
            )
        }
        exception<AuthorizationException> { call, _ ->
            call.respond(
                HttpStatusCode.Forbidden,
                ApiResponse.error<Unit>("Access denied"),
            )
        }
        exception<NotFoundException> { call, cause ->
            call.respond(
                HttpStatusCode.NotFound,
                ApiResponse.error<Unit>(cause.message ?: "Resource not found"),
            )
        }
        // Last resort: log the cause server-side and send a generic message.
        exception<Throwable> { call, cause ->
            call.application.log.error("Unhandled exception", cause)
            call.respond(
                HttpStatusCode.InternalServerError,
                ApiResponse.error<Unit>("Internal server error"),
            )
        }
        status(HttpStatusCode.NotFound) { call, status ->
            call.respond(status, ApiResponse.error<Unit>("Route not found"))
        }
    }
}
CORS 配置
// plugins/CORS.kt

/**
 * Installs CORS for `localhost:3000` and for `example.com` over https.
 *
 * Allows the Content-Type and Authorization headers and the PUT, DELETE and PATCH
 * methods, enables credentials, and sets a preflight max age of 3600 seconds.
 */
fun Application.configureCORS() {
    install(CORS) {
        // Origins
        allowHost("localhost:3000")
        allowHost("example.com", schemes = listOf("https"))

        // Request headers
        listOf(HttpHeaders.ContentType, HttpHeaders.Authorization)
            .forEach { header -> allowHeader(header) }

        // Methods
        listOf(HttpMethod.Put, HttpMethod.Delete, HttpMethod.Patch)
            .forEach { method -> allowMethod(method) }

        allowCredentials = true
        maxAgeInSeconds = 3600
    }
}
Koin 依赖注入
模块定义
// di/AppModule.kt

/**
 * Koin module wiring the application's object graph. Every definition is a singleton;
 * constructor arguments are resolved from the container with `get()`.
 */
val appModule = module {
    // Persistence
    single<Database> {
        DatabaseFactory.create(get())
    }

    // Data access, registered under the repository interfaces
    single<UserRepository> {
        ExposedUserRepository(get())
    }
    single<OrderRepository> {
        ExposedOrderRepository(get())
    }

    // Business services
    single {
        UserService(get())
    }
    single {
        OrderService(get(), get())
    }
    single {
        AuthService(get(), get())
    }
}

// Application setup

/** Installs the Koin plugin with [appModule]. */
fun Application.configureDI() {
    install(Koin) {
        modules(appModule)
    }
}
在路由中使用 Koin
/** Lists users under `/users`, obtaining [UserService] from Koin via `by inject`. */
fun Route.userRoutes() {
    val userService by inject<UserService>()

    route("/users") {
        get {
            val envelope = ApiResponse.ok(userService.getAll())
            call.respond(envelope)
        }
    }
}
用于测试的 Koin
/**
 * Kotest spec that resolves [UserService] from a test-only Koin module in which the
 * repository is a MockK mock.
 */
class UserServiceTest : FunSpec(), KoinTest {

    // Test graph: mocked repository, real service built on top of it.
    private val testModule = module {
        single<UserRepository> { mockk() }
        single { UserService(get()) }
    }

    // Registers the Koin extension with the test module for this spec.
    override fun extensions() = listOf(KoinExtension(testModule))

    private val repository by inject<UserRepository>()
    private val service by inject<UserService>()

    init {
        test("getUser returns user") {
            // Stub the suspending lookup, then expect the service to return the same user.
            coEvery { repository.findById("1") } returns testUser

            val found = service.getById("1")

            found shouldBe testUser
        }
    }
}
请求验证
/**
 * Email shape check shared by the validation examples below. Compiled once, not on
 * every request.
 */
private val EMAIL_PATTERN = Regex(".+@.+\\..+")

// Validate request data in routes

/**
 * Creates a user from the posted [CreateUserRequest] after validating it inline.
 *
 * Each failed `require` throws [IllegalArgumentException] carrying the message shown.
 */
fun Route.userRoutes() {
    val userService by inject<UserService>()

    post("/users") {
        val request = call.receive<CreateUserRequest>()

        // Validate
        require(request.name.isNotBlank()) { "Name is required" }
        require(request.name.length <= 100) { "Name must be 100 characters or less" }
        require(request.email.matches(EMAIL_PATTERN)) { "Invalid email format" }

        val user = userService.create(request)
        call.respond(HttpStatusCode.Created, ApiResponse.ok(user))
    }
}

// Or use a validation extension

/**
 * Validates this request with the same rules as the inline variant above.
 *
 * @throws IllegalArgumentException when the name is blank or longer than 100 characters,
 *   or when the email does not match the expected shape
 */
fun CreateUserRequest.validate() {
    require(name.isNotBlank()) { "Name is required" }
    require(name.length <= 100) { "Name must be 100 characters or less" }
    require(email.matches(EMAIL_PATTERN)) { "Invalid email format" }
}
WebSocket
/** Installs the WebSockets plugin with a 15 s ping period, a 15 s timeout and 64 KiB frames. */
fun Application.configureWebSockets() {
    install(WebSockets) {
        pingPeriod = 15.seconds
        timeout = 15.seconds
        maxFrameSize = 64 * 1024 // 64 KiB — increase only if your protocol requires larger frames
        masking = false // Server-to-client frames are unmasked per RFC 6455; client-to-server are always masked by Ktor
    }
}

/**
 * Broadcast chat over the `/chat` WebSocket endpoint.
 *
 * Each session is tracked in a synchronized set while it is open. Every incoming text
 * frame is wrapped in a [ChatMessage] and sent to all tracked sessions, the sender included.
 */
fun Route.chatRoutes() {
    val connections = Collections.synchronizedSet<Connection>(LinkedHashSet())

    webSocket("/chat") {
        val thisConnection = Connection(this)
        connections += thisConnection
        try {
            send("Connected! Users online: ${connections.size}")
            for (frame in incoming) {
                if (frame !is Frame.Text) continue
                val text = frame.readText()
                // Encode once per message instead of once per recipient.
                val payload = Json.encodeToString(ChatMessage(thisConnection.name, text))
                // Snapshot under lock to avoid ConcurrentModificationException
                val recipients = synchronized(connections) { connections.toList() }
                for (recipient in recipients) {
                    try {
                        recipient.session.send(payload)
                    } catch (e: kotlinx.coroutines.channels.ClosedSendChannelException) {
                        // Recipient closed between the snapshot and the send. Skip it so a
                        // single dead peer does not end the sender's session; the
                        // recipient's own handler removes it in its finally block.
                    }
                }
            }
        } catch (e: kotlinx.coroutines.CancellationException) {
            // Cancellation is part of normal coroutine shutdown and must propagate.
            throw e
        } catch (e: Exception) {
            logger.error("WebSocket error", e)
        } finally {
            connections -= thisConnection
        }
    }
}

/**
 * One connected chat participant. [name] is "User-N", where N comes from a counter
 * shared by all instances and starting at 0.
 */
data class Connection(val session: DefaultWebSocketSession) {
    val name: String = "User-${counter.getAndIncrement()}"

    companion object {
        private val counter = AtomicInteger(0)
    }
}
testApplication 测试
基本路由测试
/**
 * Route-level integration tests for `/users`, run in-process with `testApplication`.
 *
 * Tests that send or read typed JSON bodies build their own client with client-side
 * ContentNegotiation; the default test client cannot (de)serialize them.
 */
class UserRoutesTest : FunSpec({
    test("GET /users returns list of users") {
        testApplication {
            application {
                install(Koin) { modules(testModule) }
                configureSerialization()
                configureRouting()
            }

            // Needed for response.body<ApiResponse<...>>() below: without the JSON
            // plugin the client has no way to turn the response into the typed envelope.
            val client = createClient {
                install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
                    json()
                }
            }

            val response = client.get("/users")

            response.status shouldBe HttpStatusCode.OK
            val body = response.body<ApiResponse<List<UserResponse>>>()
            body.success shouldBe true
            body.data.shouldNotBeNull().shouldNotBeEmpty()
        }
    }

    test("POST /users creates a user") {
        testApplication {
            application {
                install(Koin) { modules(testModule) }
                configureSerialization()
                configureStatusPages()
                configureRouting()
            }

            // JSON-capable client so setBody can serialize the request object.
            val client = createClient {
                install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
                    json()
                }
            }

            val response = client.post("/users") {
                contentType(ContentType.Application.Json)
                setBody(CreateUserRequest("Alice", "alice@example.com"))
            }

            response.status shouldBe HttpStatusCode.Created
        }
    }

    test("GET /users/{id} returns 404 for unknown id") {
        testApplication {
            application {
                install(Koin) { modules(testModule) }
                configureSerialization()
                configureStatusPages()
                configureRouting()
            }

            // Only the status is inspected, so the default client is sufficient.
            val response = client.get("/users/unknown-id")

            response.status shouldBe HttpStatusCode.NotFound
        }
    }
})
测试认证路由
/**
 * Integration tests for JWT-protected routes: one request without a token, one with a
 * token from `generateTestJWT`.
 *
 * Both tests send a typed JSON body, so both use a client with client-side
 * ContentNegotiation installed.
 */
class AuthenticatedRoutesTest : FunSpec({
    test("protected route requires JWT") {
        testApplication {
            application {
                install(Koin) { modules(testModule) }
                configureSerialization()
                configureAuthentication()
                configureRouting()
            }

            // Without the JSON plugin, setBody(CreateUserRequest(...)) fails on the client
            // before the request is sent, so the 401 would never be observed.
            val client = createClient {
                install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
                    json()
                }
            }

            val response = client.post("/users") {
                contentType(ContentType.Application.Json)
                setBody(CreateUserRequest("Alice", "alice@example.com"))
            }

            response.status shouldBe HttpStatusCode.Unauthorized
        }
    }

    test("protected route succeeds with valid JWT") {
        testApplication {
            application {
                install(Koin) { modules(testModule) }
                configureSerialization()
                configureAuthentication()
                configureRouting()
            }

            val token = generateTestJWT(userId = "test-user")

            val client = createClient {
                install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
                    json()
                }
            }

            val response = client.post("/users") {
                contentType(ContentType.Application.Json)
                bearerAuth(token)
                setBody(CreateUserRequest("Alice", "alice@example.com"))
            }

            response.status shouldBe HttpStatusCode.Created
        }
    }
})
配置
application.yaml
# Ktor engine settings: the module function to load and the port to listen on.
ktor:
  application:
    modules:
      - com.example.ApplicationKt.module
  deployment:
    port: 8080

# JWT settings, looked up in code as jwt.secret / jwt.issuer / jwt.audience / jwt.realm.
# The secret is written as an environment-variable placeholder, not a literal value.
jwt:
  secret: ${JWT_SECRET}
  issuer: "https://example.com"
  audience: "https://example.com/api"
  realm: "example"

# Database settings, looked up in code as database.url / database.driver / database.maxPoolSize.
database:
  url: ${DATABASE_URL}
  driver: "org.postgresql.Driver"
  maxPoolSize: 10
读取配置
/**
 * Reads the `database.*` configuration properties and installs Koin with a module that
 * provides a [DatabaseConfig] built from them, plus whatever `DatabaseFactory.create`
 * produces from that config.
 */
fun Application.configureDI() {
    val settings = environment.config
    val dbUrl = settings.property("database.url").getString()
    val dbDriver = settings.property("database.driver").getString()
    // Read as a string and converted here; a non-numeric value makes toInt() throw.
    val maxPoolSize = settings.property("database.maxPoolSize").getString().toInt()

    val databaseModule = module {
        single { DatabaseConfig(dbUrl, dbDriver, maxPoolSize) }
        single { DatabaseFactory.create(get()) }
    }

    install(Koin) {
        modules(databaseModule)
    }
}
快速参考:Ktor 模式
| 模式 | 描述 |
|---|---|
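| `route("/path") { get { } }` | 使用 DSL 进行路由分组 |
| `call.receive<T>()` | 反序列化请求体 |
| `call.respond(status, body)` | 发送带状态的响应 |
| `call.parameters["id"]` | 读取路径参数 |
| `call.request.queryParameters["q"]` | 读取查询参数 |
| `install(Plugin) { }` | 安装并配置插件 |
| `authenticate("name") { }` | 使用身份验证保护路由 |
| `by inject<T>()` | Koin 依赖注入 |
| `testApplication { }` | 集成测试 |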
记住:Ktor 是围绕 Kotlin 协程和 DSL 设计的。保持路由精简,将逻辑推送到服务层,并使用 Koin 进行依赖注入。使用
testApplication 进行测试以获得完整的集成覆盖。