Kafka ๋
Apache Kafka ๋ Java, Scala ๋ก ์์ฑ๋ ๋ถ์ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ์ผ๋ก Linkedin ์์ ๊ฐ๋ฐํ๊ณ Apache ์ฌ๋จ์์ ๊ด๋ฆฌํ๊ณ ์๋ค.
Kafka Broker
- Kafka Broker ๋ Kafka Application ์ ์คํ์ค์ธ ์๋ฒ ์ธ์คํด์ค๋ค.
- Kafka ๊ธฐ๋ณธ์ ์ผ๋ก 3๋ ์ด์์ Kafka Broker ํด๋ฌ์คํฐ๋ก ๊ตฌ์ฑ๋๋ค.
- v2.5 ๊น์ง ์ฃผํคํผ๋ฅผ ํตํด ํด๋ฌ์คํฐ๋ฅผ ๊ด๋ฆฌํ๊ณ , v2.8 ๋ถํฐ KRaft ์ฌ์ฉ.
- n๊ฐ์ ๋ธ๋ก์ปค ์ค 1๋๋ ์ปจํธ๋กค๋ฌ ๊ธฐ๋ฅ์ ์ํํ๋ค.
- ์ปจํธ๋กค๋ฌ์ ์ญํ :
- ๊ฐ ๋ธ๋ก์ปค์๊ฒ ๋ด๋น ํํฐ์ ํ ๋น
- ๋ธ๋ก์ปค ์ ์ ๋์ ๋ชจ๋ํฐ๋ง
- ์ด๋ค ๋ธ๋ก์ปค๊ฐ ์ปจํธ๋กค๋ฌ์ธ์ง๋ ์ฃผํคํผ์ ์ ์ฅ๋๋ค.
- ์ปจํธ๋กค๋ฌ์ ์ญํ :
Record
new ProducerRecord<String, String>("topic", "key", "message");
ConsumerRecords<String, String> records = consumer.poll(1000);
- ๊ฐ์ฒด๋ฅผ Producer ์์ Consumer ๋ก ์ ๋ฌํ ๋ Kafka ๋ด๋ถ์์ byte ํํ๋ก ์ ์ฅํ๊ธฐ ๋๋ฌธ์ ์ง๋ ฌํ/์ญ์ง๋ ฌํ๊ฐ ํ์ํ๋ค.
- Kafka ์์ ๊ธฐ๋ณธ์ ์ผ๋ก ์ ๊ณตํ๋ StringSerializer, ShortSerializer ๋ฑ์ ์ฌ์ฉํ ์ ์๊ณ , ์ปค์คํ ์ง๋ ฌํ ํด๋์ค๋ฅผ ํตํด POJO ๋ ์ง๋ ฌํ/์ญ์ง๋ ฌํ๊ฐ ๊ฐ๋ฅํ๋ค.
Topic & Partition
- ๋ฉ์์ง๋ฅผ Topic ์ผ๋ก ๋ณด๋ด๋ฉด Partition ์ ํตํด ๋ฉ์์ง๋ฅผ ๋ถ๋ฅํ ์ ์๋ค.
- 1๊ฐ์ Topic ์ 1๊ฐ ์ด์์ Partition ์ ๊ฐ์ง ์ ์๋ค.
- ๊ฐ Partition ๋ง๋ค ๊ณ ์ ํ Offset ์ ๊ฐ์ง๋ค.
- ๋ฉ์์ง ์ฒ๋ฆฌ์์๋ Partition ๋ณ๋ก ๊ฐ๊ฐ ์ ์ง ๋ฐ ๊ด๋ฆฌ๋๋ค.
- Producer ๊ฐ Topic ์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ฉด ๋ฉ์์ง๋ ๋ผ์ด๋๋ก๋น ๋๋ Producer ๊ฐ ์ง์ ํ Partition Key ๋ฅผ ํ์ฉํด ๋ฉ์์ง๋ฅผ ์ ์ฅํ Partition ์ ์ง์ ํ ์ ์๋ค.
Kafka log and segment
- Kakfa ๋ ๋ค๋ฅธ ๋ฉ์์ง ํ๋ซํผ๊ณผ ๋ฌ๋ฆฌ ๋ฉ์์ง๋ฅผ ํ์ผ์์คํ ์ ์ ์ฅ๋๋ค.
- ๋ฉ์์ง๊ฐ ์ ์ฅ๋ ๋๋ segment ํ์ผ์ด ์ด๋ ค์์ผ๋ฉฐ, segment ๋ ์๊ฐ ๋๋ ์ฉ๋ ๊ธฐ์ค์ผ๋ก ๋ซํ๋ค.
- segment ๊ฐ ๋ซํ ์ดํ์ ์ผ์ ์๊ฐ ๋๋ ์ฉ๋์ ๋ฐ๋ผ ์ญ์ ๋๋ ์์ถ๋๋ค.
Partition & Consumer
- Partition ๊ฐ์ >= Consumer๊ฐ์
- ์ฆ, 1๊ฐ์ Consumer ๊ฐ 3๊ฐ์ Partition ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์ปจ์๋ฐ ํ ์ ์๊ณ , 3๊ฐ์ Consumer ๊ฐ ๊ฐ๊ฐ Partition ์ ํ ๋น ๋ฐ์ 1:1 ๋ก ๋ฉ์์ง๋ฅผ ์ปจ์๋ฐ ํ ์ ์์ง๋ง, 4๊ฐ์ Consumer ๊ฐ ์๋ ๊ฒฝ์ฐ 1๊ฐ์ Consumer ๋ ๋๊ฒ๋๋ค.
- Consumer ์ค 1๋๊ฐ ์ฅ์ ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ Partition ์ ๋ค์ ํ ๋น ๋ฐ์ Consumer ๋ฅผ ์ฐพ๊ธฐ ์ํด ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ์ํ๋๋ค.
- Consumer ๋ ๋ชฉ์ ์ ๋ฐ๋ผ Consumer ๊ทธ๋ฃน์ผ๋ก ๋ถ๋ฆฌํ ์ ์๋ค.
Broker Partition Replication
- Broker ์๋ฒ๊ฐ ๋ค์ด๋ ๋ ๋ฐ์ดํฐ ์ ์ค์ ๋ฐฉ์งํ๊ธฐ ์ํด
--replicator-factor 3
์ต์ ์ ํตํด Partition ์ ๋ค๋ฅธ Broker ์ ๋ณต์ ํ์ฌ ์ด์์ ๋์ํ๋ค. - ์ฆ, 1๊ฐ์ Topic ์ 3๊ฐ์ Partition ๊ณผ 3๊ฐ์ Replicator Factor ๋ก ์ค์ ํ๋ฉด, 3๊ฐ์ Broker ์๋ฒ์ ๊ฐ๊ฐ Partition 3๊ฐ๊ฐ ํ ๋น๋์ด ์ด 9๊ฐ์ Partition ์ ๊ฐ์ง๊ฒ๋๋ค.
- Partition ๋ณต์ ๋ ๋ฆฌ๋์ ํ๋ก์๋ก ๊ตฌ์ฑ๋๋ฉฐ, ๋ฆฌ๋๋ Kafka ํด๋ผ์ด์ธํธ์ ์ง์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๊ณ ํ๋ก์๋ ๋ฆฌ๋๋ก ๋ถํฐ ๋ ๊ณ ๋๋ฅผ ์ง์ ๋ณต์ ํ๋ฉฐ ๋ฆฌ๋๊ฐ ๋ค์ด๋ ๊ฒฝ์ฐ ํ๋ก์ ์ค 1๊ฐ๊ฐ ๋ฆฌ๋๋ก ์ ์ถ๋๋ค.
- ๋ฆฌ๋์ ํ๋ก์๊ฐ ๋ชจ๋ sync ๊ฐ ๋ง๋ ์ํ๋ฅผ In-Sync Replica, ISR ์ด๋ผ๊ณ ํ๋๋ฐ, ๋ง์ฝ ISR ์ด ์๋ ์ํ์์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ํ๋ก์๋ ๋ฆฌ๋๊ฐ ๋ณต๊ตฌ๋ ๋ ๊น์ง ๊ธฐ๋ค๋ ค์ผ ํ๊ธฐ ๋๋ฌธ์
unclean.leader.election.enable
์ต์ ์ true ๋ก ์ค์ ํด์ ISR ์ํ๊ฐ ์๋ ํ๋ก์๋ ๋ฆฌ๋๋ก ์ ์ถ๋ ์์๊ฒ๋ ์กฐ์นํ์ฌ ๋ค๋ฅธ ๋ ์ฝ๋๋ฅผ ๋จผ์ ์ฒ๋ฆฌํ ์ ์๋๋ก ์ค์ ํ ์ ์๋ค.
- ๋ฆฌ๋์ ํ๋ก์๊ฐ ๋ชจ๋ sync ๊ฐ ๋ง๋ ์ํ๋ฅผ In-Sync Replica, ISR ์ด๋ผ๊ณ ํ๋๋ฐ, ๋ง์ฝ ISR ์ด ์๋ ์ํ์์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ํ๋ก์๋ ๋ฆฌ๋๊ฐ ๋ณต๊ตฌ๋ ๋ ๊น์ง ๊ธฐ๋ค๋ ค์ผ ํ๊ธฐ ๋๋ฌธ์
Kafka Producer
Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092, kafka01:9092, kafka01:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop);
producer.send(new ProducerRecord<>("topicName", "key", "value"));
producer.send(new ProducerRecord<>("topicName", "value"));
producer.close();
send()
๋ฅผ ํธ์ถํ๋ฉดSerializer
๋ฅผ ํตํด Record ๋ฅผ byte ๋ก ๋ณํํ๊ณPartitioner
๋ฅผ ํตํด ์ ์ฅ๋ ํํฐ์ ์ ๊ฒฐ์ ํ๋ค.- ์ดํ Record ๋ค์ Buffer ์ ์ ์ฅํ๋๋ฐ ๋ฐฐ์น๋ก ๋ฌถ์ด์ ์ ์ฅํ๋ค.
- ์ดํ Sender ๊ฐ ๋ฐฐ์น๋ฅผ Kafka Broker ์๊ฒ ๋ฐฐ์น๋ฅผ ์ ์กํ๋ค.
- Sender ๋ ๋ณ๋์ ์ค๋ ๋๋ก ์๋ํ๋ค.
- ๋ฐฐ์น๊ฐ ์ฐผ๋์ง ์ฌ๋ถ์ ์๊ด์์ด Sender ๋ ์ฐจ๋ก๋๋ก Broker ์ ์ ์กํ๋ค.
- Sender ๊ฐ ๋ฐฐ์น๋ฅผ ์ ์กํ๋ ๋์
send() -> Serializer -> Partitioner
๋ ๋ฉ์์ง๋ฅผ ๋ฐฐ์น๋ก ๋ชจ์๋ค.
- ์ฒ๋ฆฌ๋ ๊ด๋ จ๋ ์ฃผ์ ์ค์
batch.size
๋ ๋ฐฐ์น ํฌ๊ธฐ๋ฅผ ์ง์ ํ๊ณ , ๋ฐฐ์น๊ฐ ๋ค ์ฐจ๋ฉด ๋ฐ๋ก ์ ์กํ๋ค.linger.ms
์ ์ก ๋๊ธฐ ์๊ฐ์ผ๋ก ๊ธฐ๋ณธ๊ฐ์ด 0์ธ๋ฐ, ๋๊ธฐ ์๊ฐ์ ์ฃผ๋ฉด ์ค์ ๊ฐ๋งํผ ๊ธฐ๋ค๋ ธ๋ค๊ฐ ๋ฐฐ์น๋ฅผ ์ ์กํ๊ณ ๋๊ธฐ ์๊ฐ์ด ์์ผ๋ฉด ๋ฐ๋ก ๋ฐฐ์น๋ฅผ ๋ณด๋ธ๋ค.
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
RecordMetadata meta = f.get(); // ๋ธ๋กํน
}
send()
๋Future
๋ฅผ ๋ฐํํด์ ์ ์ก ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ค.- ํ๋์ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ๋ ๋ง๋ค ๋ธ๋กํน๋๊ธฐ ๋๋ฌธ์ ๋ฐฐ์น์ ๋ฉ์์ง๊ฐ ์์ด์ง ์๊ณ ํ๋์ฉ๋ง ๋ค์ด๊ฐ๊ธฐ ๋๋ฌธ์ ์ฒ๋ฆฌ๋์ด ์ ํ๋๋ค.
- ๋์ ๋ฉ์์ง๊ฑด๋ณ๋ก ํ์คํ๊ฒ ๊ฒฐ๊ณผ๋ฅผ ์ ์ ์๋ค.
producer.send(new ProducerRecord<>("topic", "value"),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
}
}
);
- ๋์ Callback ๊ฐ์ฒด๋ฅผ ์ ๋ฌํด์ ๋ ผ๋ธ๋กํน ๋ฐฉ์์ผ๋ก ์ ์ก์ ํ์ธํ ์ ์๋ค. ๋๋ฌธ์ ๋ฐฐ์น๊ฐ ์ ์์ ์ผ๋ก ์์ด๊ธฐ ๋๋ฌธ์ ์ฒ๋ฆฌ๋ ์ ํ๊ฐ ์๋ค.
์ ์ก ๋ณด์ฅ ack + min.insync.replicas
- ์์์ ISR ์ ๋ํด ์ค๋ช
ํ๋ฏ์ด ๋ฉ์์ง๊ฐ ์ฌ๋ฌ ๋ธ๋ก์ปค์ ๊ฑธ์ณ ๋ณต์ ๋๋๋ฐ, ํ๋ก๋์ ์
์ฅ์์ ack ๊ฐ์ ํตํด ์ ์ก ๋ณด์ฅ์ ํ์ธํ ์ ์๋ค.
- ack = 0 ์ผ ๊ฒฝ์ฐ ์๋ฒ์ ์๋ต์ ๊ธฐ๋ค๋ฆฌ์ง ์๊ธฐ์ ์ ์ก ๋ณด์ฅ์ด ๋์ง ์๋๋ค.
- ack = 1 ์ผ ๊ฒฝ์ฐ ํํฐ์ ์ ๋ฆฌ๋์ ์ ์ฅ๋๋ฉด ๋ธ๋ก์ปค๋ก๋ถํฐ ์๋ต์ด ์จ๋ค. ๋๋ฌธ์ ๋ฆฌ๋์ ์ฅ์ ๊ฐ ๋๋ฉด ๋ฉ์์ง๊ฐ ์ ์ค๋ ์ ์๋ค.
- ack = all ์ผ ๊ฒฝ์ฐ ๋ชจ๋ ๋ฆฌํ๋ฆฌ์นด์ ์ ์ฅ๋๋ฉด ์๋ต์ ๋ฐ๋๋ค. ์ด๋ ๋ฆฌํ๋ฆฌ์นด๋ ๋ธ๋ก์ปค์ min.insync.replicas ์ค์ ์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋ค.
- ๋ฆฌํ๋ฆฌ์นด ๊ฐ์๊ฐ 3์ด๊ณ ack = all ์ผ ๋
- min.insync.replicas = 1 ์ด๋ฉด, ack = 1 ๊ณผ ๋์ผํ๊ฒ ๋ฆฌ๋์ ์ ์ฅ๋๋ฉด ์๋ตํ๋ค.
- min.insync.replicas = 2 ์ด๋ฉด, ๋ฆฌ๋์ ์ ์ฅํ๊ณ ํ๋ก์ 1๊ฐ์ ์ ์ฅ๋๋ฉด ์๋ตํ๋ค.
- min.insync.replicas = 3 ์ด๋ฉด, ๋ฆฌ๋์ ํ๋ก์ 2๊ฐ๊ฐ ๋ชจ๋ ์ ์ฅ๋์ผ ์๋ตํ๋ค. ํ๋ก์ ์ค ํ๋๋ผ๋ ์ฅ์ ๊ฐ ๋๋ฉด ๋ฆฌํ๋ฆฌ์นด ๋ถ์กฑ์ผ๋ก ์ ์ฅ์ ์คํจํ๋ค.
Kafka Consumer
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1"); // ์ปจ์๋จธ๊ทธ๋ฃน ์ง์
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // ํ ํฝ ๊ตฌ๋
while () {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.topic());
System.out.println(record.partition());
System.out.println(record.offset());
}
}
consumer.close();
- ์ ์ฝ๋๋ Consumer ๊ฐ Partition ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ฌ ๋ ์ฌ์ฉํ๋ ์ ํ์ ์ธ ์ฝ๋ ๊ตฌ์กฐ๋ค.
Commit & Offset
consumer.poll()
๋ฉ์๋๋ ์ด์ ์ ์ปค๋ฐ๋ ์คํ์ ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ค๊ณ ๋ง์ง๋ง์ผ๋ก ์ฝ์ด์จ ๋ฉ์์ง์ ์คํ์ ์ ๊ธฐ์ค์ผ๋ก ๋ค์ ์ปค๋ฐํ๋ค. ์ด ๊ณผ์ ์ ๋ฐ๋ณตํ์ฌ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ค๋ ๊ตฌ์กฐ๋ค.- ๋ง์ฝ ์ฒ์ ์ ๊ทผ์ด๊ฑฐ๋ ์ปค๋ฐํ ์คํ์
์ด ์๋ ๊ฒจ์ฐ
auto.offset.reset
์ค์ ์ ํ์ฉํ ์ ์๋ค.- auto.offset.reset = earliest ์ ๊ฒฝ์ฐ ๋งจ ์ฒ์ ๋ฉ์์ง๋ฅผ ์คํ์ ์ผ๋ก ์ฌ์ฉํ๋ค.
- auto.offset.reset = latest ์ ๊ฒฝ์ฐ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์ ์ฉ๋๋ฉฐ ๋ง์ง๋ง ๋ฉ์์ง๋ฅผ ์คํ์ ์ผ๋ก ์ฌ์ฉํ๋ค.
- auto.offset.reset = none ์ ๊ฒฝ์ฐ ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ํ ์ด์ ์ปค๋ฐ์ด ์์ผ๋ฉด ์ต์ ์ ์ด ๋ฐ์ํ๊ธฐ์ ์ ์ฌ์ฉํ์ง ์๋๋ค.
์กฐํ์ ์ํฅ์ ์ฃผ๋ ์ค์
- fetch.min.bytes ์ค์ ์ ์กฐํ ์ ๋ธ๋ก์ปค๊ฐ ์ ์กํ ์ต์ ๋ฐ์ดํฐ ํฌ๊ธฐ๋ฅผ ์ง์ ํ๋ค.
- ๊ธฐ๋ณธ๊ฐ์ 1์ด๋ฉฐ, ์ด ๊ฐ์ด ํฌ๋ฉด ์ต์ ๋ฐ์ดํฐ ํฌ๊ธฐ๊ฐ ๋ค ์ฐฐ ๋๊น์ง ๋๊ธฐํ๊ธฐ ๋๋ฌธ์ ๋๊ธฐ ์๊ฐ์ ๋์ง๋ง ์ฒ๋ฆฌ๋์ด ์ฆ๊ฐํ๋ค.
- fetch.max.wait.ms ์ค์ ์ ๋ฐ์ดํฐ๊ฐ ์ต์ ํฌ๊ธฐ๊ฐ ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆด ์๊ฐ์ ์๋ฏธํ๋ค.
- ๊ธฐ๋ณธ๊ฐ์ 500์ด๋ฉฐ, ๋ธ๋ก์ปค๊ฐ ๋ฐ์ดํฐ๊ฐ ๋ค ์ฐฐ ๋๊น์ง ๋ฌดํ์ ๊ธฐ๋ค๋ฆด ์ ์๊ธฐ์ ๋ฆฌํดํ ๋๊น์ง ๋๊ธฐํ๋ ์๊ฐ์ผ๋ก poll() ๋ฉ์๋์ ๋๊ธฐ ์๊ฐ๊ณผ ๋ค๋ฅด๋ค.
- max.partition.fetch.bytes ์ค์ ์ ํํฐ์
๋น ์๋ฒ๊ฐ ๋ฆฌํดํ ์ ์๋ ์ต๋ ํฌ๊ธฐ๋ฅผ ์๋ฏธํ๋ค.
- ๊ธฐ๋ณธ๊ฐ์ 1048576 (1MB)์ด๋ฉฐ ๋ฐ์ดํฐ๊ฐ ์ด ๊ฐ์ ๋์ด๊ฐ๋ฉด ๋ฐ๋ก ๋ฆฌํดํ๋ค.
์๋ ์ปค๋ฐ/์๋ ์ปค๋ฐ
- enable.auto.commit ์ค์
- true: ์ผ์ ์ฃผ๊ธฐ๋ก ์ปจ์๋จธ๊ฐ ์ฝ์ ์คํ์ ์ ์ปค๋ฐ (๊ธฐ๋ณธ๊ฐ)
- false: ์๋์ผ๋ก ์ปค๋ฐ ์คํ
- auto.commit.interval.ms ์ค์
- ์๋ ์ปค๋ฐ ์ฃผ๊ธฐ๋ฅผ ์๋ฏธํ๊ณ ๊ธฐ๋ณธ๊ฐ์ 5000ms ์ฆ, 5์ด๋ค.
- poll(), close() ๋ฉ์๋ ํธ์ถ ์ ์๋์ผ๋ก ์ปค๋ฐ์ ์คํํ๋ค.
์๋ ์ปค๋ฐ : ๋๊ธฐ/๋น๋๊ธฐ ์ปค๋ฐ
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// ์ฒ๋ฆฌ
}
try {
consumer.commitSync();
} catch (Exception ex) {
// ์ปค๋ฐ ์คํจ ์ ์๋ฌ ๋ฐ์
}
- commitSync() ๋ ์คํจ ์ ์ต์ ์ ์ ๋ฐ์์ํค๊ธฐ ๋๋ฌธ์ ์ด์ ๋ฐ๋ฅธ ์ฒ๋ฆฌ๊ฐ ํ์ํ๋ค.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// ์ฒ๋ฆฌ
}
consumer.commitAsync(OffsetCommitCallback callback);
- commitAsync() ๋ ๋น๋๊ธฐ๋ก ์ปค๋ฐํ๊ธฐ ๋๋ฌธ์ ์ฑ๊ณต ์ฌ๋ถ๋ฅผ ํ์ธํ๊ณ ์ถ์ผ๋ฉด callback ์ ํตํด ์ฒ๋ฆฌํด์ผ ํ๋ค.
์ฌ์ฒ๋ฆฌ์ ์์
- ์นดํ์นด ์ฌ์ฉ ์ ์ฃผ์ํ ์ ์ ์ปจ์๋จธ๊ฐ ๋์ผํ ๋ฉ์์ง๋ฅผ ์กฐํํ ์ ์๋ค๋ ์ ์ด๋ค.
- ์ผ์์ ์ปค๋ฐ ์คํจ, ๋ฆฌ๋ฐธ๋ฐ์ค ๋ฑ์ ์ํด ๋ฐ์ํ๋ค.
- ๋๋ฌธ์ ์ปจ์๋จธ๋ ๋ฉฑ๋ฑ์ฑ(idempotence)์ ๊ณ ๋ คํด์ผ ํ๋ค.
- ๋ฐ์ดํฐ ํน์ฑ์ ๋ฐ๋ผ ํ์์คํฌํ, ์ผ๋ จ๋ฒํธ ๋ฑ์ ํ์ฉํด์ ๋ฉฑ๋์ฑ์ ๊ตฌํํด์ค์ผํ๋ค.
์ธ์ ํ์์์, ํํธ๋นํธ, ์ต๋ poll ๊ฐ๊ฒฉ
- ์ปจ์๋จธ๋ ํํธ๋นํธ๋ฅผ ์ ์กํด์ ์ฐ๊ฒฐ์ ์ ์งํ๋ค.
- ๋ธ๋ก์ปค๋ ์ผ์ ์๊ฐ ์ปจ์๋จธ๋ก๋ถํฐ ํํธ๋นํธ๊ฐ ์์ผ๋ฉด ์ปจ์๋จธ๋ฅผ ๊ทธ๋ฃน์์ ์ ์ธํ๊ณ ๋ฆฌ๋ฐธ๋ฐ์ค๋ฅผ ์งํํ๋ค.
- session.timeout.ms: ์ธ์ ํ์ ์์ ์๊ฐ (๊ธฐ๋ณธ๊ฐ 10์ด)
- heartbeat.interval.ms: ํํธ๋นํธ ์ ์ก ์ฃผ๊ธฐ (๊ธฐ๋ณธ๊ฐ 3์ด), ์ธ์ ํ์์์์ 1/3 ์ดํ๋ฅผ ์ถ์ฒํ๋ค.
- max.poll.interval.ms: poll() ๋ฉ์๋์ ์ต๋ ํธ์ถ ๊ฐ๊ฒฉ
- ์ด ์๊ฐ์ด ์ง๋๋๋ก poll() ํ์ง ์์ผ๋ฉด ์ปจ์๋จธ๋ฅผ ๊ทธ๋ฃน์์ ๋นผ๊ณ ๋ฆฌ๋ฐธ๋ฐ์ค ์งํ
์ข ๋ฃ ์ฒ๋ฆฌ
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // ํ ํฝ ๊ตฌ๋
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(); // wakeup()
// records ์ฒ๋ฆฌ
try {
consumer.commitAsync();
} catch (Exception e) {
// ์ปค๋ฐ ์คํจ ์ฒ๋ฆฌ
}
}
} catch (Exception ex) {
// Wakeup ์ต์
์
์ฒ๋ฆฌ
} finally {
consumer.close();
}
- ์ผ๋ฐ์ ์ผ๋ก ์ปจ์๋จธ๋ ๋ฌดํ๋ฃจํ ์์์ ๋ฉ์์ง๋ฅผ ๋ฐ์์ค๊ธฐ ๋๋ฌธ์
wakeup()
๋ฉ์๋๋ฅผ ํตํด ์ต์ ์ ์ ๋ฐ์์์ผ ์ปจ์๋จธ๋ฅผ ์ข ๋ฃํ ์ ์๋ค. - KafkaConsumer ๋ Thread-safe ํ์ง ์๊ธฐ ๋๋ฌธ์ ์ฌ๋ฌ ์ค๋ ๋์์ ์ฌ์ฉํ๋ฉด ์๋๋ค.
- wakeup() ๋ฉ์๋๋ ๋ค๋ฅธ ์ค๋ ๋์์ ์คํํด์ผ๋๋ค.