kafka 이벤트 발행
kafka 이벤트를 발행해봅니다.
1 | public class ProducerDemo { |
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: FUBUbuy5SqqwDTpqZI7_hg
위와 같이 결과가 뜨지만, 실제로는 컨슈머에 전달되지 않았습니다. 컨슈머까지 이벤트를 전달하기 위해서 아래와 같이 producer.flush()
와 producer.close()
를 추가해줍니다.
1 | public class ProducerDemo { |
결과는 다음과 같이 출력됩니다.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: FUBUbuy5SqqwDTpqZI7_hg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
컨슈머는 다음과 같이 정상적으로 이벤트를 수신하였습니다..1
2daeukyui-iMac:kafka_2.11-2.1.0 daeuky$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic second_topic --from-beginning
hello world
이유인 즉슨, 카프카 프로듀서는 asynchronous 하게 동작합니다. producer.flush()
메소드 내부를 타고 들어가보면 java.nio.channels.Selector#wakeup
이 호출되면서 즉시 결과를 리턴해줍니다.
또 다른 방법으로는 producer.send()
가 future
객채를 반환하고 있기 때문에 다음과 같이 future
객체의 get()
을 명시적으로 호출함으로써 이벤트를 발행할 수 있습니다.
1 | // send data |
카프카 프로듀서 콜백
org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)
의 두 번째 파라미터로 org.apache.kafka.clients.producer.Callback
인터페이스의 구현체를 전달해주면, 이벤트 발행 후 액션을 정의할 수 있습니다.
1 | public class ProducerDemoWithCallback { |
RecordMetadata
의 정보를 이용해서 발행된 이벤트의 메타데이터들을 확인할 수 있습니다. 토픽의 이름과 이벤트 발행에 사용된 파티션의 정보, 오프셋, 시간 정보 등을 얻을 수 있습니다. 결과는 다음과 같습니다.
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:6
TimeStamp:1552398118875
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:7
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:8
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:9
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:10
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:11
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:12
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:13
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:14
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:15
TimeStamp:1552398118883
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Comments