Claude-skill-registry kx-kotlin-support
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kx-kotlin-support" ~/.claude/skills/majiayu000-claude-skill-registry-kx-kotlin-support && rm -rf "$T"
skills/data/kx-kotlin-support/SKILL.mdkx_kotlin_support 개발 가이드
📋 목차
1. 프로젝트 개요
kx_kotlin_support는 Kotlin 기반의 멀티프로젝트 유틸리티 라이브러리입니다.
핵심 가치
- AWS 네이티브: Kinesis, DynamoDB, S3, Lambda 등 AWS 서비스와 깊은 통합
- Kotlin 스러운 코드: 불변성, Flow, DSL, 확장 함수 중심 설계
- 3단계 의존성 관리: Core → Light → Heavy 계층 구조
- 실전 검증: 프로덕션 환경에서 검증된 패턴과 유틸리티
아키텍처
┌─────────────┐ │ Heavy │ Spring Boot, JPA, Hibernate, AWS CDK │ │ (웹서버 + RDS 환경) └─────────────┘ ↓ ┌─────────────┐ │ Light │ AWS SDK, Koin, Ktor, External APIs │ │ (AWS Lambda 환경) └─────────────┘ ↓ ┌─────────────┐ │ Core │ 최소 의존성 (JSON, CSV, Time, Collection) │ │ (모든 환경) └─────────────┘
주요 사용 사례
- AWS Lambda에서 대용량 CSV 파일 실시간 처리
- AWS CDK로 ECS 블루그린 배포 인프라 구축
- Spring Boot 웹 애플리케이션 + QueryDSL + JPA
- Kinesis Task/Worker 패턴으로 비동기 대량 데이터 처리
- Step Functions로 복잡한 배치 워크플로우 구현
2. 서브프로젝트 구조
Core 프로젝트
목적: 최소한의 의존성으로 모든 환경에서 사용 가능한 핵심 유틸리티
주요 특징:
- AWS SDK 의존성 없음
- Spring 의존성 없음
- 순수 Kotlin + 기본 라이브러리만 사용
핵심 패키지:
| 패키지 | 주요 기능 | 설명 |
|---|---|---|
| AI 텍스트 모델 클라이언트 | AI 모델 연동, 텍스트 입출력 처리 |
| 배치 처리 진행률 계산 | 대용량 배치 작업의 청크 분할 및 진행률 추적 |
| 컬렉션 확장 함수 | List/Map/Set 등의 확장 함수 (diff, flattenAny, groupByFirstCondition 등) |
| 동시성 처리 | 코루틴 실행기, 스레드 관리, StopWatch, CacheMap, MicroBatchQueue |
| 핵심 유틸리티 | 클래스 로딩, 데이터 변환, KDSL, 패키지명 처리 |
| CSV 파일 처리 | CSV 읽기/쓰기, 집계, Flow 변환 |
| 델리게이트 패턴 | Map 기반 속성 델리게이트 (MapAttribute) |
| 도메인 모델 | 개발자 정보, 메뉴, 쿼리, 트리 구조 등 공통 도메인 |
| 예외 처리 | KnownException, 예외 체이닝 유틸리티 |
| 파일 처리 | Gzip/Zip 압축, 파일명 처리, 랜덤 셔플 |
| Flow 확장 | Kotlin Flow 확장 함수 |
| HTML 생성 | HTML 태그 빌더, htmx 지원 |
| ID 생성기 | GUID 대용량 채번기 (하이/로우 방식) |
| 입출력 리소스 | InputResource/OutputResource 추상화 |
| JSON 처리 | Gson/Koson/JsonPath/Serialization 지원 |
| 숫자 확장 | Int/Long/Double/Boolean 확장 함수, 숫자 단축 표현 |
| 정규식 | 정규식 유틸리티 및 확장 |
| 재시도 로직 | 백오프 지원 재시도 템플릿 |
| 문자열 확장 | 문자열 변환/검증/암호화, 한글 처리, 결과 데이터 래퍼 |
| 시스템 유틸리티 | OS 타입, 배포 타입, 리소스 홀더, 시스템 구분자 |
| 시간/날짜 처리 | LocalDate/LocalDateTime/Duration 확장, 타임존 처리 |
| 검증 | Bean Validation, Konform, 조건부 검증 |
| XML 처리 | XML 데이터 파싱 및 처리 |
Light 프로젝트
목적: AWS Lambda 환경에서 사용하는 AWS 서비스 통합 및 외부 API 연동
주요 특징:
- AWS SDK v2 (Kotlin) 사용
- Koin DI 통합
- Ktor 클라이언트 기반 HTTP 통신
- Lambda SnapStart 최적화
핵심 패키지:
| 패키지 | 주요 기능 | 설명 |
|---|---|---|
| Athena 쿼리 | CloudTrail 등 테이블 정의, 쿼리 실행 및 결과 조회 |
| AWS Batch | 배치 작업 제출 및 관리 |
| Bedrock AI | Claude 등 AI 모델 호출, 프롬프트 관리 |
| Cognito | 사용자 풀 관리, 인증/인가 |
| DynamoDB | 테이블 CRUD, Enhanced Client, 멀티 인덱스, Lock 구현 |
| ECS | 컨테이너 서비스 관리 |
| EventBridge | 이벤트 발행 및 구독 |
| Firehose | 실시간 로그 스트리밍 |
| Kinesis | 실시간 대량 데이터 처리 (Task/Worker 패턴) |
| Lambda | 람다 함수 호출, 디스패치 패턴 (동기/비동기) |
| CloudWatch Logs | 로그 그룹/스트림 관리, 쿼리 |
| S3 | 파일 업로드/다운로드, 버킷 관리, Flow 지원 |
| SES | 이메일 발송 |
| Step Functions | 워크플로우 실행 및 관리 |
| SQS | 큐 메시지 발행/구독, Worker 패턴 |
| Systems Manager | 파라미터 스토어 관리 |
| 두레이 | 두레이 메신저 API 연동 |
| GitHub | GitHub API 연동, 저장소/이슈 관리 |
| Google API | Calendar, Drive, OTP, OAuth, Sheet, Vision 등 |
| Koin DI | Koin 의존성 주입 확장 |
| Ktor | Ktor 클라이언트 확장 |
| Notion API | Notion 페이지/데이터베이스 CRUD |
| OpenAI API | ChatGPT 등 OpenAI 모델 호출 |
| Slack API | Slack 메시지 발송 및 워크플로우 |
Heavy 프로젝트
목적: Spring Boot 웹 애플리케이션 및 RDS 데이터베이스 환경 지원
주요 특징:
- Spring Framework 전체 스택
- JPA + Hibernate + QueryDSL
- AWS CDK 인프라 구축 DSL
핵심 패키지:
| 패키지 | 주요 기능 | 설명 |
|---|---|---|
| AWS CDK | AWS CDK DSL (CICD, ECS, Lambda, SFN) |
| 데이터프레임 | Kotlin DataFrame 라이브러리 확장 |
| Excel 처리 | Apache POI 기반 엑셀 읽기/쓰기 |
| Hibernate | JPA PostListener, PhysicalNamingStrategy |
| JDBC | JDBC 연결 및 쿼리 실행 헬퍼 |
| JPA | Entity 탐색, 컬럼/테이블 정보 추출 |
| Kotlin QueryDSL | QueryDSL Kotlin 확장 (파라미터, Path 처리) |
| Spring Framework | Spring Batch, MVC, Security, WebFlux 확장 |
3. 코딩 표준 및 컨벤션
기본 원칙
1. 한글 문서화
- 모든 주석, 로그 메시지, 문서는 한글로 작성
- 표준 용어(AWS 리소스명 등)만 예외적으로 영어 사용
2. Kotlin 스러운 코드
사용 최소화 →var
선호val
사용 최소화 → 불변 컬렉션 선호mutableListOf- 확장 함수 적극 활용
- Flow/Sequence 활용한 지연 평가
3. 예외 처리
- 모든 예외는 반드시 처리되어야 함
- 불필요한
후 로깅만 하는 패턴 지양catch - 의미 있는 예외 처리 또는 상위로 전파
로거 사용법
설정 방식:
import mu.KotlinLogging class MyClass { companion object { private val log = KotlinLogging.logger {} } }
로그 작성 시:
// ✅ 올바른 방식 - {} 블록 사용 log.info { "데이터 ${data.size}개 처리 완료" } log.warn { "재시도 실패: ${error.message}" } log.debug { " -> 결과: $result" } // ❌ 잘못된 방식 - 즉시 평가 log.info("데이터 ${data.size}개 처리 완료") // 성능 저하
이유:
{} 블록을 사용하면 로그 레벨이 비활성화되었을 때 문자열 보간을 하지 않아 성능이 향상됩니다.
파일 구성
1. 클래스당 1개 파일
- 각 클래스는 독립된 파일로 분리
- 파일명 = 클래스명
2. 확장 함수는 xxxSupport.kt 파일에
MapSupport.kt // Map 확장 함수 ListStringSupport.kt // List<String> 확장 함수 S3Support.kt // S3 관련 확장 함수
3. 테스트 코드 위치
src/test/kotlin/net/kotlinx/[패키지명]/[클래스명]Test.kt
AWS SDK 사용 패턴
Paginated Flow 사용:
// ✅ 올바른 방식 - Paginated Flow fun listAllUsers(userPoolId: String): Flow<User> = cognito.listUsersPaginated { this.userPoolId = userPoolId }.flatMapConcat { it.users!!.asFlow() } // ❌ 잘못된 방식 - 단일 페이지만 가져옴 fun listUsers(userPoolId: String): List<User> = cognito.listUsers { this.userPoolId = userPoolId }.users!!
이유: AWS SDK의 List 계열 API는 기본적으로 페이징되어 있습니다. 전체 데이터를 가져오려면 Paginated Flow를 사용해야 합니다.
Retrofit2 생성 규칙
- REST API 1건당 1개 인터페이스
- 관련 데이터 객체는 같은 파일 내 정의
- 모든 인터페이스와 데이터 객체는 같은 접미어 사용
예시:
// DoorayDriveApi.kt interface DoorayDriveApi { @GET("/api/drive/files") suspend fun listFiles(): DoorayDriveListResponse // 관련 데이터 객체 data class DoorayDriveListResponse( val files: List<DoorayDriveFile> ) data class DoorayDriveFile( val id: String, val name: String ) }
Spring Framework 규칙
성공 응답:
// Spring Controller에서 성공 메시지 리턴 시 @PostMapping("/save") fun save(@RequestBody data: MyData): ApiResponse { myService.save(data) return ApiResponse(true, "데이터가 저장됨") }
IDE 컴파일 확인
- 작업 후 IDE의 컴파일 에러만 확인
- 별도의 gradle 명령은 실행하지 말 것
4. 주요 패키지 가이드
4.1 AWS Kinesis 실시간 대량 처리
핵심 개념: Task/Worker 패턴으로 Kinesis를 통한 비동기 대량 데이터 처리
요구사항:
- 고속 / 병렬 처리가 저렴하게 가능 (샤드1개 월 1.3만원으로 초당 1000개 처리)
- 수평 확장/축소 가능 (런타임에 샤드 수 조정 가능)
- 대용량 데이터 처리 가능 (청크단위 요청/응답 처리)
- 실시간에 가까운(1초 이내도 가능) 반응
- 요청 / 응답을 flow로 간단하게 사용할 수 있어야함
- timeout 기능이 있어야 함
KinesisTask (요청자)
val task = KinesisTask { streamName = "worker-stream" checkpointTableName = "system-dev" taskName = "demoTaskJob" checkpointTtl = 1.hours } // 대용량 파일을 Flow로 읽어서 처리 val file: File by ResourceHolder.WORKSPACE.slash("largeFile.csv") lazyLoad "s3://xxxa/demo/largeFile.csv" val flow = file.toInputResource().toFlow() .map { line -> json { "id" to line[0] "query" to line[1] } } .chunked(1000) // Task 실행 - 결과를 Flow로 수신 task.execute(flow).collect { datas -> datas.forEach { log.debug { " => [${it}]" } } }
KinesisWorker (처리자)
val worker = KinesisWorker { streamName = "worker-stream" checkpointTableName = "system-dev" handler = { records -> log.info { "워커 테스트: ${records.size}개의 레코드 처리" } records.forEach { it.result.put("processed", true) it.result.put("time", java.time.LocalDateTime.now().toKr01()) log.debug { " -> ${it.result}" } 100.milliseconds.delay() //0.1초에 1개씩 처리 } } readChunkCnt = 100 shardCheckInterval = 10.minutes } worker.start()
사용 시나리오:
- 대용량 CSV 파일 처리 (수십만~수백만 건)
- 실시간에 가까운 처리 필요 (1초 이내 반응)
- 수평 확장 가능 (샤드 수 조정)
- 비용 효율적 (샤드 1개 = 월 1.3만원, 초당 1000개 처리)
주의사항:
- 오류 처리시 중단시점부터 재시도하는 기능은 없음
- collector를 csv로 만들어서 셀프 구현 필요
4.2 AWS CDK 인프라 구축
CICD 파이프라인 (GitHub + CodeBuild + CodePipeline)
val stack = this val infra = koin<MyInfra>() val workBucket = infra.s3.work.load(stack) val appRole = MyRole.APP_ADMIN.load(stack) val securityGroup = MySecurityGroup.JOB.load(stack) val toAdmin = infra.topic.adminAll.load(stack) val build = CdkCodeBuild { chacheBucket = workBucket.iBucket role = appRole.iRole vpc = infra.vpc.iVpc securityGroups = listOf(securityGroup.iSecurityGroup) concurrentBuildLimit = 1 //AWS 오류.. gradleVersion = "8.12.1" gradleCmds(":deployAll") byGithub(MyProject.GITHUB_ROOT, MyProject.PROJECT_DMP) create(stack) } CdkCodePipeline { codeBuild = build.codeBuild role = appRole.iRole topics = listOf(toAdmin) events = when (deploymentType) { DeploymentType.PROD -> listOf(EventSets.CodekPipeline.FAILED) //후킹이 걸려있기 때문에 빌드 성공은 필요없음 DeploymentType.DEV -> listOf(EventSets.CodekPipeline.FAILED, EventSets.CodekPipeline.SUCCESSED) } byGithub(MyProject.GITHUB_ROOT, MyProject.PROJECT_DMP, "arn:aws:codeconnections:ap-northeast-2:xxxx") create(stack) }
ECS 블루그린 배포
val infra = koin<MyInfra>() val ecr = infra.ecr.api.load(stack) val webConfig = MyEcs.ECS_CONFIGS[CdkInterface.DEPLOYMENT_TYPE]!! val web = CdkEcsWeb { name = "api" config = webConfig taskRole = MyRole.APP_ADMIN.load(stack).iRole executionRole = MyRole.ECS_TASK.load(stack).iRole image = ecr.imageFromStackByTag(deploymentType.name.lowercase()) vpc = infra.vpc.load(stack).iVpc sgWeb = MySecurityGroup.API.load(stack).iSecurityGroup sgAlb = MySecurityGroup.ALB.load(stack).iSecurityGroup containerInsights = deploymentType == DeploymentType.PROD environment += mapOf( AwsNaming.Spring.ENV_PROFILE to "default,${CdkInterface.SUFF}" ) certs = listOf(MySms.CERT_DMP.get(stack)) healthCheck = HealthCheck.builder() .interval(20.seconds.toCdk()) .timeout(10.seconds.toCdk()) .healthyThresholdCount(2) //디폴트인 5로 하면 체크 전에 내려갈 수 있음. .unhealthyThresholdCount(2) .path("/api/healthcheck") .build() when (CdkInterface.DEPLOYMENT_TYPE) { DeploymentType.PROD -> createServiceBlueGreen(stack) //라이브서버는 블루그린 배포 DeploymentType.DEV -> createServiceRolling(stack) } cdkLogGroup.addLogAnomalyDetector(stack) } //도메인 등록하기 val hostedZone = HostedZoneUtil.load(stack, "xxx.com") val domain = MyEcs.DOMAINS[CdkInterface.DEPLOYMENT_TYPE]!! Route53Util.arecord(stack, hostedZone, domain, web.alb.toRecordTarget())
Step Functions 대량데이터 분할처리
CdkSfn(project, "batch_step") { this.lambda = func this.iRole = role.iRole val stepStart = lambda("StepStart") val stepEnd = lambda("StepEnd") val modeMap = listOf( mapInline("StepMap") { next = stepEnd.stateId itemPath = "$.option.${stepStart.stateId}.body.datas" }, stepEnd, ).join() val listMode = run { val stepList = lambda("StepList") val waitColdstart = wait("WaitColdstart") { this.secondsPath = "${AwsNaming.option}.${AwsNaming.waitColdstartSeconds}" } val waitIpBlock = wait("WaitIpBlock") { this.secondsPath = "${AwsNaming.option}.${AwsNaming.waitSeconds}" } listOf( stepList, choice("IsCompleted").apply { whenMatchesBody(stepList.stateId, AwsNaming.choiceFirst, waitColdstart, stepList) whenMatchesBody(stepList.stateId, AwsNaming.choiceRetry, waitIpBlock, stepList) otherwise(stepEnd) }, ).join() } create( stepStart, choice("WhenMode").apply { whenMatches("mode", "List", listMode) otherwise(modeMap) }, ) onErrorHandle(adminAllTopic, dlq.iQueue) }
4.3 DynamoDB 사용 패턴
Enhanced Client 사용
// 데이터 클래스 정의 @DynamoDbBean data class UserRecord( @get:DynamoDbPartitionKey var userId: String = "", @get:DynamoDbSortKey var timestamp: String = "", var name: String = "", var email: String = "" ) // Enhanced Client 사용 val table = dynamoDbEnhancedClient.table("users", TableSchema.fromBean(UserRecord::class.java)) // 저장 table.putItem(UserRecord( userId = "user123", timestamp = LocalDateTime.now().toIso(), name = "홍길동", email = "hong@example.com" )) // 조회 val user = table.getItem(Key.builder() .partitionValue("user123") .sortValue(timestamp) .build()) // 쿼리 val results = table.query { r -> r.queryConditional( QueryConditional.keyEqualTo(Key.builder() .partitionValue("user123") .build()) ) }
DynamoDB 분산 락
val dynamoLock = DynamoLock { aws = awsClient tableName = "locks" ttl = 5.minutes } dynamoLock.withLock("my-resource-id") { // 크리티컬 섹션 - 다른 인스턴스에서 동시 실행 방지 processImportantData() }
4.4 S3 파일 처리
// S3Data로 경로 관리 val s3File = S3Data.parse("s3://my-bucket/path/to/file.csv") // 파일 업로드 s3.putObject(s3File, file.readBytes()) // 파일 다운로드 val bytes = s3.getObject(s3File) // CSV 파일을 Flow로 읽기 val flow: Flow<List<String>> = s3File.toInputResource().toFlow() flow.collect { line -> log.info { "라인: ${line.joinToString(",")}" } } // S3에 직접 쓰기 (Flow) val outputFlow: Flow<String> = flowOf("header1,header2", "value1,value2") s3File.toOutputResource().writeFlow(outputFlow)
4.5 JSON 처리 (GsonData)
GsonData: 동적 JSON 조작을 위한 래퍼 클래스 (타입 안전성 낮지만 유연함)
주의: kotlin의 엄격한 객체 정의와 어울리지 않으므로 로직에 가급적 사용 금지. 모든 이상은 예외 대신 null을 리턴함.
// JSON 생성 val json = GsonData.obj { put("name", "홍길동") put("age", 30) put("active", true) } // JSON 파싱 val parsed = GsonData.parse("""{"name":"홍길동","age":30}""") // 값 읽기 val name = parsed["name"].str // "홍길동" val age = parsed["age"].int // 30 // 중첩 접근 val nested = GsonData.parse("""{"user":{"profile":{"name":"홍길동"}}}""") val userName = nested["user"]["profile"]["name"].str // JsonPath 사용 val nameByPath = nested["$.user.profile.name"].str // 배열 처리 val array = GsonData.array { add("item1") add("item2") add(GsonData.obj { put("key", "value") }) } // 반복 array.forEach { item -> log.info { "Item: $item" } } // 수정 json.put("age", 31) json.put("newField", "newValue") // 삭제 json.remove("active") // 병합 val json2 = GsonData.obj { put("email", "hong@example.com") } val merged = json + json2
사용 시나리오:
- Lambda 함수에서 이벤트 파싱
- 외부 API 응답 처리 (스키마가 유동적인 경우)
- 로그 데이터 집계
- 주의: 프로덕션 로직에는 가급적 사용 금지 (타입 안전성 부족)
4.6 CSV 처리
// CSV 파일 읽기 val csvFile = File("/path/to/data.csv") val records: Flow<List<String>> = csvFile.toInputResource().toFlow() // CSV 파싱 + 변환 records .drop(1) // 헤더 스킵 .map { line -> User( id = line[0], name = line[1], email = line[2] ) } .collect { user -> processUser(user) } // CSV 쓰기 val output = File("/path/to/output.csv") output.toOutputResource().use { resource -> resource.writeLine(listOf("ID", "Name", "Email")) // 헤더 users.forEach { user -> resource.writeLine(listOf(user.id, user.name, user.email)) } } // CSV 집계 val aggregated = csvFile.toInputResource() .aggregation<MyCsvLine>() // 타입 추론 .sum { it.amount }
4.7 Time 처리
// Duration 확장 val duration = 5.minutes val milliseconds = duration.toMillis() duration.delay() // suspend 함수 // LocalDate 확장 val today = LocalDate.now() val yesterday = today.minusDays(1) val formatted = today.toKr01() // "2025-01-15" // LocalDateTime 확장 val now = LocalDateTime.now() val isoFormat = now.toIso() // "2025-01-15T14:30:00" val krFormat = now.toKr01() // "2025-01-15 14:30:00" // Delay 100.milliseconds.delay() // suspend 함수
4.8 Spring Batch
@Configuration class BatchJobConfig { @Bean fun myJob( jobBuilderFactory: JobBuilderFactory, stepBuilderFactory: StepBuilderFactory ): Job { return jobBuilderFactory.get("myJob") .start(myStep(stepBuilderFactory)) .build() } fun myStep(stepBuilderFactory: StepBuilderFactory): Step { return stepBuilderFactory.get("myStep") .chunk<InputData, OutputData>(100) .reader(itemReader()) .processor(itemProcessor()) .writer(itemWriter()) .build() } fun itemReader(): ItemReader<InputData> { // CSV 또는 DB에서 데이터 읽기 } fun itemProcessor(): ItemProcessor<InputData, OutputData> { return ItemProcessor { input -> // 데이터 변환 OutputData(input.id, input.name.uppercase()) } } fun itemWriter(): ItemWriter<OutputData> { return ItemWriter { items -> items.forEach { processOutput(it) } } } }
4.9 QueryDSL (kqdsl)
// QueryDSL + Kotlin 확장 val qUser = QUser.user val results = queryFactory .selectFrom(qUser) .where( qUser.name.eq("홍길동"), qUser.age.gt(20) ) .orderBy(qUser.createdAt.desc()) .fetch() // 동적 쿼리 fun searchUsers(name: String?, minAge: Int?): List<User> { return queryFactory .selectFrom(qUser) .where( name?.let { qUser.name.contains(it) }, minAge?.let { qUser.age.goe(it) } ) .fetch() } // 페이징 val pageable = PageRequest.of(0, 20) val page = queryFactory .selectFrom(qUser) .offset(pageable.offset) .limit(pageable.pageSize.toLong()) .fetch()
4.10 Koin DI
// 모듈 정의 val myModule = module { single { AwsClient() } // 싱글톤 single { S3Client(get()) } // 의존성 주입 factory { KinesisTask() } // 매번 새로운 인스턴스 } // Koin 시작 startKoin { modules(myModule) } // 의존성 가져오기 val awsClient: AwsClient by inject() val s3Client = get<S3Client>() // 레이지 로딩 val kinesis: KinesisTask by koinLazy()
5. 일반적인 사용 패턴
패턴 A: Lambda 함수에서 대용량 CSV 처리
class CsvProcessorLambda : RequestHandler<S3Event, String> { private val aws by koinLazy<AwsClient>() private val log = KotlinLogging.logger {} override fun handleRequest(event: S3Event, context: Context): String { event.records.forEach { record -> val s3Data = S3Data(record.s3.bucket.name, record.s3.`object`.key) // CSV를 Flow로 읽어서 처리 runBlocking { s3Data.toInputResource().toFlow() .drop(1) // 헤더 스킵 .chunked(1000) // 1000개씩 배치 처리 .collect { batch -> processBatch(batch) } } } return "처리 완료" } private suspend fun processBatch(lines: List<List<String>>) { log.info { "${lines.size}개 라인 처리 중..." } // DynamoDB 저장, Kinesis 전송 등 } }
패턴 B: Spring Boot API에서 페이징 + 엑셀 다운로드
@RestController @RequestMapping("/api/users") class UserController( private val userRepository: UserRepository, private val queryFactory: JPAQueryFactory ) { companion object { private val log = KotlinLogging.logger {} } @GetMapping fun listUsers(pageable: Pageable): Page<User> { val qUser = QUser.user return queryFactory .selectFrom(qUser) .orderBy(qUser.createdAt.desc()) .offset(pageable.offset) .limit(pageable.pageSize.toLong()) .fetchPage(pageable) } @GetMapping("/export") fun exportToExcel(response: HttpServletResponse) { response.contentType = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" response.setHeader("Content-Disposition", "attachment; filename=users.xlsx") val users = userRepository.findAll() ExcelWriter().use { excel -> excel.createSheet("Users") excel.writeHeader(listOf("ID", "Name", "Email", "Created")) users.forEach { user -> excel.writeRow(listOf( user.id.toString(), user.name, user.email, user.createdAt.toKr01() )) } excel.write(response.outputStream) } } }
패턴 C: AWS CDK로 전체 인프라 구축
class MyStack(scope: Construct, id: String) : Stack(scope, id) { init { // VPC val vpc = CdkVpc { create(this@MyStack) } // Security Groups val webSg = CdkSecurityGroup { vpc = vpc.iVpc description = "Web 서버 SG" create(this@MyStack) } // ECR val ecr = CdkEcr { repositoryName = "my-app" create(this@MyStack) } // ECS Cluster val cluster = Cluster(this@MyStack, "Cluster", ClusterProps.builder() .vpc(vpc.iVpc) .build()) // ALB + ECS Service (블루그린) val web = CdkEcsWeb { name = "api" image = ecr.imageFromStackByTag("latest") vpc = vpc.iVpc sgWeb = webSg.iSecurityGroup createServiceBlueGreen(this@MyStack) } // Route53 val hostedZone = HostedZoneUtil.load(this@MyStack, "example.com") Route53Util.arecord(this@MyStack, hostedZone, "api.example.com", web.alb.toRecordTarget()) // CodePipeline (CICD) val build = CdkCodeBuild { gradleCmds(":bootJar", ":jib") byGithub("owner", "repo") create(this@MyStack) } CdkCodePipeline { codeBuild = build.codeBuild byGithub("owner", "repo", "arn:aws:codeconnections:...") create(this@MyStack) } } }
패턴 D: Kinesis + Lambda 실시간 처리 파이프라인
// Producer: 대용량 데이터를 Kinesis로 전송 class DataProducerLambda : RequestHandler<S3Event, String> { companion object { private val log = KotlinLogging.logger {} } private val task = KinesisTask { streamName = "data-stream" checkpointTableName = "checkpoints" taskName = "producerJob" } override fun handleRequest(event: S3Event, context: Context): String { runBlocking { val s3Data = S3Data.parse(event.records.first().s3.bucket.name, ...) val flow = s3Data.toInputResource().toFlow() .map { line -> json { "id" to line[0] "data" to line[1] } } .chunked(1000) task.execute(flow).collect { results -> log.info { "${results.size}개 처리 완료" } } } return "OK" } } // Consumer: Kinesis에서 데이터 읽어서 처리 class DataConsumerLambda { companion object { private val log = KotlinLogging.logger {} } private val worker = KinesisWorker { streamName = "data-stream" checkpointTableName = "checkpoints" readerName = "consumer01" handler = { records -> // DynamoDB에 저장 records.forEach { record -> val data = record.result dynamoTable.putItem(...) record.result.put("processed", true) } } } fun start() { runBlocking { worker.start() // 무한 루프로 실행 } } }
6. 테스트 작성 가이드
테스트 기본 구조
위치:
src/test/kotlin/net/kotlinx/[패키지명]/
형식: kotest BDD 스타일
class MyServiceTest : BeSpecHeavy() { init { initTest(KotestUtil.PROJECT) Given("사용자 데이터가 준비되어 있을 때") { val user = User( id = "user123", name = "홍길동", email = "hong@example.com" ) When("사용자를 저장하면") { val saved = userService.save(user) Then("정상적으로 저장되어야 한다") { saved shouldNotBe null saved.id shouldBe user.id saved.name shouldBe user.name } } When("존재하지 않는 사용자를 조회하면") { Then("null을 반환해야 한다") { val notFound = userService.findById("not-exist") notFound shouldBe null } } } Given("여러 사용자가 있을 때") { val users = listOf( User("user1", "홍길동", "hong@example.com"), User("user2", "김철수", "kim@example.com") ) users.forEach { userService.save(it) } When("전체 사용자를 조회하면") { val all = userService.findAll() Then("모든 사용자가 반환되어야 한다") { all.size shouldBeGreaterThanOrEqual 2 } } } } }
Core 프로젝트 테스트
class CollectionSupportTest : BeSpecLog() { init { initTest(KotestUtil.FAST) Given("리스트가 주어졌을 때") { val list = listOf(1, 2, 3, 4, 5) Then("chunked가 정상 동작해야 한다") { val chunks = list.chunked(2) chunks.size shouldBe 3 chunks[0] shouldBe listOf(1, 2) chunks[2] shouldBe listOf(5) } } } }
Light 프로젝트 테스트 (AWS 통합)
class S3ServiceTest : BeSpecLight() { private val s3: S3Client by koinLazy() init { initTest(KotestUtil.INTEGRATION) Given("S3 버킷이 있을 때") { val bucket = "test-bucket" val key = "test/file.txt" val s3Data = S3Data(bucket, key) When("파일을 업로드하면") { val content = "Hello World" s3.putObject(s3Data, content.toByteArray()) Then("파일이 정상적으로 업로드되어야 한다") { val downloaded = s3.getObject(s3Data) String(downloaded) shouldBe content } } xThen("파일을 삭제하면") { // x = skip s3.deleteObject(s3Data) shouldThrow<NoSuchKeyException> { s3.getObject(s3Data) } } } } }
Heavy 프로젝트 테스트 (Spring + JPA)
@SpringBootTest class UserRepositoryTest : BeSpecHeavy() { @Autowired private lateinit var userRepository: UserRepository @Autowired private lateinit var entityManager: EntityManager init { initTest(KotestUtil.PROJECT) Given("사용자 엔티티가 준비되어 있을 때") { val user = User( name = "홍길동", email = "hong@example.com" ) When("엔티티를 저장하면") { val saved = userRepository.save(user) entityManager.flush() entityManager.clear() Then("ID가 자동 생성되어야 한다") { saved.id shouldNotBe null } Then("저장된 데이터를 조회할 수 있어야 한다") { val found = userRepository.findById(saved.id!!).orElse(null) found shouldNotBe null found.name shouldBe "홍길동" } } } } }
Mock 사용 지양 원칙
CLAUDE.md 가이드: mock 객체를 사용할 필요 없고 해당 객체를 koin 등으로 가져와서 직접 실행
// ❌ 잘못된 방식 - Mock 사용 class MyServiceTest : BeSpecHeavy() { @MockK private lateinit var userRepository: UserRepository init { every { userRepository.findById(any()) } returns User(...) // ... } } // ✅ 올바른 방식 - 실제 객체 사용 class MyServiceTest : BeSpecHeavy() { private val userRepository: UserRepository by koinLazy() init { // 실제 DB 또는 테스트 DB 사용 userRepository.save(User(...)) val found = userRepository.findById("user123") // ... } }
7. 트러블슈팅
문제: Kinesis Task가 타임아웃됨
증상:
task.execute() 호출 후 결과를 받지 못하고 타임아웃
원인:
- Worker가 실행되지 않음
- Partition Key가 잘못 설정됨
- Checkpoint 테이블 권한 문제
해결:
-
Worker가 실행 중인지 확인
worker.start() // 별도 프로세스에서 실행 필요 -
Partition Key 확인
// Task는 "taskName-taskId-in" 형식으로 전송 // Worker는 "in" 타입만 읽음 -
DynamoDB 테이블 존재 및 권한 확인
aws dynamodb describe-table --table-name checkpoints
문제: GsonData에서 null 값 처리
증상:
gson["key"].str 호출 시 NPE 발생
원인: GsonData는 null을 JsonNull로 래핑하지만,
.str 호출 시 null 반환
해결:
// ✅ 올바른 방식 - null 체크 val value = gson["key"].str ?: "기본값" // ✅ lett 사용 gson["key"].lett { value -> // value가 비어있지 않을 때만 실행 log.info { "값: ${value.str}" } } // ❌ 잘못된 방식 val value = gson["key"].str!! // NPE 위험
문제: AWS CDK 배포 시 권한 오류
증상:
cdk deploy 실행 시 권한 오류
원인: IAM Role에 필요한 권한이 없음
해결:
- CloudFormation 실행 권한 확인
- 생성하려는 리소스의 권한 확인 (ECS, Lambda 등)
- 필요시 AdministratorAccess 권한으로 테스트
// Role에 권한 추가 role.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName("AmazonECSFullAccess"))
문제: Spring Boot에서 QueryDSL Q클래스 생성 안 됨
증상: QUser, QOrder 등의 Q클래스를 찾을 수 없음
원인: Annotation Processor가 실행되지 않음
해결:
-
Gradle에서 kapt 플러그인 확인
plugins { kotlin("kapt") version "1.9.0" } dependencies { kapt("com.querydsl:querydsl-apt:5.0.0:jakarta") } -
IDE에서 빌드 실행
./gradlew clean build -
IntelliJ에서 "Annotation Processors" 활성화
- Settings → Build → Compiler → Annotation Processors
- "Enable annotation processing" 체크
문제: S3 파일 다운로드 시 메모리 부족
증상: 대용량 파일(수 GB) 다운로드 시 OutOfMemoryError
원인: 전체 파일을 메모리에 로드
해결:
// ❌ 잘못된 방식 - 전체 파일 로드 val bytes = s3.getObject(s3Data) processBytes(bytes) // ✅ 올바른 방식 - 스트리밍 처리 s3Data.toInputResource().toFlow() .collect { line -> processLine(line) // 라인별 처리 }
문제: Kotest 테스트가 실행되지 않음
증상: IntelliJ에서 테스트 실행 버튼이 표시되지 않음
원인: Kotest 플러그인 미설치
해결:
-
IntelliJ Kotest 플러그인 설치
- Settings → Plugins → "Kotest" 검색 → 설치
-
Gradle 의존성 확인
testImplementation("io.kotest:kotest-runner-junit5:5.5.0") testImplementation("io.kotest:kotest-assertions-core:5.5.0")
8. 요약 테이블
Core 프로젝트 핵심 클래스
| 패키지 | 핵심 클래스/함수 | 설명 |
|---|---|---|
| | 동적 JSON 조작 래퍼 |
| | CSV → Flow 변환 |
| , | 시간 확장 함수 |
| , | 컬렉션 확장 |
| , | 코루틴 실행기, 캐시 |
| , | I/O 추상화 |
Light 프로젝트 핵심 클래스
| 패키지 | 핵심 클래스/함수 | 설명 |
|---|---|---|
| , | 실시간 대량 처리 (Task/Worker 패턴) |
| , | DynamoDB Enhanced + 분산 락 |
| , | S3 파일 처리 |
| , | Lambda 함수 호출 패턴 |
| | SQS Worker 패턴 |
| , | Koin DI 확장 |
| | Notion API 연동 |
| | OpenAI ChatGPT API |
Heavy 프로젝트 핵심 클래스
| 패키지 | 핵심 클래스/함수 | 설명 |
|---|---|---|
| , | CICD 파이프라인 (GitHub + CodeBuild) |
| | ECS 블루그린/롤링 배포 |
| | Step Functions 배치 처리 |
| | Spring Batch 확장 |
| | Kotlin QueryDSL 확장 |
| , | JPA/Hibernate 확장 |
| , | Apache POI Excel 처리 |
빠른 참조: 코딩 체크리스트
개발 시 다음을 확인하세요:
- 로거:
+companion object
+KotlinLogging.logger {}
블록 사용log.info { } - 불변성:
→var
,val
→mutableListOf
선호listOf - 확장 함수: 새 확장 함수는
파일에 추가xxxSupport.kt - AWS SDK: Paginated API는 Flow로 변환 (
)flatMapConcat - 예외 처리: 모든 예외는 반드시 처리 (불필요한 catch 후 로그만 지양)
- 테스트: kotest BDD 스타일, Mock 대신 실제 객체 사용
- 파일 구조: 클래스당 1개 파일, 테스트는
src/test/kotlin - 문서화: 한글로 작성 (표준 용어만 영어)
- 컴파일 확인: IDE 컴파일 에러만 확인 (별도 gradle 명령 X)
추가 학습 자료
- README.md: 프로젝트 개요 및 주요 예시 코드
- CLAUDE.md: 코딩 표준 및 가이드라인
- 각 패키지의 Support.kt 파일: 확장 함수 패턴 학습
- test 디렉토리: 실제 사용 예시 코드
이 Skill은 kx_kotlin_support 라이브러리를 사용한 개발을 지원하기 위해 작성되었습니다.