본문 바로가기
개발서적

카프카, 데이터 플랫폼의 최강자 : 실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지 (2)

by 공부 안하고 싶은 사람 2022. 4. 29.
반응형

2부 기본 개념과 운영 가이드

카프카의 디자인(구성), 토픽, 파티션, 오프셋, 리플리케이션, 프로듀서와 컨슈머


3장 카프카 디자인

3.1 카프카 디자인 특징

분산 시스템

장점

  • 높은 성능
  • 무중단
  • 확장 용이

페이지 캐시

처리량 높이기 위해 OS의 패이징 캐시를(디스크 데이터를 메모리에 올려놓) 사용

배치 전송 처리

시간이 많이 소요 되는 I/O작업을 묶어서 처리하여 빠른 전송 가능

 

 

3.2 카프카 데이터 모델

토픽

메시지를 구분하기 위한 장소

. _ - 와 서비스명을 조합하여 이름을 분류/구분

파티션

파티션을 나누면 프로듀서에서 토픽으로 메시지를 보내는 방식을 병렬처리 -> 빠른 전송 가능

단점

  • 파티션은 브로커의 디렉토리와 매핑. 카프카에서는 모든 디렉토리의 파일들에 대해 파일 핸들을 열게 되므로 리소스 낭비
  • 장애 복구(리플리케이션) 시간 증가
    • 브로커 장애 발생시 각 파티션별로 리더를 선출하는 과정에서 많은 시간이 소요 될 수 있다
    • 장애발생한 브로커가 컨트롤러일 경우엔 컨트롤러 이관 전까지 리더 선출할 수 없으므로 더 악화 될 수 있다.

주의

  • 프로듀서와 컨슈머의 데이터 처리량, 서버 수를 고려하여 적절한 파티션 수를 나눠야 한다
  • 파티션은 늘릴 수 있지만 줄일 수는 없다(토픽 삭제해야함)

오프셋과 메시지 순서

오프셋 : 각 파티션마다 메시지가 저장되는 위치 (파티션 내에 유일,순차증가)

오프셋 순서대로 컨슘이 가능

 

 

3.3 카프카의 고가용성과 리플리케이션

장애 대응하기 위해 토픽의 각 파티션을 복제

리플리케이션 팩터 설정

vi /kafka경로/config/server.properties

default.replication.factor = 2 수정

기본 값에서 변경된 (숫자-1)만큼, 다른 브로커에 복제된 파티션을 저장

리더 : 원본 파티션

팔로워 : 복제된 파티션

리더에서만 프로듀스/컨슘 발생, 팔로워는 리더를 읽고 복제(메시지/오프셋 등)

리더 장애시, 팔로워가 새로운 리더가 되어 메시징 처리

단점

  • 많은 팔로워가 있다면 그만큼 리소스(스토리지) 낭비
  • 비활성화된 리플리케이션의 상태를 체크하는 과정에서 리소스 사용 증가

주의

  • 데이터의 중요도에 따른 리플리케이션 펙터 수 설정

ISR : 리더 + 팔로워 그룹

팔로워의 데이터 정합성이 맞지 않음을 방지

리더와 팔로워의 데이터 동기화 작업을 유지

리더는 팔로워가 주기적으로 데이터를 pull하지 않는것을 감지하면 ISR 그룹에서 추방(장애시 리더가 될 수 없음)

 

 

3.4 모든 브로커가 다운된다면

마지막 리더가 살아나길 기다린다

데이터 유실 X

리더 복구가 늦어지면 서비스 장애시간 증가

0.11.0.0 부턴 이 방법

ISR에서 추방되었지만 먼저 살아나면 자동으로 리더가 된다

마지막 리더와의 데이터 정합도 차이만큼 데이터 유실 O

빠른 시간 내에 서비스 정상화

설정 변경

  • vi /kafka경로/config/server.properties
  • unclean.leader.election.enable = true 변경

 

 

3.5 카프카에서 사용하는 주키퍼 지노드 역할

주키퍼 CLI접속하여 지노드 확인

/zookeeper경로/bin/zkCli.sh 이후 파일 확인(ls)

  • /peter-kafka/controller
    • 컨트롤러(리더선정) 정보 확인
  • /peter-kafka/brokers
    • /peter-kafka/brokers/ids
      • broker.id 확인, 주키퍼 임시노드를 사용해 등록, 종료시 지노드 사라짐
    • /peter-kafka/brokers/topic
      • 토픽의 파티션수, ISR 구성정보, 리더 정보 확인
    • /peter-kafka/consumers
      • 컨슈머가 각각의 파티션들에 대하 어디까지 읽었는지 기록
      • 오프셋 저장장소는 카프카 토픽
    • /peter-kafka/config
      • 토픽의 상세 설정 정보 확인

4장 카프카 프로듀서

4.1 콘솔 프로듀서로 메시지 보내기

이전 내용 확인 (토픽생성, 콘솔 프로듀서, 콘손 컨슘)

 

 

4.2 자바와 파이썬을 이용한 프로듀서

public class KafkaBookProducer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092"); // 브로커 리스트
    props.put("key.serialize","org.apache.kafka.common.serializtion.StringSerializer"); // 직렬화
    props.put("value.serializer","org.apache.kafka.common.serializtion.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka about..."));
    producer.close();
  }
}

동기 전송

Producer<String, String> producer = new KafkaProducer<>(props);
try {
    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka about..."));  
  System.out.printf(metadata);
} catch (Exception e) {
  e.printStackTrace();
} finally { producer.close(); }

비동기 전송

class PeterCallback implements Callback {
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (metadata != null) {
      //metadata 정보 출력
    } else {
      exception.printStackTrace(); // 카프카가 오류 리턴시 예외처리 필요
    }
  }
}
Producer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka about..."), new PeterCallback());  
  System.out.printf(metadata);
} catch (Exception e) {
  e.printStackTrace();
} finally { producer.close(); }

 

 

4.4 프로듀서 주요 옵션

bootstrap.servers

  • 카프카 클러스터 설정 (호스트이름:포트)
  • 마스터 개념이 없기 때문에 모든 클러스터 설정을 추가

acks

  • 요청 완료전 ack의 수
  • 0인 경우 ack 기다리지 않음
  • 1인 경우 리더만 확인
  • arcs=all 인 경우 ISR의 모든 ack 확인

buffer.memory

  • 프로듀서가 데이터 보내기 위해 잠시 대기(배치 혹은 딜레이전송)할 수 있는 전체 메모리 바이트

compression.type

  • 데이터 압축해서 보내는 확장자

retries

  • 재발송 횟수

batch.size

  • 묶음 바이트 사이즈

linger.ms

  • 배치형태 메시지 보내기전 최대 대기 시간

max.request.size

  • 전송할 수 있는 최대 바이트 사이즈 (default 1MB)

 

 

4.5 메시지 전송 방법

메시지 손실 높지만, 빠른 전송

  • ack=0으로 응답 확인 X

메시지 손실 적고, 적당한 전송

  • acks=1으로 리더만 응답 확인
  • 리플리케이션 중 손실이 일어나면 데이터 유실

메시지 손실 없고, 느린 전송

  • acks=all
  • server.properties 브로커 설정 (ack를 보내기전 최소 리플리케이션 팩터를 지정하는 옵션)
    • min.insync.replicas=1
      • acks=1과 동일
    • min.insync.replicas=2
      • 하나의 리플리케이션을 보장
      • acks=all , min.insync.replicas=2 , 리플리케이션 팩터 3 권장
    • min.insync.replicas=3
      • 팔로워 장애 발생시 ISR에 리플리케이션 팩터수가 부족하여 ack를 보낼 수 없어 , 전체 장애와 같은 현상 발생

5장 카프카 컨슈머

파티션 리더에게서 오프셋(지정가능)부터의 메시지를 수신

5.1 컨슈머 주요 옵션

(오프셋을 토픽에 저장하는 뉴컨슈머 기준으로)

bootstrap.servers

  • 카프카 클러스터 설정 (호스트이름:포트)
  • 마스터 개념이 없기 때문에 모든 클러스터 설정을 추가

fetch.min.bytes

  • 한 번에 가져올 수 있는 최소 데이터 사이즈

fetch.max.bytes

  • 한 번에 가져올 수 있는 최대 데이터 사이즈

fetch.max.wait.ms

  • fetch.min.bytes보다 데이터가 작은 경우 요청에 응답을 기다리는 최대 시간

group.id

  • 컨슈머가 속한 컨슈머 그룹을 식별

enable.auto.commit

  • 백그라운드로 주기적으로 오프셋을 커밋

auto.offset.reset

  • 오프셋이 없거나 존재하지 않는 경우
  • earliest : 가장 초기 오프셋 값
  • latest : 가장 마지막 오프셋 값
  • none : 이전 오프셋 찾지못하면 에러 발생

request.timeout.ms

  • 요청에 대해 응답을 기다리는 최대 시간

heartbeat.interval.ms

  • 그룹 코디네이터에게 얼마나 자주 하트비트를 보낼 것인지
  • 일반적으로 session.timeout.ms의 1/3으로 설정

session.timeout.ms

  • 컨슈머와 브로커 사이의 세션 타임 아웃 시간
  • heartbeat.interval.ms 보내는 시간이 정해진 시간이 지나면 리밸런스를 시도

max.poll.records

  • 단일 호출에 대해 최대 레코드 수를 조정

max.poll.interval.ms

  • 컨슈머가 메시지를 가져가지 않고 하트비트만 보낼 수 있다
  • 이 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하여 컨슈머 그룹에서 제외하여 다른 컨슈머가 파티션을 점유할 수 있도록 한다

auto.commit.interval.ms

  • 주기적으로 오프셋을 커밋하는 시간

더 많은 설정 : https://kafka.apache.org/documentation/#consumerconfigs

 

 

5.2 콘솔 컨슈머로 메시지 가져오기

이전 내용 확인 (토픽생성, 콘솔 프로듀서, 콘손 컨슘)

 

 

5.3 자바와 파이썬을 이용한 컨슈머

public class KafkaBookConsumer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092"); // 브로커 리스트
    props.put("group.id", "peter-consumer"); // 컨슈머 그룹 아이디
    props.put("enable.auto.commit", "true"); // 오프셋 리셋
    props.put("auto.offset.reset", "latest");
    props.put("key.deserialize","org.apache.kafka.common.serializtion.StringDeserializer"); // 역직렬화
    props.put("value.deserializer","org.apache.kafka.common.serializtion.StringDeserializer");

    KafkaConsumer<String, String> consumer = new kafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("peter-topic")); // 리스트 형태로 여러 토픽 구독
    try {
      while (true) { // 지속적인 poll
        ConsumerRecords<String, String> records = consumer.poll(100); // 타임아웃 주기 설정
        for (ConsumerRecord<String, String> record : records) {
          // record 출력
        }
      }
    } finally {
      consumer.close();
    }
  }
}

 

 

5.4 파티션과 메시지 순서

컨슈머는 프로듀서가 어떤 순서로 메시지를 보냈는지 모른다. 그저 파티션 마다의 오프셋 순서로 읽어온다

  • 프로듀서가 생성산 메시지 순서 - 파티션 내에선 순서 보장, 파티션 사이에선 순서 보장X
  • 순서가 중요하면 파티션 수 1 (분산처리 못하여 처리량이 떨어진다)

 

 

5.5 컨슈머 그룹

토픽의 파티션 된 데이터들을 1개의 컨슈머 그룹으로 설정된 컨슈머들이 나눠서 컨슘

리밸런스 : 파티션에 대한 컨슈머의 소유권이 이동하는것 (가용성과 확장성 확보)

리밸런스가 일어나는 중에는 일시적으로 컨슈머 그룹 전체가 메시지 가져올 수 없다

컨슈머 그룹내 컨슈머 수 <= 파티션 수 (1파티션 : 1컨슈머 연결하기 때문)

새로운 컨슈머 그룹 추가시 (컨슈머 그룹마다 오프셋을 별도로 관리하기 때문에) 해당 토픽의 정보를 새롭게 사용 가능 -> 하나의 데이터를 다양한 용오도 사용 가능하게 한다

 

 

5.6 커밋과 오프셋

컨슈머 그룹 , 카프카 둘다 오프셋 정보를 가지고 있음. 컨슈머 그룹이 가지고 있는 오프셋 정보를 카프카에 전달하는 행위 — commit

  1. offset 정보는 각 파티션에
    컨슈머 그룹 별로 가져간 메시지의 위치를 토픽에 기록(커밋)
  2. 컨슈머는 파티션의 오프셋부터 메시지 읽기 시작

리밸런스 후

  • 커밋된 오프셋 < 처리한 오프셋 : 중복처리
  • 커밋된 오프셋 > 처리한 오프셋 : 메시지 누락

LAG : 현재 토픽의 저장된 메시지와 컨슈머가 가져간 메시지의 차이(current-offset과 log-end-offset의 차이)

자동커밋

컨슈머를 다루는 사용자가 오프셋 관리를 직접하지 않는 방법

enable.auto.commit=true

poll 요청할 때마다 auto.commit.inerval.ms 설정값을 확인하여 commit 할 시간이라면 commit

commit 사이에 리밸런스 일어난다면, 중복 발생 가능

수동커밋

메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안될 경우에 사용

.commitSync()로 커밋 시점을 자유롭게 조정

커밋 시점 이전에 exception 발생한다면, 카프카는 중복이 없지만 내부 데이터들은 중복될 수 있다(롤백이 필요)

특정 파티션 할당

key-value 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지들만 가져와야하는 경우에 사용

컨슈머 프로세스가 가용성이 높은 구성인 경우(YARN, Mesos 등), 카프카가 컨슈머의 실패를 감지하고 재조정할 필요 없고 자동으로 컨슈머 스포레스가 다른 시스템에서 재시작 되는 경우에 사용

TopicPartition partition0 = new TopinPartition(topicName, 0);
TopicPartition partition1 = new TopinPartition(topicName, 1);
consumer.assign(Arrays.asList(partition0, partition1)); // 특정 파티션만을 지정하여 컨슘할 수 있다

컨슈머 인스턴스마다 컨슈머 그룹 아이디를 서로 다르게 설정해야 한다 (동일한 컨슈머 그룹 아이디 사용시 오프셋정보를 공유하여 문제가 생길 수 있다)

특정 오프셋부터 메시지 가져오기

consumer.seek(partition0, 2); // 컨슈머에 오프셋 설정

수동으로 어디부터 메시지를 읽어올지 지정할 수 있다


6장 카프카 운영 가이드

6.1 필수 카프카 명령어

/kafka경로/bin/ 에서 기본적으로 제공해주는 명령어 들이 있다

토픽생성

  • /kafka경로/bin/kafka-topics.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
    --replication-factor 1 --partitions 1 --topic peter-topic --create

토픽 리스트 확인

  • /kafka경로/bin/kafka-topics.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
    --list

토픽 상세보기

  • /kafka경로/bin/kafka-topics.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
    --topic peter-topic --describe

토픽 설정 변경

  • 보관주기 변경(기본 7일 -> 1시간)
    • /kafka경로/bin/kafka-configs.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
      --alter --entity-type topics --entity-name peter-topic --add-config retention.ms=3600000
  • 설정 삭제
    • /kafka경로/bin/kafka-configs.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
      --alter --entity-type topics --entity-name peter-topic --delete-config retention.ms

토픽 파티션 수 변경

  • /kafka경로/bin/kafka-topics.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
    --alter --topic peter-topic --partitions 2

토픽 리플리케이션 팩터 변경

// rf.json 파일 
{
  "version":1,
  "partitions":[
    {
      "topic":"peter-topic",
      "partition":0, // 파티션0은
      "replicas":[1,2] // 리더 브로커1, 리플리카 브로커2
    },
    {
      "topic":"peter-topic",
      "partition":1, // 파티션1은
      "replicas":[2,3] // 리더 브로커2, 리플리카 브로커 3
    }
  ]
} // 리더 브로커가 현재 토픽 파티션의 리더 정보와 일치하도록 설정해야 프로듀서/컨슈머에 영향을 주지 않는다
// 리플리케이션 팩터 3 으로 설정하길 원한다면 [1,2,3] 이런식으로 설정
  • /kafka경로/bin/kafka-reaasign-partitions.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
    --reassignment-json-file /kafka경로/rf.json --execute

컨슈머 그룹 리스트 확인

  • /kafka경로/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092
    --list

컨슈머 상태와 오프셋 확인

  • /kafka경로/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092
    --group peter-consumer --describe

 

 

6.2 주키퍼 스케일 아웃

  • 주키퍼 2대 추가 설치
  • 각 주키퍼에 myid 설정 추가
    • echo "4" > /data/myid
    • echo "5" > /data/myid
  • zoo.cfg에 주키퍼 설정 (추가된 정보까지)
    • server.4=peter-zk004:2888:3888
    • server.5=peter-zk005:2888:3888
  • systemd 설정 추가
  • 운영중인 주키퍼 설정 변경
    • 주키퍼의 상태를 읽고 팔로워부터 변경 (리더를 가장 마지막에 변경)
    • zoo.cfg 파일 수정
    • systemd 재시작
  • 리더 주키퍼에서 팔로워 숫자로 결과 확인
    • echo mntr | nc localhost 2181 | grep zk_synced_followers

 

 

6.3 카프카 스케일 아웃

  • 카프카 2대 추가 설치
  • 각 카프카에 broker설정 파일에서 broker.id 부분만 다른 서버와 겹치지 않게 추가한 후 실행
  • 주키퍼 입력도 기존 브로커에 설정한 정보와 동일하게 설정
  • 카프카 실행
    • systemctl start kafka-server.service
  • 주키퍼 CLI로 카프카 정보 확인
    • /zookeeper경로/bin/zkCli.sh
  • 브로커에 토픽/파티션 분배
    • /kafka경로/partition.json 추가
    • {
        "version" : 1,
        "partitions" : [
          {"topic":"peter5","partition":0,"replicas":[2,1]},
          {"topic":"peter5","partition":1,"replicas":[3,2]},
          {"topic":"peter5","partition":2,"replicas":[4,3]},
          {"topic":"peter5","partition":3,"replicas":[5,4]},
          {"topic":"peter5","partition":4,"replicas":[1,5]}
        ]
      }
    • 분산 실행
      • /kafka경로/bin/kafka-reassign-partitions.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka
        --reassignment-json-file /kafka경로/partition --execute
  • 토픽/파티션 분배시 운영중이라면 리소스에 큰 부담이 될 수 있다
    • 토픽의 사용량이 적은 시간에 수행
    • 토픽의 보관 주기를 줄여서 임시로 사이즈를 축소 시키고 진행

 

 

6.4 카프카 모니터링

카프카 실행 파일에 JMX 설정 추가

/kafka경로/bin/kafka-server-start.sh 변경 -> 버전 변경 후 실행파일 변경 될때 마다 변경해줘야 하므로 비추

# limitations under the License.

export JMX_PORT=9999 # 이부분 추가
if [ $# -lt 1];

카프카 재시작 : systemctl restart kafka-server.service

systemd 환경변수 옵션을 이용

/kafka경로/config/jxm 파일 생성

JMX_PORT=9999

/etc/systemd/system/kafka-server.service 수정

ExecStop=/kafka경로/bin/kafka-server-stop.sh
EnvironmentFile=/kafka경로/config/jmx

systemd 재시작 : systemtl daemon-reload

카프카 재시작 : systemctl restart kafka-server.service

jconsole peter-kafka001:9999 를 입력하여 jconsole실행

 

 

6.5 카프카 매니저 활용

  • 깃허브 접속
    • GitHub.com/yahoo/kafka-manager
  • 최신 릴리즈 확인
  • 다운로드
  • 압축해제
    • unzip 1.3.3.17.zip
  • 경로 이동
    • cd kafka-manager-1.3.3.17
  • 배포 파일 생성
    • ./sbt clean dist
  • 출력문의 마지막 경로의 파일을 원하는 경로에 복사
    • cp /opt/kafka-manager-1.3.3.17/target/universal/kafka-manager-1.3.3.17.zip /usr/local/
  • 현재 위치를 복사한 경로로 이동 후 압축해제
    • cd /usr/local
    • unzip kafka-manager-1.3.3.17.zip
  • 설정 파일 변경
    • conf/application.conf의 주키퍼 주소를 우리주소로 변경
  • 카프카 매니저 실행
    • /매니저경로(usr/local)/kafka-manager-1.3.3.17/bin/kafka-manager -Dconfig.file=/usr/local/kafka-manager-1.3.3.17/conf/application.conf
      -Dhhtp.port=9000
  • peter-kafka001:9000 접속
  • 카프카 클러스터 등록 후, 메뉴를 통해 브로커,토픽을 관리 할 수 있다.
728x90
반응형

댓글