# Unpack the Kafka 2.8.0 (Scala 2.13) distribution and enter its directory
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
# Start ZooKeeper (runs in the foreground; keep this terminal open)
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker (runs in the foreground; use a second terminal)
$ bin/kafka-server-start.sh config/server.properties
# Create the topic
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# List all topics
# (uses --bootstrap-server like the other commands; --zookeeper is deprecated and removed in Kafka 3.0)
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe the topic
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# Write messages to the topic (console producer)
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# Read messages from the topic, starting at the beginning (console consumer)
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# Delete the topic
$ bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092
// Spring for Apache Kafka (provides KafkaTemplate and @KafkaListener); no version is pinned here.
implementation("org.springframework.kafka:spring-kafka")
// Test support for Spring Kafka; added to the test classpath only.
testImplementation("org.springframework.kafka:spring-kafka-test")
/**
 * Registers an {@link ApplicationRunner} that publishes a single record to
 * the {@code quickstart-events} topic.
 *
 * @param kafkaTemplate template used to produce the record; its two type
 *                      parameters are the record key type and value type
 *                      (both {@code String} here)
 * @return a runner that sends the value {@code "hello-world"} to
 *         {@code quickstart-events}
 */
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> kafkaTemplate) {
    // Produce one record; the result returned by send(...) is not inspected.
    return args -> kafkaTemplate.send("quickstart-events", "hello-world");
}
/**
 * Consumer for the {@code quickstart-events} topic that echoes every
 * received message to standard output.
 */
@Component
public class QuickStartConsumer {

    /**
     * Prints the received payload framed by two separator lines.
     *
     * <p>The {@code @KafkaListener} annotation makes this method act as a
     * consumer: messages from the matching topic are fetched and passed in.
     * The {@code id} names the listener container (Spring Kafka also uses it
     * as the consumer group id unless configured otherwise).
     *
     * @param message payload of the consumed record
     */
    @KafkaListener(id = "study-id", topics = "quickstart-events")
    public void listen(String message) {
        final String separator = "=========";
        System.out.println(separator);
        System.out.println(message);
        System.out.println(separator);
    }
}
Producing할 때 value 타입을 String으로 선언하였으므로, Consume 과정에서도 listen 메서드의 인자를 String으로 받는다.