Building a Kafka Producer in Java

Publishing a Kafka event

Let's publish an event to Kafka.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {

        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record (no key, so only topic and value are given)
        ProducerRecord<String, String> record =
                new ProducerRecord<>("second_topic", "hello world");

        // send data (asynchronous: this only queues the record and returns)
        producer.send(record);
    }
}

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: FUBUbuy5SqqwDTpqZI7_hg

The output above appears, but the event never actually reached the consumer. To deliver the event all the way to the consumer, add producer.flush() and producer.close() as shown below.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {

        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record
        ProducerRecord<String, String> record =
                new ProducerRecord<>("second_topic", "hello world");

        // send data (asynchronous: this only queues the record and returns)
        producer.send(record);

        // flush data: block until the queued records have been sent
        producer.flush();

        // flush and close producer
        producer.close();
    }
}

The output is as follows.

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: FUBUbuy5SqqwDTpqZI7_hg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

The consumer received the event as expected.

daeukyui-iMac:kafka_2.11-2.1.0 daeuky$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic second_topic --from-beginning
hello world

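The reason is that the Kafka producer works asynchronously. send() only places the record in an in-memory buffer and returns right away; a background I/O thread does the actual transmission. In the first version, main() finished before that thread had sent anything, so the record never left the process. If you step into producer.flush(), you can see it wake up the sender thread (the call chain ends in java.nio.channels.Selector#wakeup) and then wait until the buffered records have been sent.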
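To make that behavior concrete, here is a toy model that uses only JDK classes. ToyAsyncSender and everything inside it are hypothetical names made up for this illustration. It is not how Kafka is implemented, just a sketch of the same idea: send() queues the value, a daemon thread "transmits" it after a short delay, and flush() blocks until the queue has drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: a hypothetical class, not part of the Kafka client.
class ToyAsyncSender {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private int pending = 0; // guarded by "this"

    ToyAsyncSender() {
        Thread io = new Thread(this::drain, "toy-network-thread");
        io.setDaemon(true); // a daemon thread does not keep the JVM alive
        io.start();
    }

    // Like send(): queue the value and return immediately.
    synchronized void send(String value) {
        pending++;
        buffer.add(value);
    }

    // Like flush(): block until everything queued so far has been delivered.
    synchronized void flush() {
        try {
            while (pending > 0) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while flushing", e);
        }
    }

    // Snapshot of what the background thread has "transmitted" so far.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    private void drain() {
        try {
            while (true) {
                String value = buffer.take();
                Thread.sleep(50); // pretend network latency
                delivered.add(value);
                synchronized (this) {
                    pending--;
                    notifyAll();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ToyAsyncSender sender = new ToyAsyncSender();
        sender.send("hello world");
        // Usually still empty here, because the worker is still "sending".
        System.out.println("right after send: " + sender.delivered());
        sender.flush();
        System.out.println("after flush: " + sender.delivered());
    }
}
```

If main() returned right after send() without calling flush(), the JVM could exit while the value was still sitting in the buffer, which mirrors what happened in the first ProducerDemo.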

Another option: producer.send() returns a Future, so you can make sure the event is published by explicitly calling get() on that Future, which blocks until the send has completed.

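// send data and block until the send has completed
// (get() throws the checked InterruptedException and ExecutionException)
Future<RecordMetadata> recordMetadataFuture = producer.send(record);
recordMetadataFuture.get();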
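Since get() comes from java.util.concurrent.Future, a failed send surfaces as an ExecutionException that wraps the real cause. The sketch below shows that handling with JDK classes only. fakeSend is a hypothetical stand-in for producer.send(), and the offset 42 is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in for the Future returned by producer.send(); all names are hypothetical.
class FutureGetDemo {
    // Completes asynchronously with a fake offset, or fails for an empty value.
    static Future<Long> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            return 42L;
        });
    }

    // Blocks for the result and describes the outcome.
    static String sendAndWait(String value) {
        try {
            long offset = fakeSend(value).get(5, TimeUnit.SECONDS);
            return "acked at offset " + offset;
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello world")); // acked at offset 42
        System.out.println(sendAndWait(""));            // failed: empty record
    }
}
```

Keep in mind that waiting on get() after every send() makes each send effectively synchronous, so it is better suited to small demos like this one than to high-throughput code.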

Kafka producer callback

If you pass an implementation of the org.apache.kafka.clients.producer.Callback interface as the second argument to org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback), you can define an action to run once the send has completed.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemoWithCallback {
    public static void main(String[] args) {

        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            // create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("second_topic", "hello world " + i);

            // send data - asynchronous
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executes every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata. \n"
                                + "Topic:" + recordMetadata.topic() + "\n"
                                + "Partition:" + recordMetadata.partition() + "\n"
                                + "Offset:" + recordMetadata.offset() + "\n"
                                + "TimeStamp:" + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }

        // flush data
        producer.flush();

        // flush and close producer
        producer.close();
    }
}
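One detail worth knowing: the callback is not invoked on the thread that called send(). The Kafka documentation says it generally runs on the producer's I/O thread, so it should return quickly. The JDK-only sketch below illustrates the idea. ToyCallback and sendAsync are hypothetical stand-ins for Callback and producer.send(), not Kafka APIs.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only: shows that a completion callback runs on the worker thread.
class CallbackThreadDemo {
    // Hypothetical stand-in for org.apache.kafka.clients.producer.Callback.
    interface ToyCallback {
        void onCompletion(long offset, Exception e);
    }

    // Hypothetical stand-in for producer.send(record, callback).
    // The value is ignored; the callback is simply fired from a background thread.
    static void sendAsync(String value, ToyCallback callback) {
        Thread io = new Thread(() -> callback.onCompletion(0L, null), "toy-network-thread");
        io.setDaemon(true);
        io.start();
    }

    // Returns the name of the thread on which the callback was invoked.
    static String callbackThreadName() {
        CompletableFuture<String> seen = new CompletableFuture<>();
        sendAsync("hello world", (offset, e) -> seen.complete(Thread.currentThread().getName()));
        return seen.join();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("callback thread: " + callbackThreadName());
    }
}
```

In the real log output for ProducerDemoWithCallback, the same effect shows up as callback lines tagged with kafka-producer-network-thread rather than main.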

The RecordMetadata passed to the callback lets you inspect the metadata of the published event: the topic name, the partition the event was written to, the offset, and the timestamp. The output is as follows.

[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:6
TimeStamp:1552398118875
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:7
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:8
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:9
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:10
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:11
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:12
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:13
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:14
TimeStamp:1552398118883
[kafka-producer-network-thread | producer-1] INFO com.daeuky.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic:second_topic
Partition:0
Offset:15
TimeStamp:1552398118883
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
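The TimeStamp values in this output are milliseconds since the Unix epoch, which is hard to read at a glance. A small sketch using java.time turns one of them into an ISO-8601 UTC string. TimestampDemo and toIso are names chosen for this example only.

```java
import java.time.Instant;

// Converts an epoch-millisecond timestamp, such as RecordMetadata.timestamp(),
// into a human-readable ISO-8601 instant in UTC.
class TimestampDemo {
    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // First timestamp from the log output above.
        System.out.println(toIso(1552398118875L)); // 2019-03-12T13:41:58.875Z
    }
}
```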
