What is Kafka


Apache Kafka is a distributed data streaming platform written in Java and Scala. It was developed at LinkedIn and is now maintained by the Apache Software Foundation.

Kafka Broker

  • A Kafka broker is a server instance running the Kafka application.
  • Kafka is normally deployed as a cluster of three or more brokers.
  • Through v2.5 the cluster was managed via ZooKeeper; KRaft became available starting with v2.8 (initially as an early-access feature).
  • One of the n brokers acts as the controller.
    • Responsibilities of the controller:
      • Assigning partitions to each broker
      • Monitoring whether the brokers are running normally
      • In a ZooKeeper-based cluster, which broker is the controller is recorded in ZooKeeper.

Record

new ProducerRecord<String, String>("topic", "key", "message");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  • Kafka stores records internally as bytes, so objects passed from a Producer to a Consumer must be serialized and deserialized.
  • You can use the serializers that ship with Kafka, such as StringSerializer and ShortSerializer, or write a custom serializer class to serialize and deserialize POJOs.

Topic & Partition

  • Messages sent to a Topic are divided among its Partitions.
  • A Topic has one or more Partitions.
  • Each Partition keeps its own Offsets.
  • Message order is maintained and managed per Partition, not across the whole Topic.
  • When a Producer sends a message to a Topic, the target Partition is chosen either round-robin or from the Partition Key supplied by the Producer.
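The routing rule above can be sketched in plain Java. This is a simplified illustration, not Kafka's actual partitioner: the class name `PartitionSketch` and the use of `String.hashCode()` are assumptions made for the example (Kafka's default partitioner hashes the serialized key bytes instead), and keyless records are simply rotated round-robin as the notes describe.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified sketch of choosing a partition for a record (not Kafka's real partitioner). */
class PartitionSketch {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    PartitionSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Keyed records always map to the same partition; keyless records rotate. */
    int partitionFor(String key) {
        if (key == null) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch(3);
        System.out.println(sketch.partitionFor("user-1")); // same value on every call
        System.out.println(sketch.partitionFor(null));     // 0
        System.out.println(sketch.partitionFor(null));     // 1
    }
}
```

Because every record with the same key lands in the same partition, per-key ordering follows from per-partition ordering.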

Kafka log and segment

  • Unlike many other messaging platforms, Kafka stores messages on the file system.
  • Messages are appended to the currently open segment file; a segment is closed based on time or size.
  • Once a segment is closed, it is deleted or compacted after a configured time or size threshold.

Partition & Consumer

  • Number of Partitions >= number of Consumers (within one Consumer group)
  • In other words, one Consumer can consume from three Partitions, and three Consumers can each be assigned one Partition and consume 1:1, but with four Consumers one of them sits idle.
  • If one of the Consumers fails, a rebalance is performed to find a Consumer to take over its Partitions.
  • Consumers can be separated into Consumer groups according to their purpose.
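The relationship between partition count and consumer count can be illustrated with a toy assignment function. This is only a sketch under simple assumptions: the class `AssignmentSketch` is hypothetical and hands out partitions round-robin, whereas a real group uses one of Kafka's configurable assignors. A rebalance corresponds to calling `assign` again with the surviving consumers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of spreading a topic's partitions over the consumers of one group. */
class AssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1"), 3));
        // {c1=[0, 1, 2]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
        // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```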

Broker Partition Replication

  • To prevent data loss when a Broker goes down, Partitions are replicated to other Brokers, for example with the --replication-factor 3 option when creating a topic.
  • For example, a Topic configured with 3 Partitions and a replication factor of 3 places 3 Partition replicas on each of 3 Brokers, for 9 replicas in total.
  • Partition replicas consist of a leader and followers. The leader exchanges data directly with Kafka clients, the followers continuously replicate records from the leader, and if the leader goes down one of the followers is elected leader.
    • Replicas that are fully caught up with the leader are called In-Sync Replicas (ISR). If the leader fails while a follower is out of sync, that follower normally has to wait until the leader recovers. Setting unclean.leader.election.enable to true allows an out-of-sync follower to be elected leader so that other records can be processed in the meantime, at the cost of possibly losing records that were never replicated.
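The replica arithmetic above (3 partitions x replication factor 3 = 9 replicas over 3 brokers) can be checked with a small sketch. The class `ReplicaPlacementSketch` and its rotating placement rule are assumptions made for illustration; Kafka's real assignment algorithm differs in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replica placement: partition p gets replicas on brokers p, p+1, ... (mod broker count). */
class ReplicaPlacementSketch {
    static List<List<Integer>> place(int numPartitions, int replicationFactor, int numBrokers) {
        if (replicationFactor > numBrokers) {
            throw new IllegalArgumentException("replication factor cannot exceed broker count");
        }
        List<List<Integer>> placement = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add((p + i) % numBrokers); // first entry plays the leader role
            }
            placement.add(replicas);
        }
        return placement;
    }

    public static void main(String[] args) {
        List<List<Integer>> placement = place(3, 3, 3);
        System.out.println(placement); // [[0, 1, 2], [1, 2, 0], [2, 0, 1]]
        int total = placement.stream().mapToInt(List::size).sum();
        System.out.println(total);     // 9
    }
}
```

In this sketch each broker leads one partition and follows the other two, so losing any single broker still leaves two replicas of every partition.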

Kafka Producer


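import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties prop = new Properties();
// List several brokers so the client can still bootstrap if one of them is down
prop.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Key and value types must match the configured serializers
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

producer.send(new ProducerRecord<>("topicName", "key", "value")); // record with a key
producer.send(new ProducerRecord<>("topicName", "value"));        // record without a key

producer.close(); // sends any buffered records before closing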
  • Calling send() converts the Record to bytes through the Serializer and determines the target partition through the Partitioner.
  • The Records are then stored in a Buffer, grouped into batches.
  • The Sender then transmits the batches to the Kafka Broker.
    • The Sender runs on a separate thread.
    • The Sender sends batches to the Broker in order, regardless of whether a batch is full.
    • While the Sender is transmitting a batch, send() -> Serializer -> Partitioner keeps collecting messages into the next batch.
  • Main throughput-related settings
    • batch.size sets the batch size; a batch is sent as soon as it is full.
    • linger.ms is the time to wait before sending and defaults to 0. With a wait time set, the Sender waits that long before sending the batch; with no wait time, the batch is sent immediately.
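As a rough sketch of how these two settings are supplied, the snippet below builds a `Properties` object that could be passed to a `KafkaProducer`. The class name `ProducerTuningSketch`, the broker address, and the values 32768 and 10 are illustrative assumptions, not recommendations; suitable values depend on message size and latency requirements.

```java
import java.util.Properties;

/** Builds producer settings that trade a little latency for larger batches. */
class ProducerTuningSketch {
    static Properties throughputProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("batch.size", "32768"); // illustrative: maximum bytes per batch
        props.put("linger.ms", "10");     // illustrative: wait up to 10 ms for a batch to fill
        return props;
    }

    public static void main(String[] args) {
        throughputProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A larger linger.ms gives the buffer more time to fill each batch, which usually means fewer, larger requests at the cost of added latency per message.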
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
	RecordMetadata meta = f.get(); // blocks until the broker responds
} catch (InterruptedException | ExecutionException e) {
	// the send failed or the wait was interrupted
}
  • send() returns a Future, which can be used to check the result of the send.
  • Because the call blocks on every message, messages do not accumulate in the batch; each batch carries a single message, so throughput drops.
  • In exchange, the result is known with certainty for each individual message.
producer.send(new ProducerRecord<>("topic", "value"),
	new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata, Exception ex) {
			// ex is null on success; otherwise handle the failed send here
		}
	}
);
  • Alternatively, passing a Callback object lets you check the send result in a non-blocking way. Batches fill up normally, so throughput does not suffer.

Delivery guarantees: acks + min.insync.replicas

  • As described in the ISR section above, messages are replicated across several brokers. From the producer's side, the acks setting determines how strongly delivery is guaranteed.
    • acks = 0: the producer does not wait for a response from the server, so delivery is not guaranteed.
    • acks = 1: the broker responds once the record is stored on the partition leader. If the leader fails before followers replicate it, the message can be lost.
    • acks = all: the broker responds once the record is stored on all in-sync replicas. How many replicas must be in sync for the write to succeed is set by the broker's min.insync.replicas.
  • With 3 replicas and acks = all:
    • min.insync.replicas = 1: the write succeeds even when only the leader is in sync, so in the worst case it behaves like acks = 1.
    • min.insync.replicas = 2: the write succeeds once it is stored on the leader and at least one follower.
    • min.insync.replicas = 3: the write succeeds only when the leader and both followers store it. If even one follower fails, the write fails for lack of replicas.
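The interaction between acks and min.insync.replicas can be modelled as a single predicate. This is a toy model only: `AckSketch` and `writeAccepted` are hypothetical names, and the real broker logic involves more state than the ISR size.

```java
/** Toy model of whether a broker accepts a produce request. */
class AckSketch {
    /**
     * @param acks           "0", "1" or "all"
     * @param minInsync      value of min.insync.replicas
     * @param inSyncReplicas current ISR size, leader included
     * @return true if the write is accepted, false if rejected for lack of replicas
     */
    static boolean writeAccepted(String acks, int minInsync, int inSyncReplicas) {
        if (!acks.equals("all")) {
            return inSyncReplicas >= 1; // only a live leader is required
        }
        return inSyncReplicas >= minInsync;
    }

    public static void main(String[] args) {
        System.out.println(writeAccepted("all", 2, 2)); // true
        System.out.println(writeAccepted("all", 3, 2)); // false
        System.out.println(writeAccepted("1", 3, 1));   // true
    }
}
```

With 3 replicas, min.insync.replicas = 2 is a common middle ground in this model: writes survive the loss of one follower while still being stored on more than one broker.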

Kafka Consumer


import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1"); // consumer group to join
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // subscribe to the topic
try {
	while (true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
		for (ConsumerRecord<String, String> record : records) {
			System.out.println(record.value());
			System.out.println(record.topic());
			System.out.println(record.partition());
			System.out.println(record.offset());
		}
	}
} finally {
	consumer.close(); // always release the consumer, even if the loop throws
}
  • The code above is the typical structure a Consumer uses to read messages from its Partitions.

Commit & Offset

  • consumer.poll() reads messages starting from the previously committed offset, and the offset after the last message read is then committed again. Repeating this cycle is how messages are consumed.
  • On first access, or when there is no committed offset, the auto.offset.reset setting decides where to start.
    • auto.offset.reset = earliest: start from the earliest available message.
    • auto.offset.reset = latest: the default; start from the end of the partition, so only new messages are read.
    • auto.offset.reset = none: throws an exception if the consumer group has no previous commit, so it is rarely used.
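The decision described above can be written as a small function. This is a sketch with hypothetical names (`OffsetResetSketch`, `startingOffset`); in particular, `IllegalStateException` merely stands in for the exception a real consumer would throw under the `none` policy.

```java
import java.util.OptionalLong;

/** Toy model of where a consumer starts reading a partition. */
class OffsetResetSketch {
    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always wins
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            case "none":
                throw new IllegalStateException("no committed offset for this group");
            default:
                throw new IllegalArgumentException("unknown policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.of(7), "latest", 0, 42));     // 7
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 42)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 42));   // 42
    }
}
```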

Settings that affect fetching

  • fetch.min.bytes sets the minimum amount of data the broker should return for a fetch.
    • The default is 1. With a larger value the broker waits until that much data has accumulated, so latency grows but throughput increases.
  • fetch.max.wait.ms is how long to wait for the data to reach the minimum size.
    • The default is 500 (ms). The broker cannot wait indefinitely for enough data, so this is the longest it waits before returning. It is different from the timeout passed to poll().
  • max.partition.fetch.bytes is the maximum amount of data the server may return per partition.
    • The default is 1048576 (1 MB); once the data exceeds this value the broker returns immediately.
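How fetch.min.bytes and fetch.max.wait.ms combine can be summarized in one condition: the broker answers as soon as either threshold is reached. The class and method names below are hypothetical, and the sketch ignores max.partition.fetch.bytes and everything else a real broker considers.

```java
/** Toy model of when a broker answers a fetch request. */
class FetchWaitSketch {
    static boolean shouldRespond(long availableBytes, long waitedMs,
                                 long fetchMinBytes, long fetchMaxWaitMs) {
        // Respond when enough data is ready OR the wait budget is used up.
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        System.out.println(shouldRespond(0, 100, 1, 500));    // false: no data yet, still waiting
        System.out.println(shouldRespond(1, 0, 1, 500));      // true: default minimum of 1 byte met
        System.out.println(shouldRespond(0, 500, 1024, 500)); // true: waited the maximum time
    }
}
```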

Auto commit / manual commit

  • enable.auto.commit setting
    • true: the offsets the consumer has read are committed at a fixed interval (default)
    • false: commits are performed manually
  • auto.commit.interval.ms setting
    • The auto commit interval; the default is 5000 ms, that is, 5 seconds.
  • With auto commit enabled, the commit is carried out automatically when poll() or close() is called.

Manual commit: synchronous / asynchronous

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
	// process the record
}
try {
	consumer.commitSync();
} catch (Exception ex) {
	// thrown when the commit fails
}
  • commitSync() throws an exception on failure, so the caller has to handle it.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
	// process the record
}
consumer.commitAsync((offsets, exception) -> {
	if (exception != null) {
		// handle the failed commit
	}
});
  • commitAsync() commits asynchronously, so to find out whether it succeeded you have to handle the result in a callback (an OffsetCommitCallback).

Reprocessing and ordering

  • One thing to watch out for with Kafka is that a consumer may receive the same message more than once.
    • This is caused by transient commit failures, rebalances, and similar events.
  • Consumers therefore need to be designed with idempotence in mind.
  • Depending on the nature of the data, idempotence has to be implemented using timestamps, sequence numbers, or similar fields.
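One way to make a handler idempotent with sequence numbers is sketched below. It assumes, purely for illustration, that every message carries a key and a per-key sequence number that only increases; the class `IdempotentHandlerSketch` is hypothetical, and a real consumer would keep this state in durable storage rather than in memory.

```java
import java.util.HashMap;
import java.util.Map;

/** Skips records whose sequence number was already applied for the same key. */
class IdempotentHandlerSketch {
    private final Map<String, Long> lastAppliedSeq = new HashMap<>();
    private long total = 0;

    /** Returns true if the record was applied, false if it was a duplicate. */
    boolean handle(String key, long sequence, long amount) {
        Long last = lastAppliedSeq.get(key);
        if (last != null && sequence <= last) {
            return false; // already processed, e.g. redelivered after a rebalance
        }
        total += amount;
        lastAppliedSeq.put(key, sequence);
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handle("order-1", 1, 100)); // true
        System.out.println(handler.handle("order-1", 1, 100)); // false (duplicate)
        System.out.println(handler.total());                   // 100
    }
}
```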

Session timeout, heartbeat, and maximum poll interval

  • A consumer keeps its connection alive by sending heartbeats.
    • If the broker receives no heartbeat from a consumer for a certain time, it removes the consumer from the group and triggers a rebalance.
    • session.timeout.ms: the session timeout (default 10 seconds; 45 seconds since Kafka 3.0)
    • heartbeat.interval.ms: the heartbeat interval (default 3 seconds); at most 1/3 of the session timeout is recommended.
  • max.poll.interval.ms: the maximum interval between calls to poll()
    • If poll() is not called within this time, the consumer is removed from the group and a rebalance takes place.
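The two liveness rules above can be sketched as simple checks. The names `LivenessSketch`, `shouldEvict`, and `heartbeatIntervalOk` are hypothetical, and the model folds both rules into one function even though a real client and broker enforce them in different places.

```java
/** Toy model of the conditions under which a consumer drops out of its group. */
class LivenessSketch {
    /** True if the consumer missed its heartbeat deadline or stopped polling for too long. */
    static boolean shouldEvict(long nowMs, long lastHeartbeatMs, long lastPollMs,
                               long sessionTimeoutMs, long maxPollIntervalMs) {
        boolean heartbeatExpired = nowMs - lastHeartbeatMs > sessionTimeoutMs;
        boolean pollExpired = nowMs - lastPollMs > maxPollIntervalMs;
        return heartbeatExpired || pollExpired;
    }

    /** Checks the rule of thumb: heartbeat interval at most one third of the session timeout. */
    static boolean heartbeatIntervalOk(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Last heartbeat 15 s ago with a 10 s session timeout: evicted.
        System.out.println(shouldEvict(20_000, 5_000, 19_000, 10_000, 300_000)); // true
        // Heartbeats are fine and poll() was called 1 s ago: stays in the group.
        System.out.println(shouldEvict(20_000, 18_000, 19_000, 10_000, 300_000)); // false
        System.out.println(heartbeatIntervalOk(3_000, 10_000)); // true
    }
}
```

The second rule is why slow record processing can cause rebalances even when heartbeats keep flowing: a consumer that spends too long between poll() calls is treated as stuck.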

Shutdown handling

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // subscribe to the topic
try {
	while (true) {
		// throws WakeupException once wakeup() has been called from another thread
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
		// process records
		try {
			consumer.commitAsync();
		} catch (Exception e) {
			// handle commit failure
		}
	}
} catch (WakeupException ex) {
	// expected during shutdown; fall through to close the consumer
} finally {
	consumer.close();
}
  • A consumer normally receives messages inside an infinite loop, so the wakeup() method is used to raise an exception in poll() and shut the consumer down.
  • KafkaConsumer is not thread-safe, so it must not be used from multiple threads.
    • wakeup() is the exception: it is meant to be called from a different thread.

References