@Configuration
public class KafkaTopicConfiguration {
@Bean
public AdminClient adminClient(KafkaAdmin kafkaAdmin){
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
@Bean
public NewTopic clip2(){ // 토픽을 생성
return TopicBuilder.name("clip2").build();
}
@Bean
public KafkaAdmin.NewTopics clip2s(){
return new KafkaAdmin.NewTopics(
TopicBuilder.name("clip2-part1").build(),
TopicBuilder.name("clip2-part2")
.partitions(3)
.replicas(1)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(1000*60*60))
.build()
);
}
}
위처럼 스프링에서는 Configuration을 하면 프로젝트가 실행될 때 Topic을 생성하게 된다.
이때 TopicBuilder를 활용하여 토픽 이름, 파티션, 레플리카, 리텐션 등을 지정할 수 있으며 위 코드의 경우 리텐션을 1시간으로 설정하였다. 다만 이미 생성된 토픽이 있고 프로젝트를 재 실행할 경우 파티션의 개수만 수정되고 나머지는 그대로 유지된다.
desciprtionTopics를 사용하면 보다 세부적인 토픽정보를 볼 수 있다.
@Bean
public ApplicationRunner runner(AdminClient adminClient){ // <key value> Type Param
return args -> {
Map<String, TopicListing> stringTopicListingMap = adminClient.listTopics().namesToListings().get();
for(String topicName : stringTopicListingMap.keySet()){
System.out.println(stringTopicListingMap.get(topicName));
Map<String, TopicDescription> descriptionMap = adminClient.describeTopics(Collections.singleton(topicName)).all().get();
System.out.println(descriptionMap);
if(!stringTopicListingMap.get(topicName).isInternal()){ // 내부 토픽은 절대 삭제해선 안된다.
adminClient.deleteTopics(Collections.singleton(topicName));
}
}
};
}