Skip to content

da-bom/lib-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

24 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

lib-kafka

DABOM ํ”„๋กœ์ ํŠธ ๊ณต์šฉ Kafka ๋ฉ”์‹œ์ง• ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

ํ•ญ๋ชฉ ๊ฐ’
Artifact com.github.da-bom:lib-kafka
ํ˜„์žฌ ๋ฒ„์ „ 1.0.1
Java 21
Spring Boot 3.4.0
๋ฐฐํฌ JitPack

1. ๋ชฉ์ 

DABOM ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค์—์„œ ๋ฐ˜๋ณต๋˜๋Š” Kafka ๊ด€๋ จ ์ฝ”๋“œ๋ฅผ ํ•˜๋‚˜์˜ ๊ณตํ†ต ๋ชจ๋“ˆ๋กœ ํ†ตํ•ฉํ•œ๋‹ค.

์ด ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ:

  • Kafka Producer / Consumer ์ž๋™ ์„ค์ • - ํŒฉํ† ๋ฆฌ, ํ…œํ”Œ๋ฆฟ, ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ํ‘œ์ค€ ์„ค์ •์œผ๋กœ ๊ตฌ์„ฑ
  • ์ด๋ฒคํŠธ ์—”๋ฒจ๋กœํ”„ - ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ EventEnvelope<T> ํฌ๋งท์œผ๋กœ ํ†ต์ผ
  • ๊ณ„์•ฝ ์ƒ์ˆ˜ - ํ† ํ”ฝ๋ช…, ์ด๋ฒคํŠธ ํƒ€์ž…, ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ์ฝ”๋“œ ๋ ˆ๋ฒจ ์ƒ์ˆ˜๋กœ ๊ด€๋ฆฌ
  • ์˜ˆ์™ธ ๋ถ„๋ฅ˜ & ์—๋Ÿฌ ํ•ธ๋“ค๋ง - ์˜ˆ์™ธ ํƒ€์ž…๋ณ„ retry / ignore / DLQ ์ž๋™ ๋ผ์šฐํŒ…
  • ์šด์˜ ๋ฉ”ํŠธ๋ฆญ - Micrometer ๊ธฐ๋ฐ˜ producer/consumer ์„ฑ๊ณต๋ฅ , ์ง€์—ฐ์‹œ๊ฐ„, DLT ์นด์šดํŠธ

์ด ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ œ๊ณตํ•˜์ง€ ์•Š๋Š” ๊ฒƒ:

  • Outbox polling / ์ƒํƒœ ์ „์ด / ์Šค์ผ€์ค„๋ง
  • ์„œ๋น„์Šค ๊ณ ์œ  ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง
  • Kafka ํด๋Ÿฌ์Šคํ„ฐ ํ”„๋กœ๋น„์ €๋‹

2. ํŒจํ‚ค์ง€ ๊ตฌ์กฐ

com.dabom.messaging.kafka
โ”œโ”€โ”€ autoconfigure/             Kafka ๋นˆ ์ž๋™ ์„ค์ •
โ”‚   โ”œโ”€โ”€ KafkaConfig                Producer/Consumer Factory, KafkaTemplate, ListenerContainerFactory
โ”‚   โ””โ”€โ”€ KafkaErrorHandlerConfig    CommonErrorHandler, DLT Publisher, ExceptionClassifier
โ”œโ”€โ”€ contract/                  ๋ฉ”์‹œ์ง• ๊ณ„์•ฝ ์ƒ์ˆ˜
โ”‚   โ”œโ”€โ”€ KafkaTopics                ํ† ํ”ฝ๋ช…
โ”‚   โ”œโ”€โ”€ KafkaEventTypes            ์ด๋ฒคํŠธ ํƒ€์ž… ๋ฌธ์ž์—ด
โ”‚   โ””โ”€โ”€ KafkaConsumerGroups        ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ID
โ”œโ”€โ”€ error/                     ์˜ˆ์™ธ ๋ถ„๋ฅ˜ & ๋„๋ฉ”์ธ ์˜ˆ์™ธ
โ”‚   โ”œโ”€โ”€ KafkaExceptionClassifier   ์˜ˆ์™ธ โ†’ ์•ก์…˜ ๋งคํ•‘
โ”‚   โ”œโ”€โ”€ KafkaMessageProcessingException
โ”‚   โ”œโ”€โ”€ NonRetryableKafkaMessageProcessingException
โ”‚   โ”œโ”€โ”€ KafkaMessageDeserializationException
โ”‚   โ”œโ”€โ”€ KafkaErrorAction           RETRY / IGNORE / DLQ
โ”‚   โ”œโ”€โ”€ KafkaErrorCode             ๋ถ„๋ฅ˜ ์ฝ”๋“œ enum
โ”‚   โ””โ”€โ”€ KafkaErrorDecision         (action, code) ์Œ
โ”œโ”€โ”€ event/
โ”‚   โ”œโ”€โ”€ dto/                   ํŽ˜์ด๋กœ๋“œ ๋ ˆ์ฝ”๋“œ
โ”‚   โ”‚   โ”œโ”€โ”€ EventEnvelope<T>       ๊ณตํ†ต ๋ฉ”์‹œ์ง€ ๋ž˜ํผ
โ”‚   โ”‚   โ”œโ”€โ”€ notification/          NotificationPayload, NotificationType, NotificationEventSupport
โ”‚   โ”‚   โ”œโ”€โ”€ policy/                PolicyUpdatedPayload
โ”‚   โ”‚   โ””โ”€โ”€ usage/                 UsagePayload, UsageRealtimePayload
โ”‚   โ”œโ”€โ”€ consumer/              ์†Œ๋น„ ์ธํ„ฐํŽ˜์ด์Šค
โ”‚   โ”‚   โ””โ”€โ”€ KafkaEventConsumer<T>
โ”‚   โ”œโ”€โ”€ publisher/             ๋ฐœํ–‰ ์ธํ„ฐํŽ˜์ด์Šค
โ”‚   โ”‚   โ”œโ”€โ”€ KafkaEventPublisher
โ”‚   โ”‚   โ””โ”€โ”€ DefaultKafkaEventPublisher
โ”‚   โ””โ”€โ”€ KafkaEventMessageSupport   JSON ํŒŒ์‹ฑ, eventType ํ•„ํ„ฐ๋ง, ์ง๋ ฌํ™”
โ”œโ”€โ”€ metrics/                   Micrometer ๋ฉ”ํŠธ๋ฆญ
โ”‚   โ”œโ”€โ”€ KafkaMetrics               ์นด์šดํ„ฐ/ํƒ€์ด๋จธ ์ •์˜
โ”‚   โ”œโ”€โ”€ KafkaMetricTagSanitizer    ํƒœ๊ทธ ์ •๊ทœํ™”
โ”‚   โ”œโ”€โ”€ KafkaEventMetadataExtractor
โ”‚   โ”œโ”€โ”€ consumer/
โ”‚   โ”‚   โ””โ”€โ”€ KafkaMetricsRecordInterceptor
โ”‚   โ””โ”€โ”€ producer/
โ”‚       โ””โ”€โ”€ KafkaMetricsProducerListener
โ””โ”€โ”€ support/
    โ””โ”€โ”€ KafkaLogSanitizer          ๋กœ๊ทธ ์ธ์ ์…˜ ๋ฐฉ์ง€

3. ํ•ต์‹ฌ ์ถ”์ƒํ™”

3-1. EventEnvelope<T>

๋ชจ๋“  Kafka ๋ฉ”์‹œ์ง€์˜ ๋ž˜ํผ ๋ ˆ์ฝ”๋“œ. Jackson ๋‹คํ˜•์„ฑ ์—ญ์ง๋ ฌํ™”๋ฅผ ์ง€์›ํ•œ๋‹ค.

public record EventEnvelope<T>(
    String    eventId,     // UUID
    String    eventType,   // KafkaEventTypes ์ƒ์ˆ˜๊ฐ’
    LocalDateTime timestamp,
    T         payload
)

ํŒฉํ† ๋ฆฌ:

EventEnvelope<UsagePayload> envelope = EventEnvelope.of("DATA_USAGE", payload);
// eventId = UUID.randomUUID(), timestamp = LocalDateTime.now()

Jackson @JsonTypeInfo ๋งคํ•‘:

eventType ๊ฐ’ payload ํƒ€์ž…
DATA_USAGE UsagePayload
POLICY_UPDATED PolicyUpdatedPayload
USAGE_REALTIME UsageRealtimePayload
NOTIFICATION NotificationPayload

Wire format ์˜ˆ์‹œ:

{
  "eventId": "a1b2c3d4-...",
  "eventType": "NOTIFICATION",
  "timestamp": "2026-03-20T14:30:00",
  "payload": {
    "familyId": 1,
    "customerId": 42,
    "type": "THRESHOLD_ALERT",
    "title": "๋ฐ์ดํ„ฐ ๊ฒฝ๊ณ ",
    "message": "์ž”์—ฌ๋Ÿ‰์ด 10% ๋‚จ์•˜์Šต๋‹ˆ๋‹ค.",
    "data": { "thresholdPercent": 10 }
  }
}

3-2. ํŽ˜์ด๋กœ๋“œ ๋ ˆ์ฝ”๋“œ

๋ ˆ์ฝ”๋“œ ํ•„๋“œ ์šฉ๋„
UsagePayload familyId, customerId, appId, bytesUsed, metadata ์›๋ณธ ์‚ฌ์šฉ๋Ÿ‰ ์ด๋ฒคํŠธ
UsageRealtimePayload familyId, customerId, usedBytes, limitBytes, usagePercent, metadata ์‹ค์‹œ๊ฐ„ ์‚ฌ์šฉ๋Ÿ‰ ์Šค๋ƒ…์ƒท
PolicyUpdatedPayload familyId, policyKey, policyValue, activatedAt, active ์ •์ฑ… ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ
NotificationPayload familyId, customerId, type, title, message, data ์•Œ๋ฆผ ์ด๋ฒคํŠธ (customerId=null์ด๋ฉด ๊ฐ€์กฑ ์ „์ฒด ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ)

3-3. NotificationType

13์ข…์˜ ์•Œ๋ฆผ ํƒ€์ž… enum. NotificationEventSupport.resolveTitle(type)๋กœ ํ•œ๊ธ€ ์ œ๋ชฉ์„ ์–ป์„ ์ˆ˜ ์žˆ๋‹ค.

enum resolveTitle ๋ฐ˜ํ™˜๊ฐ’
QUOTA_UPDATED ๋ฐ์ดํ„ฐ ์ž”์—ฌ๋Ÿ‰ ๊ฐฑ์‹ 
THRESHOLD_ALERT ๋ฐ์ดํ„ฐ ๊ฒฝ๊ณ 
CUSTOMER_BLOCKED ๋ฐ์ดํ„ฐ ์ฐจ๋‹จ
CUSTOMER_UNBLOCKED ๋ฐ์ดํ„ฐ ์ฐจ๋‹จ ํ•ด์ œ
POLICY_CHANGED ์ •์ฑ… ๋ณ€๊ฒฝ
MISSION_CREATED ๋ฏธ์…˜ ์ƒ์„ฑ
REWARD_REQUESTED ๋ณด์ƒ ์š”์ฒญ
REWARD_APPROVED ๋ณด์ƒ ์Šน์ธ
REWARD_REJECTED ๋ณด์ƒ ๊ฑฐ์ ˆ
APPEAL_CREATED ์ด์˜์ œ๊ธฐ ์ ‘์ˆ˜
APPEAL_APPROVED ์ด์˜์ œ๊ธฐ ์Šน์ธ
APPEAL_REJECTED ์ด์˜์ œ๊ธฐ ๊ฑฐ์ ˆ
EMERGENCY_APPROVED ๊ธด๊ธ‰ ์ฟผํ„ฐ ์Šน์ธ

4. ๋ฉ”์‹œ์ง• ๊ณ„์•ฝ

4-1. ํ† ํ”ฝ

์ƒ์ˆ˜ ํ† ํ”ฝ๋ช… Producer Consumer
KafkaTopics.USAGE_EVENTS usage-events simulator-traffic dabom-processor-usage
KafkaTopics.POLICY_UPDATED policy-updated dabom-api-core dabom-processor-usage
KafkaTopics.USAGE_REALTIME usage-realtime dabom-processor-usage dabom-api-core
KafkaTopics.NOTIFICATION notification-events dabom-processor-usage / ๋ฐฐ์น˜ dabom-api-notification

4-2. ์ด๋ฒคํŠธ ํƒ€์ž…

์ƒ์ˆ˜ ๊ฐ’ ์—ฐ๊ฒฐ ํŽ˜์ด๋กœ๋“œ
KafkaEventTypes.DATA_USAGE DATA_USAGE UsagePayload
KafkaEventTypes.POLICY_UPDATED POLICY_UPDATED PolicyUpdatedPayload
KafkaEventTypes.USAGE_REALTIME USAGE_REALTIME UsageRealtimePayload
KafkaEventTypes.NOTIFICATION NOTIFICATION NotificationPayload

4-3. ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน

์ƒ์ˆ˜ ๊ฐ’ ์„œ๋น„์Šค
KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN dabom-processor-usage-main-group dabom-processor-usage
KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_POLICY dabom-processor-usage-policy-group dabom-processor-usage
KafkaConsumerGroups.DABOM_API_CORE_REALTIME dabom-api-core-realtime-group dabom-api-core
KafkaConsumerGroups.DABOM_NOTIFICATION_SENDER dabom-notification-sender-group dabom-api-notification

4-4. ์ฒ˜๋ฆฌ ํ๋ฆ„

flowchart LR
    A[simulator-traffic] -->|DATA_USAGE| T1[usage-events]
    B[dabom-api-core] -->|POLICY_UPDATED| T2[policy-updated]

    T1 --> C[dabom-processor-usage]
    T2 --> C

    C -->|USAGE_REALTIME| T3[usage-realtime]
    C -->|NOTIFICATION| T4[notification-events]

    T3 --> D[dabom-api-core]
    T4 --> E[dabom-api-notification]
Loading

5. ์ž๋™ ์„ค์ • (Auto-Configuration)

๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ปดํฌ๋„ŒํŠธ ์Šค์บ”๋˜๋ฉด ์•„๋ž˜ ๋นˆ์ด ์ž๋™ ๋“ฑ๋ก๋œ๋‹ค.

5-1. Producer ์„ค์ • (KafkaConfig)

๋นˆ ์„ค์ •
ProducerFactory<String, String> StringSerializer, acks=all, enable.idempotence=true
KafkaTemplate<String, String> observation ํ™œ์„ฑํ™”, ๋ฉ”ํŠธ๋ฆญ ๋ฆฌ์Šค๋„ˆ ๋ถ€์ฐฉ

5-2. Consumer ์„ค์ • (KafkaConfig)

๋นˆ ์„ค์ •
ConsumerFactory<String, String> ErrorHandlingDeserializer ๋ž˜ํ•‘ (poison pill ๋ฐฉ์–ด)
ConcurrentKafkaListenerContainerFactory observation ํ™œ์„ฑํ™”, ๋ฉ”ํŠธ๋ฆญ ์ธํ„ฐ์…‰ํ„ฐ, ๊ณตํ†ต ์—๋Ÿฌ ํ•ธ๋“ค๋Ÿฌ

5-3. ์—๋Ÿฌ ํ•ธ๋“ค๋Ÿฌ (KafkaErrorHandlerConfig)

๋นˆ ์—ญํ• 
CommonErrorHandler DefaultErrorHandler + exponential backoff + DLT ๋ผ์šฐํŒ…
KafkaExceptionClassifier ์˜ˆ์™ธ ์ฒด์ธ์„ ๋ถ„์„ํ•˜์—ฌ (action, code) ๊ฒฐ์ •
DLT Producer {์›๋ณธํ† ํ”ฝ}.DLT๋กœ ์‹คํŒจ ๋ฉ”์‹œ์ง€ ์ „์†ก, x-error-code/x-error-action ํ—ค๋” ํฌํ•จ

5-4. ๊ธฐํƒ€

๋นˆ ์—ญํ• 
KafkaEventMessageSupport JSON ํŒŒ์‹ฑ, eventType ํ•„ํ„ฐ๋ง, ์ง๋ ฌํ™”/์—ญ์ง๋ ฌํ™”
DefaultKafkaEventPublisher KafkaEventPublisher ๊ตฌํ˜„์ฒด
KafkaMetrics Micrometer ์นด์šดํ„ฐ/ํƒ€์ด๋จธ ์ •์˜
KafkaMetricsProducerListener producer ์„ฑ๊ณต/์‹คํŒจ ๋ฉ”ํŠธ๋ฆญ
KafkaMetricsRecordInterceptor consumer ์ฒ˜๋ฆฌ์‹œ๊ฐ„, producer-to-consumer ์ง€์—ฐ ๋ฉ”ํŠธ๋ฆญ
KafkaLogSanitizer ๋กœ๊ทธ ์ธ์ ์…˜ ๋ฐฉ์ง€ (\r\n\t ์น˜ํ™˜, 128์ž ์ ˆ๋‹จ)

6. ๋‚ด๋ถ€ ๋กœ์ง

6-1. ๋ฐœํ–‰ ํ๋ฆ„

Application
  โ””โ”€ KafkaEventPublisher.publish(topic, eventType, payload)
       โ””โ”€ EventEnvelope.of(eventType, payload)        โ† UUID, timestamp ์ž๋™ ์ƒ์„ฑ
            โ””โ”€ KafkaEventMessageSupport.serialize()    โ† ObjectMapper โ†’ JSON String
                 โ””โ”€ KafkaTemplate.send(topic, json)
                      โ””โ”€ KafkaMetricsProducerListener  โ† ์„ฑ๊ณต/์‹คํŒจ ์นด์šดํ„ฐ, ์ง€์—ฐ ํƒ€์ด๋จธ

6-2. ์†Œ๋น„ ํ๋ฆ„ (2-pass ์—ญ์ง๋ ฌํ™”)

๋ฌด๊ด€ํ•œ ์ด๋ฒคํŠธ์— ๋Œ€ํ•œ ์ „์ฒด ์—ญ์ง๋ ฌํ™” ๋น„์šฉ์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด 2-pass ์ „๋žต์„ ์‚ฌ์šฉํ•œ๋‹ค.

ConsumerRecord<String, String>
  โ”‚
  โ”œโ”€ Pass 1: readTree(json) โ†’ JsonNode
  โ”‚    โ””โ”€ extractEventType(node)
  โ”‚         โ””โ”€ eventType์ด ๊ธฐ๋Œ€๊ฐ’๊ณผ ๋ถˆ์ผ์น˜? โ†’ warn ๋กœ๊ทธ, return (skip)
  โ”‚
  โ””โ”€ Pass 2: eventType ์ผ์น˜ ์‹œ
       โ””โ”€ convertToEnvelope(node, TypeReference<EventEnvelope<T>>)
            โ””โ”€ KafkaEventConsumer.handle(envelope, recordKey)

์˜ˆ์™ธ ๋ฐœ์ƒ ์‹œ ๋ถ„๋ฅ˜๊ธฐ๊ฐ€ ์•ก์…˜์„ ๊ฒฐ์ •ํ•œ๋‹ค:

Exception ๋ฐœ์ƒ
  โ””โ”€ KafkaExceptionClassifier.classify(exception)
       โ””โ”€ cause chain ์ˆœํšŒํ•˜๋ฉฐ ์ฒซ ๋งค์นญ ๊ทœ์น™ ์ ์šฉ
            โ”œโ”€ RETRY  โ†’ exponential backoff ํ›„ ์žฌ์‹œ๋„
            โ”œโ”€ IGNORE โ†’ skip (warn ๋กœ๊ทธ + ๋ฉ”ํŠธ๋ฆญ)
            โ””โ”€ DLQ    โ†’ {topic}.DLT๋กœ ์ „์†ก

6-3. ์˜ˆ์™ธ ๋ถ„๋ฅ˜ ๊ทœ์น™

์˜ˆ์™ธ ์•ก์…˜ ์ฝ”๋“œ
KafkaMessageDeserializationException DLQ DESERIALIZATION_FAILED
DeserializationException DLQ DESERIALIZATION_FAILED
SerializationException DLQ DESERIALIZATION_FAILED
IllegalArgumentException IGNORE INVALID_EVENT
TimeoutException RETRY TRANSIENT_NETWORK
SocketTimeoutException RETRY TRANSIENT_NETWORK
RetriableException RETRY TRANSIENT_NETWORK
TransientDataAccessException RETRY TRANSIENT_DB
KafkaMessageProcessingException RETRY PROCESSING_FAILED
NonRetryableKafkaMessageProcessingException DLQ NON_RETRYABLE_PROCESSING_FAILED
๊ธฐํƒ€ ๋ชจ๋“  ์˜ˆ์™ธ DLQ UNKNOWN

๋ถ„๋ฅ˜๊ธฐ๋Š” cause chain ์ „์ฒด๋ฅผ ์ˆœํšŒํ•œ๋‹ค. ๋ž˜ํ•‘๋œ ์˜ˆ์™ธ๋„ ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ๋ถ„๋ฅ˜๋œ๋‹ค.


7. ์„ค์ • ๋ ˆํผ๋Ÿฐ์Šค

ํ”„๋กœํผํ‹ฐ ๊ธฐ๋ณธ๊ฐ’ ์„ค๋ช…
spring.kafka.bootstrap-servers localhost:9092 Kafka ๋ธŒ๋กœ์ปค ์ฃผ์†Œ
app.kafka.error-handler.retry.max-attempts 2 ์ตœ๋Œ€ ์žฌ์‹œ๋„ ํšŸ์ˆ˜
app.kafka.error-handler.retry.initial-interval-ms 1000 ์ฒซ ์žฌ์‹œ๋„ ๋Œ€๊ธฐ (ms)
app.kafka.error-handler.retry.multiplier 2.0 backoff ๋ฐฐ์ˆ˜
app.kafka.error-handler.retry.max-interval-ms 10000 ์ตœ๋Œ€ ๋Œ€๊ธฐ ๊ฐ„๊ฒฉ (ms)

8. ๋ฉ”ํŠธ๋ฆญ ๋ ˆํผ๋Ÿฐ์Šค

Producer ๋ฉ”ํŠธ๋ฆญ

๋ฉ”ํŠธ๋ฆญ ํƒ€์ž… ํƒœ๊ทธ
kafka.producer.send.success.count Counter topic, eventType, result
kafka.producer.send.error.count Counter topic, eventType, result
kafka.producer.send.latency Timer (percentile) topic, eventType

Consumer ๋ฉ”ํŠธ๋ฆญ

๋ฉ”ํŠธ๋ฆญ ํƒ€์ž… ํƒœ๊ทธ
kafka.consumer.success.count Counter topic, group, eventType
kafka.consumer.invalid_event.count Counter topic, group, eventType
kafka.consumer.retryable_error.count Counter topic, group, eventType
kafka.consumer.dlt.count Counter topic, group, eventType
kafka.consumer.dedup_hit.count Counter topic, group, eventType
kafka.consumer.processing.time Timer (percentile) topic, group, eventType
kafka.consumer.producer_to_consumer.latency Timer (percentile) topic, group, eventType

eventType ํƒœ๊ทธ๋Š” KafkaMetricTagSanitizer๊ฐ€ ์ •๊ทœํ™”ํ•œ๋‹ค. ํ—ˆ์šฉ๋˜์ง€ ์•Š์€ ๊ฐ’์€ UNKNOWN ๋˜๋Š” OTHER๋กœ ์น˜ํ™˜๋˜์–ด ๋ฉ”ํŠธ๋ฆญ ํญ๋ฐœ์„ ๋ฐฉ์ง€ํ•œ๋‹ค.


9. ์‚ฌ์šฉ ๋ฐฉ๋ฒ•

9-1. ์˜์กด์„ฑ ์ถ”๊ฐ€

repositories {
    mavenCentral()
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.github.da-bom:lib-kafka:v1.0.1'
}

9-2. ์ปดํฌ๋„ŒํŠธ ์Šค์บ”

์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋ฃจํŠธ ํŒจํ‚ค์ง€๊ฐ€ com.dabom ํ•˜์œ„๊ฐ€ ์•„๋‹ˆ๋ฉด ๋ช…์‹œ์ ์œผ๋กœ ์Šค์บ” ๋ฒ”์œ„๋ฅผ ์ถ”๊ฐ€ํ•œ๋‹ค.

@SpringBootApplication(scanBasePackages = {
    "com.myservice",
    "com.dabom.messaging.kafka"
})
public class MyApplication {}

9-3. ์ด๋ฒคํŠธ ๋ฐœํ–‰

@Service
@RequiredArgsConstructor
public class UsageEventPublishService {
    private final KafkaEventPublisher kafkaEventPublisher;

    public void publish(UsageRealtimePayload payload) {
        kafkaEventPublisher.publish(
                KafkaTopics.USAGE_REALTIME,
                KafkaEventTypes.USAGE_REALTIME,
                payload);
    }
}

์•Œ๋ฆผ ๋ฐœํ–‰:

@Service
@RequiredArgsConstructor
public class NotificationPublishService {
    private final KafkaEventPublisher kafkaEventPublisher;

    public void publish(NotificationPayload payload) {
        kafkaEventPublisher.publish(
                KafkaTopics.NOTIFICATION,
                NotificationEventSupport.toEnvelope(payload));
    }
}

9-4. ์ด๋ฒคํŠธ ์†Œ๋น„ (๋ฐฉ๋ฒ• A: consumeByEventType ๋žŒ๋‹ค)

@Component
@RequiredArgsConstructor
public class UsageRealtimeListener {
    private final KafkaEventMessageSupport kafkaEventMessageSupport;
    private final UsageRealtimeService usageRealtimeService;

    @KafkaListener(
            topics = KafkaTopics.USAGE_REALTIME,
            groupId = KafkaConsumerGroups.DABOM_API_CORE_REALTIME)
    public void consume(ConsumerRecord<String, String> record) {
        kafkaEventMessageSupport.consumeByEventType(
                record,
                KafkaEventTypes.USAGE_REALTIME,
                new TypeReference<EventEnvelope<UsageRealtimePayload>>() {},
                (envelope, key) -> usageRealtimeService.handle(envelope.payload(), key));
    }
}

9-5. ์ด๋ฒคํŠธ ์†Œ๋น„ (๋ฐฉ๋ฒ• B: KafkaEventConsumer ๊ตฌํ˜„)

๋ฆฌ์Šค๋„ˆ๋ฅผ ์–‡๊ฒŒ ์œ ์ง€ํ•˜๊ณ  ์ฒ˜๋ฆฌ ์ฑ…์ž„์„ ๋ถ„๋ฆฌํ•˜๋Š” ํŒจํ„ด:

@Component
@RequiredArgsConstructor
public class UsageRealtimeConsumer implements KafkaEventConsumer<UsageRealtimePayload> {
    private final UsageRealtimeService usageRealtimeService;

    @Override
    public String eventType() {
        return KafkaEventTypes.USAGE_REALTIME;
    }

    @Override
    public TypeReference<EventEnvelope<UsageRealtimePayload>> typeReference() {
        return new TypeReference<>() {};
    }

    @Override
    public void handle(EventEnvelope<UsageRealtimePayload> envelope, String recordKey) {
        usageRealtimeService.handle(envelope.payload(), recordKey);
    }
}

๋ฆฌ์Šค๋„ˆ์—์„œ ์œ„์ž„:

@KafkaListener(topics = KafkaTopics.USAGE_REALTIME,
               groupId = KafkaConsumerGroups.DABOM_API_CORE_REALTIME)
public void consume(ConsumerRecord<String, String> record) {
    usageRealtimeConsumer.consume(record, kafkaEventMessageSupport);
}

9-6. ์˜ˆ์™ธ ์‚ฌ์šฉ ๊ทœ์น™

์ƒํ™ฉ ์‚ฌ์šฉํ•  ์˜ˆ์™ธ ๋ถ„๋ฅ˜ ๊ฒฐ๊ณผ
JSON ํŒŒ์‹ฑ / ์—ญ์ง๋ ฌํ™” ์‹คํŒจ KafkaMessageDeserializationException DLQ
์ผ์‹œ ์žฅ์•  (๋„คํŠธ์›Œํฌ, DB) KafkaMessageProcessingException RETRY
์žฌ์‹œ๋„ ๋ฌด์˜๋ฏธํ•œ ๋น„์ฆˆ๋‹ˆ์Šค ์‹คํŒจ NonRetryableKafkaMessageProcessingException DLQ
๊ณ„์•ฝ ์œ„๋ฐ˜, ์ž…๋ ฅ ๊ฒ€์ฆ ์‹คํŒจ IllegalArgumentException IGNORE

10. ์šด์˜ / ๋ฒ„์ „ ์ •์ฑ…

  • ํƒœ๊ทธ ๋ฒ„์ „ ๊ณ ์ • ์‚ฌ์šฉ: v1.0.1 ํ˜•ํƒœ
  • ๊ธฐ์กด ํƒœ๊ทธ ์žฌ์‚ฌ์šฉ ๊ธˆ์ง€ - ๋ณ€๊ฒฝ ์‹œ ๋ฐ˜๋“œ์‹œ ์ƒˆ ํƒœ๊ทธ ๋ฐœํ–‰
  • ๋ธŒ๋ ˆ์ดํ‚น ๋ณ€๊ฒฝ์€ ๋ฉ”์ด์ € ๋ฒ„์ „ ์—…
  • ํŒจ์น˜/๊ธฐ๋Šฅ ์ถ”๊ฐ€๋Š” ๋งˆ์ด๋„ˆ/ํŒจ์น˜ ๋ฒ„์ „์œผ๋กœ ์ƒˆ ํƒœ๊ทธ ๋ฐœํ–‰

11. ๋ฒ„์ „๋ณ„ ๋ณ€๊ฒฝ ์ด๋ ฅ

v1.0.1

  • ๋ฒ„์ „ ๋ฒ”ํ”„

v1.0.0

  • notification ๊ณ„์•ฝ์„ NotificationType + ๋‹จ์ผ NotificationPayload ๊ตฌ์กฐ๋กœ ๋‹จ์ˆœํ™”
  • EventEnvelope.subType ์ œ๊ฑฐ
  • usage-persist ํ† ํ”ฝ ๊ณ„์•ฝ ์ œ๊ฑฐ (USAGE_PERSIST, UsagePersistPayload, persistence consumer group)
  • NotificationEventSupport.resolveTitle() ํ—ฌํผ ์ถ”๊ฐ€

v0.5.0

  • KafkaTopics, KafkaEventTypes, KafkaConsumerGroups ์ถ”๊ฐ€
  • NotificationSubTypes, NotificationEventSupport ์ถ”๊ฐ€

v0.4.0

  • ํŒจํ‚ค์ง€ ๊ตฌ์กฐ๋ฅผ com.dabom.messaging.kafka ๊ธฐ์ค€์œผ๋กœ ์žฌ์ •๋ฆฌ
  • KafkaEventPublisher, DefaultKafkaEventPublisher, KafkaEventConsumer<T> ์ถ”๊ฐ€
  • tracing ์ง€์› ์ œ๊ฑฐ

v0.3.x ์ดํ•˜

  • ์ดˆ๊ธฐ Kafka ์„ค์ •, ์ด๋ฒคํŠธ envelope, ์—๋Ÿฌ ์ฒ˜๋ฆฌ, ๋ฉ”ํŠธ๋ฆญ ๊ธฐ๋Šฅ ์ œ๊ณต

12. JitPack ์žฅ์•  ์‹œ fallback

# ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ํ”„๋กœ์ ํŠธ์—์„œ
./gradlew clean publishToMavenLocal -x test
// ์†Œ๋น„ ํ”„๋กœ์ ํŠธ์—์„œ
repositories {
    mavenLocal()
    mavenCentral()
    maven { url 'https://jitpack.io' }
}

13. ์ƒ์„ธ ๋ฌธ์„œ

๋ฌธ์„œ ๋‚ด์šฉ
usage-guide ์„ค์น˜๋ถ€ํ„ฐ ๋ฐœํ–‰/์†Œ๋น„๊นŒ์ง€ ๋‹จ๊ณ„๋ณ„ ๊ฐ€์ด๋“œ
kafka-architecture-overview ํ† ํ”ฝ ๊ตฌ์กฐ, ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน, ์ฒ˜๋ฆฌ ํ๋ฆ„
error-handling-policy ์˜ˆ์™ธ ๋ถ„๋ฅ˜ ๊ทœ์น™, retry/DLT ์ •์ฑ…
operations-guide ๋ฉ”ํŠธ๋ฆญ ํ•ด์„, DLT ๊ธฐ์ค€, ์šด์˜ ์ฒดํฌํฌ์ธํŠธ
migration-guide ๊ธฐ์กด ์„œ๋น„์Šค์—์„œ lib-kafka๋กœ ์ „ํ™˜ํ•˜๋Š” ์ ˆ์ฐจ

About

๐Ÿ“ฆ DABOM ๊ณต์šฉ Kafka ๋ฉ”์‹œ์ง• ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages