본문 바로가기
개발서적

카프카, 데이터 플랫폼의 최강자 : 실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지 (3)

by 공부 안하고 싶은 사람 2022. 5. 1.
반응형

3부 카프카의 확장과 응용

응용 사례와 다양한 확장 사례, 대안으로 사용할 수 있는 클라우드 기반 메시징 서비스 소개


7장 카프카를 활용한 데이터 파이프라인 구축

7.1 카프카를 활용한 데이터 흐름도

카프카로그 --(파일비트)--> 카프카토픽 --(나이파이)--> 엘라스틱서치/키바나, HDFS

아차피 나이파이 : 데이터 흐름을 정의, 정의된 흐플대로 자동으로 실행해주는 애플리케이션

나이파이를 이용하여 컨슘 + 데이터 처리 가능

 

 

7.2 파일비트를 이용한 메시지 전송

먼저 파일비트로 간단한 로그를 토픽에 프로듀싱

파일비트 : 엘라스틱에서 제공하는 경량 데이터 수집기

설치 및 설정은 생략

 

 

7.3 나이파이를 이용해 메시지 가져오기

  • 설치
  • 설정
    • /nifi경로(/usr/local/nifi)/conf/nifi.properties 수정
    • nifi.web.http.host=peter-kafka001
      nifi.cluster.is.node=true
      nifi.cluster.node.address=peter-kafka001
      nifi.cluster.node.protocol.port=8082
      nifi.zookeeper.connect.string=peter-zk001:2181,peter-zk002:2181,peter-zk003:2181
    • 인증, 튜닝
  • systemd 등록
    • cd /usr/local/nifi/ (nifi경로)
    • bin/nifi.sh install nifi
  • 실행
    • systemctl start nifi.service
  • 나이파이 컨슈머 설정
    • 나이파이 접속
    • 상단의 Processor 드래그앤드롭
    • kafka 검색하여 ConsumeKafka 선택, 더블클릭
    • ConsumeKafka를 우클릭하여 설정
      • PROPERTIES탭
        • Kafka Brokers : peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092
        • Topic Names : peter-log
        • Group ID : peter-nifi-consumer001
      • SETTINS 탭
        • Automaticaaly Terminate Relationships -> sucess 체크
    • 추가한 ConsumeKafka 프로세서를 선택하면 메뉴의 Operate 부분에 ConsumeKafka 프로세서가 보임
    • 재생 버튼을 누르면 메시지 가져오기 시작
  • 컨슈머 그룹에 peter-nifi-consumer001 등록됐는지 확인

 

 

7.4 실시간 분석을 위해 엘라스틱서치에 메시지 저장

엘라스틱서치 : 분산형 RESTful 검색 및 분석에진, 질의를 통해 데이터 분석을 빠르게 할 수 있다

  • 설치
    • key 등록
    • 저장소 등록
      • /etc/yun.repos.d/elasticsearch.rep 생성
      • [elasticsearch-6.x]
        name=Elaticsarch repository for 6.x pachages
        baseurl=https://artifacts.elastic.co/pachages/6.x/yum
        gpgcheck=1
        gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
        enabled=1
        autorefresh=1
        type=rpm-md
    • 설치
      • yum -y install elasticsarch
  • 설정
    • /etc/elasticsarch/elasticsarch.yml 변경
    • path.data: /var/lib/elasticsarch
      path.logs: /var/log/elasticsarch
      cluster.name: peter-es
      node.name: peter-kafka001
      network.bind_host: 0.0.0.0
      http.port: 9200
      transport.tcp.port: 9300
    • 실행
      • systemctl start elasticsarch.service
  • 시작
    • 엘라스틱서치 접속
    • 나이파이로 데이터 전송
      • 나이파이 접속
      • 상단의 Processor 드래그앤드롭
      • putelasticsarch 검색하여 PutElasticsarchHttp 프로세서 추가
      • 설정 변경
        • PROPERTIES
        • SETTINGS
          • Automatically Terminate Relationships : failure, retry, sucess 체크
      • 프로세서 연결
        • ConsumeKafka 프로세서 위에 마우스를 올리면 생기는 화살표 드래그앤드롭해서
        • PutElasticsarchHttp프로세서와 연결
      • 프로세서 실행

 

 

7.5 키바나를 이용해 엘라스틱서치에 저장된 데이터 확인

  • 설치
    • key 등록
    • 저장소 등록
      • /etc/yum.repos.d/kinana.repo 생성
      • [kibana-6.x]
        name=Kibana repository for 6.x pachages
        baseurl=https://artifacts.elastic.co/pachages/6.x/yum
        gpgcheck=1
        gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
        enabled=1
        autorefresh=1
        type=rpm-md
    • 설치
      • yum -y install kibana
  • 설정
    • /etc/kibana/kibana.yml 수정
    • server.host: "0.0.0.0"
      elasticsearch.url: "http://peter-kafka001:9200"
    • 실행
      • systemctl start kibana.service
  • 실행

 

 

7.6 현재의 토픽을 이용해 새로운 토픽으로 메시지 재생산

나이파이를 이용하면 토픽별 라우팅하여, 토픽 내용중 특정 메시지만 꺼내서 다시 새로운 토픽으로 메시지를 보낼 수 있다

-> 확장 시스템과 연동, 메시지 분리 가능

  • 설정
    • 나이파이의 추가 설정
      • EvaluateJsonPath 프로세서 추가()
        • PROPERTIES 설정
          • Destination : flowfile-attribute
          • getHostName : $.beat.hostname
      • RouteOnAttribue 프로세서 추가
        • PROPERTIES 설정
          • Routing Strategy : Route to Property name
          • peter-kafka001 : ${getHostName:equals('peter-kafka001')}
      • 프로듀서 추가
        • PROPERTIES 설정
          • Kafka Brokers : 브로커3개
          • Topic Name : peter-kafka001

8장 카프카 스트림즈 API

8.1 스트림 프로세싱 기초

스프링 프로세싱 : 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 분석이나 질의를 수행하는 것을 의미 (실시간 분석)

<-> 배치 : 이미 저장된 데이터를 기반으로 분석이나 질의를 수행하고 특정 시간에 처리

장점

  • 이벤트에 즉각적으로 반응(빠른 반영)
  • vs 배치
    • 저장하지 않으므로, 더많은 데이터를 분석할 수 있다
    • 저장된 데이터가 아닌, 지속적으로 유입되는 데이터 분석에 최적화
  • 공유 데이터베이스에 대한 요구를 줄일 수 있어서 인프라에 독립적으로 수행 가능

상태 기반 vs 무상태 스트림

상태 기반 : 이전 스트림 결과를 참조 (결과를 저장소에 저장)

무상태 : 현재 스트림만을 처리

 

 

8.2 카프카 스트림즈

특징

  • 간단하고 가벼운 클라이언트 라이브러리 -> 쉽게 사용 가능
  • 시스템이나 카프카에 대한 의존성이 없음
  • 이중화된 로컬 상태 저장소를 지원
  • 카프카 브로커나 클라이언트에 장애가 새익더라도 스트림에 대해선 1번만 처리 되도록 보장
  • 밀리초 단위의 처리 지연을 보장하기 위해 한 번에 한 레코드만 처리
  • 간단하게 스트림 처리 프로그램을 만들 수 있도록 고수준의 스트림 DSL을 지원하고, 저수준의 프로세싱 API도 제공

개념

토폴로지를 만들어서 처리하는 API

  • 스트림 : key-value 형태로 끊임없이 전달되는 데이터 세트
  • 스트림 처리 애플리케이션 : 하나 이상의 프로세서 토폴로지에서 처리되는 로직
  • 스트림 프로세서 : 하나의 노드
    • 소스 프로세서 : 토픽 레코드 읽어서 다른 노드로
    • 싱크 프로세서 : 마지막 노드로 레코드를 특정 토픽에 저장

프로세서 만드는 방법

  • 카프카 스트림즈 DSL에서 데이터를 처리할 때 공통적으로 필요한 map, filter, join, aggregations와 같은 데이터 프로세싱 메소드 제공
  • 프로세서 API를 제공해서 저수준의 처리

아키텍처

  • 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지
    • 카프카 스트림즈는 파티션 개수만큼 태스트를 생성
  • 스트림의 데이터 레코드는 카프카 해당 토픽의 메시지(key-value)
  • 데이터 레코드의 키를 통해 다음 스트림(혹은 토픽)으로 전달
  • 사용자가 스레드의 개수를 지정할 수 있다
    • 증가 시킬 경우, 토폴로지를 복제해서 카프카 파티션을 서로 나눈 다음에 효과적인 병렬처리 수행
    • 별도의 스트림즈 코디네이션 없이 카프카 코디네이션 방식을 사용

 

 

 

 

8.3 카프카 스트림즈를 위한 환경설정

카프카1.0 이상에선 이미 포함되어 배포

의존성

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>streams-quickstart-java</artifactId>
  <version>1.0.0</version>
</dependency>

설정 값 입력

Properties pros = new Properties();
pros.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // 카프카 클러스터 내의 스트림즈 애플리케이션 구분하는 값
pros.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 접근할 브로커의 정보
pros.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 직렬화
pros.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass();

토폴리지 생성

StreamsBuilder builder = new StreamsBuilder(); // 토폴로지 빌더
KStream<String, String> source = builder.stream("streams-plaintext-input"); // streams-plaintext-input 토픽으로 부터 입력스트림 생성
source.to("streams-pipe-output"); // streams-pipe-output 토픽으로 전달
Topology topology = builder.build(); // 토폴로지 생성

System.out.println(topology.describe()); // 토폴로지 확인

KafkaStreams streams = new KafkaStreams(topology, props); // 스트림즈 객체 생성
streams.start();
streams.close();

 

 

8.5 행 분리 예제 프로그램 만들기

생략

 

 

8.6 단어 빈도수 세기 예지 프로그램 만들기

생략


9장 카프카 SQL을 이용한 스트리밍 처리

저장 기간에 관계없이 스트리밍과 배치 처리를 동시에 실행할 수 있는 분석 방법

생략


10장 그 밖의 클라우드 기반 메시징 서비스

메시징 시스템 : 카프카, 레빗엠큐, 큐피드 등

메시징 시스템 운영(장애처리, 확장)을 자동으로 관리해주는 클라우드 서비스

  • 구글의 Pub/Sub
  • 아마존 Kinesis
  • 마이크로소프트 Azure의 Event Hubs
  • 오라클 EventHub

 

 

10.1 구글의 Pub/Sub 서비스 소개

로그나 메시지를 다른 서비스들과 연결

프로듀서 : 토픽을 생성하거나, 메시지 전달

컨슈머 : 토픽에 Subcriber(컨슈머그룹)를 특정 토픽을 대상으로 만든 다음 메시지 가져옴

차이점 : 메시지를 한 컨슈머쪽으로 보낼 수 있고, 여러 컨슈머 쪽으로 보낼 수도 있다

특징

  • 메시징 플랫폼 단일화
  • push/pull 형태의 메시지 전달 방법을 간편 설정
  • 신뢰성 있는 데이터 저장을 보관(복제된 저장소 이용)
  • 명시적 ACK 제공
  • 플로 컨트롤 제공
  • REST API 제공

 

 

 

사용법 생략

 

 

10.7 카프카와 클라우드 서비스 비교

  카프카 키네시스 펍/섭
사용 형태 DIY 클라우드 서비스 클라우드 서비스
SDK 자바/파이썬 자바/파이썬/고/닷넷 자바/파이썬/고
리플리케이션 팩터 조정 가능 조정 불가 조정 불가
성능 시스템 구성에 따라 샤드 1MB/초 쓰기, 2MB/초 읽기 100MB/초 읽기, 쓰기
ACK 주키퍼에 저장 아마존 다이나모DB서비스에 저장 펍/섭 자체 관리
데이터 저장 기간 조정 가능 최대 7일 최대 7일
메시지 순서 파티션 단위 샤드 단위 토픽 단위
운영 인원 필요 필요 없음 필요 없음
운영 비용 장비/인건비 서비스 사용료 서비스 사용료

도커를 이용한 카프카 설치

  • 도커설치
    • 이전 도커 삭제
      • yum remove docker docker-common docker-selinux docker-engine
    • 패키지 설치
      • yum install -y yum-utils device-mapper-persistent-data-lvm2
    • 저장소 추가
    • 설치
      • yum -y install docker-ce
    • 실행
      • systemctl start docker
  • 카프카설치
    • 카프카 이미지 내려받기
      • docker pull dockerkafka/zookeeper
    • 주키퍼 실행
      • docker run -d --name zookeeper -p 2181:2181 dockerkafka/zookeeper
    • 카프카 실행
      • docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper dockerkafka/kafka
728x90
반응형

댓글