반응형
- 배경 : 실시간 트랜잭션처리와 비동기 처리에서 통합 전송영역의 부재로 인한 시스템 복잡도 증가, 데이터 통합 분석 어려움 해결하고자
- 비동기로 뛰어난 응답속도
- 서로 다른 이기종/네트워크에 있는 서비스를 마치 함수 호출하듯 사용가능(RPC/ RPI 방식)
- 비교군 : rabbitMQ, ZeroMQ, ActiveMQ
- 장점
- 실시간 로그 처리에 특화
- 데이터 중앙화로 인한 분석 가능
- 데이터 증가함에 따라 스케일 아웃 가능
- 디스크, 파일 시스템에 메시지를 저장(영속성 보장, sequential하게 처리시 빠름, 유실위험 적고 에러복구 용이)
- 프로토콜이 간단하므로 오버헤드가 적음
- 특징
- 프로듀서가 컨슈머에게 메시지를 직접 전달하지 않고, 중간의 메시싱 시스템에 전달
- 컨슈머가 불능 상태가 되더라도 프로듀서는 메시지를 메시징 시스템에 전달할 수 있으며, 메시지는 유실되지 않는다. 그리고 컨슈머가 회복되면 다시 메시지를 가져간다.
- 하나의 토픽에 여러 프로듀서/컨슈머 접근가능
- 각각의 개체가 N:N 통신을 하는것이 아니기 때문에 확장이 용이
- 하나의 카프카 클러스터는 3대의 브로커로 시작해 수집 대의 브로커로 확장 가능하다.(무중단)
- 프로듀서
- key 값을 통해 특정 파티션에 저장할 수 있다.
- 컨슈머
- offset의 위치를 기억해, 마지막으로 읽었던 위치를 추적할 수 있다(fail-over에 대한 신뢰)
- 컨슈머들은 그룹으로 묶을 수 있고, Partition은 그룹과 1 : N 매칭해야한다.
(Partition 수 >= 컨슈머 수 로 관리) - 같은 컨슈머 그룹은 offset의 위치 정보를 공유하여 이중화 가능하다.
- Topic & Partition
- 메시지는 Topic으로 분류되고, Topic은 여러개의 Partition으로 나눠 진다.
-> 쓰기 처리를 병렬로 가능하게 한다. 단,Round-robin 방식이라 순서를 보장하지 못하며 한 번 늘린 파티션은 절대 줄일 수 없다. - Partition내 한 칸은 로그라고 불리며, 순차적으로 메시지가 쌓인다
- offset은 메세지의 상대위치
- 메시지는 Topic으로 분류되고, Topic은 여러개의 Partition으로 나눠 진다.
- Broker & Zookeeper
- Broker는 카프카 서버를 의미, 동일한 노드 내에 여러개의 Broker 띄울 수 있다.
- Zookeeper는 이러한 분산 메세지 큐(클러스터)의 정보를 관리해주는 역할
(broker_id, offset, leader, 설정 관리, 홀수로 구성)
상태 정보를 지노드(znode)에 key-value 형태로 저장, 애플리케이션에서 서로 주고 받는다. - 지노드는 자식노드를 가진 계층형 구조로, 데이터를 메모리에 저장하여 처리량이 크고 빠르다.
- Replication
- Broker 3대에서 하나의 서버만 leader가 되고 나머지 둘은 follower가 된다.
- 프로듀서와 컨슈머가 동작하는건 leader가 전적으로 역할 담당
- 나머지 follower들은 leader와 싱크를 항상 맞춘다. leader가 죽을 경우, 나머지 follower중 하나가 leader로 선출
- ack = 1이면, 프로듀서가 메세지를 leader한테 보낸다. leader에 쓰여지고 나머지 follower들이 복사되면 ack를 프로듀서에게 전달.
(메세지의 유실이 없지만, 성능상 defailt로 하여 leader에 쓰여지면 바로 ack를 프로듀서에 전달)
- Rebalace
- 파티션X를 맡고있던 컨슈머가 죽어, 그룹 내에 다른 컨슈머가 처리 해야하는 상황
- 새로운 파티션의 추가/변경됝 상황
- Rebalace하는 동안 모든 컨슈머의 읽기가 중단되므로, 일시적 서비스 중단됨
설정
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
#kafka 설정
kafka:
bootstrap-servers: "localhost:9092"
producer:
retries: 0
batch-size: 20000
compression-type: gzip
# Custom setting - batch 요청을 위한 버퍼링 시간. batch-size에 다다르면 즉시 메시지를 전송한다.
linger-ms: 50
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: channel-mt # 식별가능한 변수명
enable-auto-commit: false
max-poll-records: 500
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
concurrency: 5
ack-mode: manual_immediate
poll-timeout: 3000
# Custom setting - batch listener -> List<T> 로 컨슘가능
batch: false
# Custom setting - auto startup listener
auto-startup: false
# Custom setting - retries
retry:
max-attempts: 10
initial-interval: 1000
multiplier: 2
max-interval: 10000
message.topic:
request: infobip-request
response: infobip-response
result: infobip-result
.yml -> @Configuration
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private int producerRetries = 0;
@Value("${spring.kafka.producer.batch-size}")
private int producerBatchSize = 20000;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType = "gzip";
@Value("${spring.kafka.producer.linger-ms}")
private long producerLingerMs = 50;
@Value("${spring.kafka.producer.buffer-memory}")
private long producerBufferMemory = 335544320;
@Value("${spring.kafka.producer.key-serializer}")
private String producerKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
@Value("${spring.kafka.producer.value-serializer}")
private String producerValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean consumerEnableAutoCommit = false;
@Value("${spring.kafka.consumer.max-poll-records}")
private int consumerMaxPollRecords = 500;
@Value("${spring.kafka.consumer.key-deserializer}")
private String consumerKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
@Value("${spring.kafka.consumer.value-deserializer}")
private String consumerValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
@Value("${spring.kafka.listener.concurrency}")
private int listenerConcurrency = 5;
@Value("${spring.kafka.listener.ack-mode}")
private String listenerAckMode = "MANUAL_IMMEDIATE";
@Value("${spring.kafka.listener.poll-timeout}")
private int listenerPollTimeout = 3000;
@Value("${spring.kafka.listener.batch}")
private boolean listenerBatch;
@Value("${spring.kafka.listener.auto-startup}")
private boolean listenerAutoStartup;
@Value("${spring.kafka.retry.max-attempts}")
private int retryMaxAttempts;
@Value("${spring.kafka.retry.initial-interval}")
private int retryInitialInterval;
@Value("${spring.kafka.retry.multiplier}")
private int retryMultiplier;
@Value("${spring.kafka.retry.max-interval}")
private int retryMaxInterval;
}
토픽 생성 config
@Configuration
public class TopicConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${message.topic.request}")
private String requestTopicName;
@Value("${message.topic.response}")
private String responseTopicName;
@Value("${message.topic.result}")
private String resultTopicName;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(config);
}
@Bean
public NewTopic requestTopic() { return new NewTopic(requestTopicName, 1, (short)1); }
@Bean
public NewTopic responseTopic() { return new NewTopic(responseTopicName, 1, (short)1); }
@Bean
public NewTopic resultTopic() { return new NewTopic(resultTopicName, 1, (short)1); }
}
프로듀서 config
@Configuration
public class KafKaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerFactoryConfig());
}
private Map<String, Object> producerFactoryConfig() {
Map<String, Object> configProps = new HashMap<>();
//yml추가한 팩토리 설정 받아서 추가
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.RETRIES_CONFIG, producerRetries); //재시도
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize); //배치사이즈
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerCompressionType); // 압축타입
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory); // 버퍼 메모리
configProps.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs); // max batch size
return configProps;
}
@Bean // 이후 KafkaTemplate<String, String>을 주입받아 produce가능
public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}
컨슈머 config
@EnableKafka
@Configuration
public class KafKaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Autowired
private CommonKafkaErrorHandler commonErrorHandler;
@Autowired
private CommonKafkaBatchErrorHandler commonBatchErrorHandler;
@Bean
public ConsumerFactory<String, String> pushEntityConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerFactoryConfig());
}
private Map<String, Object> consumerFactoryConfig() {
Map<String, Object> props = new HashMap<>();
//yml추가한 팩토리 설정 받아서 추가
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit); // auto-commit
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // max-poll-size
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> pushEntityKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(pushEntityConsumerFactory());
//팩토리 설정 추가 (batch, autoStartup, errorHandler 등 카프카 설정 추가)
factory.setBatchListener(listenerBatch);
// Container Error Handlers 사용시 false 설정
factory.getContainerProperties().setAckOnError(false);
if (listenerBatch) {
// batch listener 를 사용할 경우, kafka에서 retry 를 지원하지 않음
// batch error handler
factory.setBatchErrorHandler(commonBatchErrorHandler);
} else {
factory.setRetryTemplate(retryTemplate());
// error handler
factory.setErrorHandler(commonErrorHandler);
}
factory.setAutoStartup(listenerAutoStartup);
return factory;
}
//Retry 설정
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy();
policy.setMaxAttempts(retryMaxAttempts);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(retryInitialInterval);
policy.setMultiplier(retryMultiplier);
policy.setMaxInterval(retryMaxInterval);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
sender
@Component
public class KafkaMessageSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
protected ObjectMapper mapper;
public void send(String msg, String topicName) {
final String data;
try {
data = mapper.writeValueAsString(msg);
} catch (JsonProcessingException e) {
throw e;
}
Message<String> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, topicName)
.build();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> stringObjectSendResult) {
System.out.println("Sent message=[" + stringObjectSendResult.getProducerRecord().value().toString() +
"] with offset=[" + stringObjectSendResult.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=[] due to : " + ex.getMessage());
}
});
}
}
listener
@Service
public class KafkaMessageListener {
@Autowired
private ObjectMapper objectMapper; // json -> object 변환
@KafkaListener(topics = "${message.topic.request}"
, groupId = "${kafka.consumer.group-id}"
, containerFactory = "pushEntityKafkaListenerContainerFactory")
public void requestListen(@Payload String jsonString,
@Headers MessageHeaders messageHeaders,
Acknowledgment ack) throws JsonProcessingException {
System.out.println(
"Received Message: " + jsonString +
" headers: " + messageHeaders);
ack.acknowledge();
}
@KafkaListener(topics = "${message.topic.response}"
, groupId = "${kafka.consumer.group-id}"
, containerFactory = "pushEntityKafkaListenerContainerFactory")
public void responseListen(
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
int partition = record.partition();
logger.debug("received message='{}' with topic='{}', partition-offset='{}'", value, topic, partition);
ack.acknowledge();
}
}
728x90
반응형
댓글