본문 바로가기
MessageQueue

KAFKA

by 공부 안하고 싶은 사람 2021. 3. 20.
반응형

  • 배경 : 실시간 트랜잭션처리와 비동기 처리에서 통합 전송영역의 부재로 인한 시스템 복잡도 증가, 데이터 통합 분석 어려움 해결하고자
  • 비동기로 뛰어난 응답속도
  • 서로 다른 이기종/네트워크에 있는 서비스를 마치 함수 호출하듯 사용가능(RPC/ RPI 방식)
  • 비교군 : rabbitMQ, ZeroMQ, ActiveMQ
  • 장점
    • 실시간 로그 처리에 특화
    • 데이터 중앙화로 인한 분석 가능
    • 데이터 증가함에 따라 스케일 아웃 가능
    • 디스크, 파일 시스템에 메시지를 저장(영속성 보장, sequential하게 처리시 빠름, 유실위험 적고 에러복구 용이)
    • 프로토콜이 간단하므로 오버헤드가 적음
  • 특징
    • 프로듀서가 컨슈머에게 메시지를 직접 전달하지 않고, 중간의 메시싱 시스템에 전달
    • 컨슈머가 불능 상태가 되더라도 프로듀서는 메시지를 메시징 시스템에 전달할 수 있으며, 메시지는 유실되지 않는다. 그리고 컨슈머가 회복되면 다시 메시지를 가져간다.
    • 하나의 토픽에 여러 프로듀서/컨슈머 접근가능
    • 각각의 개체가 N:N 통신을 하는것이 아니기 때문에 확장이 용이
    • 하나의 카프카 클러스터는 3대의 브로커로 시작해 수집 대의 브로커로 확장 가능하다.(무중단)
  • 프로듀서
    • key 값을 통해 특정 파티션에 저장할 수 있다.
  • 컨슈머
    • offset의 위치를 기억해, 마지막으로 읽었던 위치를 추적할 수 있다(fail-over에 대한 신뢰)
    • 컨슈머들은 그룹으로 묶을 수 있고, Partition은 그룹과 1 : N 매칭해야한다.
      (Partition 수 >= 컨슈머 수 로 관리)
    • 같은 컨슈머 그룹은 offset의 위치 정보를 공유하여 이중화 가능하다.
  • Topic & Partition
    • 메시지는 Topic으로 분류되고, Topic은 여러개의 Partition으로 나눠 진다.
      -> 쓰기 처리를 병렬로 가능하게 한다. 단,Round-robin 방식이라 순서를 보장하지 못하며 한 번 늘린 파티션은 절대 줄일 수 없다.
    • Partition내 한 칸은 로그라고 불리며, 순차적으로 메시지가 쌓인다
    • offset은 메세지의 상대위치
  • Broker & Zookeeper
    • Broker는 카프카 서버를 의미, 동일한 노드 내에 여러개의 Broker 띄울 수 있다.
    • Zookeeper는 이러한 분산 메세지 큐(클러스터)의 정보를 관리해주는 역할
      (broker_id, offset, leader, 설정 관리, 홀수로 구성)
      상태 정보를 지노드(znode)에 key-value 형태로 저장, 애플리케이션에서 서로 주고 받는다.
    • 지노드는 자식노드를 가진 계층형 구조로, 데이터를 메모리에 저장하여 처리량이 크고 빠르다.
  • Replication
    • Broker 3대에서 하나의 서버만 leader가 되고 나머지 둘은 follower가 된다.
    • 프로듀서와 컨슈머가 동작하는건 leader가 전적으로 역할 담당
    • 나머지 follower들은 leader와 싱크를 항상 맞춘다. leader가 죽을 경우, 나머지 follower중 하나가 leader로 선출
    • ack = 1이면, 프로듀서가 메세지를 leader한테 보낸다. leader에 쓰여지고 나머지 follower들이 복사되면 ack를 프로듀서에게 전달.
      (메세지의 유실이 없지만, 성능상 defailt로 하여 leader에 쓰여지면 바로 ack를 프로듀서에 전달)
  • Rebalace
    • 파티션X를 맡고있던 컨슈머가 죽어, 그룹 내에 다른 컨슈머가 처리 해야하는 상황
    • 새로운 파티션의 추가/변경됝 상황
    • Rebalace하는 동안 모든 컨슈머의 읽기가 중단되므로, 일시적 서비스 중단됨

설정

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  #kafka 설정
  kafka:
    bootstrap-servers: "localhost:9092"
    producer:
      retries: 0
      batch-size: 20000
      compression-type: gzip
      # Custom setting - batch 요청을 위한 버퍼링 시간. batch-size에 다다르면 즉시 메시지를 전송한다.
      linger-ms: 50
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: channel-mt # 식별가능한 변수명
      enable-auto-commit: false
      max-poll-records: 500
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 5
      ack-mode: manual_immediate
      poll-timeout: 3000
      # Custom setting - batch listener -> List<T> 로 컨슘가능
      batch: false
      # Custom setting - auto startup listener
      auto-startup: false
    # Custom setting - retries
    retry:
      max-attempts: 10
      initial-interval: 1000
      multiplier: 2
      max-interval: 10000
      
message.topic:
  request: infobip-request
  response: infobip-response
  result: infobip-result

.yml -> @Configuration

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.retries}")
    private int producerRetries = 0;
    @Value("${spring.kafka.producer.batch-size}")
    private int producerBatchSize = 20000;
    @Value("${spring.kafka.producer.compression-type}")
    private String producerCompressionType = "gzip";
    @Value("${spring.kafka.producer.linger-ms}")
    private long producerLingerMs = 50;
    @Value("${spring.kafka.producer.buffer-memory}")
    private long producerBufferMemory = 335544320;
    @Value("${spring.kafka.producer.key-serializer}")
    private String producerKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
    @Value("${spring.kafka.producer.value-serializer}")
    private String producerValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean consumerEnableAutoCommit = false;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int consumerMaxPollRecords = 500;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String consumerKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String consumerValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    @Value("${spring.kafka.listener.concurrency}")
    private int listenerConcurrency = 5;
    @Value("${spring.kafka.listener.ack-mode}")
    private String listenerAckMode = "MANUAL_IMMEDIATE";
    @Value("${spring.kafka.listener.poll-timeout}")
    private int listenerPollTimeout = 3000;
    @Value("${spring.kafka.listener.batch}")
    private boolean listenerBatch;
    @Value("${spring.kafka.listener.auto-startup}")
    private boolean listenerAutoStartup;
    @Value("${spring.kafka.retry.max-attempts}")
    private int retryMaxAttempts;
    @Value("${spring.kafka.retry.initial-interval}")
    private int retryInitialInterval;
    @Value("${spring.kafka.retry.multiplier}")
    private int retryMultiplier;
    @Value("${spring.kafka.retry.max-interval}")
    private int retryMaxInterval;
}

토픽 생성 config

@Configuration
public class TopicConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${message.topic.request}")
    private String requestTopicName;

    @Value("${message.topic.response}")
    private String responseTopicName;

    @Value("${message.topic.result}")
    private String resultTopicName;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic requestTopic() { return new NewTopic(requestTopicName, 1, (short)1); }

    @Bean
    public NewTopic responseTopic() { return new NewTopic(responseTopicName, 1, (short)1); }

    @Bean
    public NewTopic resultTopic() { return new NewTopic(resultTopicName, 1, (short)1); }
}

프로듀서 config

@Configuration
public class KafKaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerFactoryConfig());
    }

    private Map<String, Object> producerFactoryConfig() {
        Map<String, Object> configProps = new HashMap<>();
        //yml추가한 팩토리 설정 받아서 추가
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, producerRetries); //재시도
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize); //배치사이즈
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerCompressionType); // 압축타입
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory); // 버퍼 메모리
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs); // max batch size
        return configProps;
    }

    @Bean // 이후 KafkaTemplate<String, String>을 주입받아 produce가능 
    public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}

컨슈머 config

@EnableKafka
@Configuration
public class KafKaConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;
    
    @Autowired
    private CommonKafkaErrorHandler commonErrorHandler;
    @Autowired
    private CommonKafkaBatchErrorHandler commonBatchErrorHandler;

    @Bean
    public ConsumerFactory<String, String> pushEntityConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerFactoryConfig());
    }

    private Map<String, Object> consumerFactoryConfig() {
        Map<String, Object> props = new HashMap<>();
        //yml추가한 팩토리 설정 받아서 추가
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit); // auto-commit
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // max-poll-size
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> pushEntityKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pushEntityConsumerFactory());
        //팩토리 설정 추가 (batch, autoStartup, errorHandler 등 카프카 설정 추가)
        factory.setBatchListener(listenerBatch);
        // Container Error Handlers 사용시 false 설정
        factory.getContainerProperties().setAckOnError(false);
        if (listenerBatch) {
            // batch listener 를 사용할 경우, kafka에서 retry 를 지원하지 않음
            // batch error handler
            factory.setBatchErrorHandler(commonBatchErrorHandler);
        } else {
            factory.setRetryTemplate(retryTemplate());
            // error handler
            factory.setErrorHandler(commonErrorHandler);
        }
        factory.setAutoStartup(listenerAutoStartup);
        return factory;
    }
    
    //Retry 설정
    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(retryMaxAttempts);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(retryInitialInterval);
        policy.setMultiplier(retryMultiplier);
        policy.setMaxInterval(retryMaxInterval);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy());
        template.setBackOffPolicy(backOffPolicy());

        return template;
    }
}

sender

@Component
public class KafkaMessageSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    protected ObjectMapper mapper;
    
    public void send(String msg, String topicName) {
        final String data;

        try {
            data = mapper.writeValueAsString(msg);
        } catch (JsonProcessingException e) {
            throw e;
        }
        
        Message<String> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topicName)
                .build();

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                System.out.println("Sent message=[" + stringObjectSendResult.getProducerRecord().value().toString() +
                        "] with offset=[" + stringObjectSendResult.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=[] due to : " + ex.getMessage());
            }
        });
    }
}

listener

@Service
public class KafkaMessageListener {
    @Autowired
    private ObjectMapper objectMapper; // json -> object 변환

    @KafkaListener(topics = "${message.topic.request}"
            , groupId = "${kafka.consumer.group-id}"
            , containerFactory = "pushEntityKafkaListenerContainerFactory")
    public void requestListen(@Payload String jsonString,
                                  @Headers MessageHeaders messageHeaders,
                              Acknowledgment ack) throws JsonProcessingException {
        System.out.println(
                "Received Message: " + jsonString +
                        " headers: " + messageHeaders);
        ack.acknowledge();
    }

    @KafkaListener(topics = "${message.topic.response}"
            , groupId = "${kafka.consumer.group-id}"
            , containerFactory = "pushEntityKafkaListenerContainerFactory")
    public void responseListen(
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            ConsumerRecord<String, String> record,
            Acknowledgment ack) {
        String value = record.value();
        int partition = record.partition();
        logger.debug("received message='{}' with topic='{}', partition-offset='{}'", value, topic, partition);
        ack.acknowledge();
    }
}
728x90
반응형

댓글