3부 카프카의 확장과 응용
응용 사례와 다양한 확장 사례, 대안으로 사용할 수 있는 클라우드 기반 메시징 서비스 소개
7장 카프카를 활용한 데이터 파이프라인 구축
7.1 카프카를 활용한 데이터 흐름도
카프카로그 --(파일비트)--> 카프카토픽 --(나이파이)--> 엘라스틱서치/키바나, HDFS
아차피 나이파이 : 데이터 흐름을 정의, 정의된 흐플대로 자동으로 실행해주는 애플리케이션
나이파이를 이용하여 컨슘 + 데이터 처리 가능
7.2 파일비트를 이용한 메시지 전송
먼저 파일비트로 간단한 로그를 토픽에 프로듀싱
파일비트 : 엘라스틱에서 제공하는 경량 데이터 수집기
설치 및 설정은 생략
7.3 나이파이를 이용해 메시지 가져오기
- 설치
- 나이파이 접속
- 릴리즈 확인
- 원하는 경로에 다운로드
- 압축해제
- tar zxf nifi-1.5.0-bin.tar.gz
- 심볼릭 링크 생성
- ln -s nifi-1.5.0 nifi
- 설정
- /nifi경로(/usr/local/nifi)/conf/nifi.properties 수정
-
nifi.web.http.host=peter-kafka001 nifi.cluster.is.node=true nifi.cluster.node.address=peter-kafka001 nifi.cluster.node.protocol.port=8082 nifi.zookeeper.connect.string=peter-zk001:2181,peter-zk002:2181,peter-zk003:2181
- 인증, 튜닝
- systemd 등록
- cd /usr/local/nifi/ (nifi경로)
- bin/nifi.sh install nifi
- 실행
- systemctl start nifi.service
- 나이파이 컨슈머 설정
- 나이파이 접속
- 상단의 Processor 드래그앤드롭
- kafka 검색하여 ConsumeKafka 선택, 더블클릭
- ConsumeKafka를 우클릭하여 설정
- PROPERTIES탭
- Kafka Brokers : peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092
- Topic Names : peter-log
- Group ID : peter-nifi-consumer001
- SETTINS 탭
- Automaticaaly Terminate Relationships -> sucess 체크
- PROPERTIES탭
- 추가한 ConsumeKafka 프로세서를 선택하면 메뉴의 Operate 부분에 ConsumeKafka 프로세서가 보임
- 재생 버튼을 누르면 메시지 가져오기 시작
- 컨슈머 그룹에 peter-nifi-consumer001 등록됐는지 확인
7.4 실시간 분석을 위해 엘라스틱서치에 메시지 저장
엘라스틱서치 : 분산형 RESTful 검색 및 분석에진, 질의를 통해 데이터 분석을 빠르게 할 수 있다
- 설치
- key 등록
- rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
- 저장소 등록
- /etc/yun.repos.d/elasticsearch.rep 생성
-
[elasticsearch-6.x] name=Elaticsarch repository for 6.x pachages baseurl=https://artifacts.elastic.co/pachages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
- 설치
- yum -y install elasticsarch
- key 등록
- 설정
- /etc/elasticsarch/elasticsarch.yml 변경
-
path.data: /var/lib/elasticsarch path.logs: /var/log/elasticsarch cluster.name: peter-es node.name: peter-kafka001 network.bind_host: 0.0.0.0 http.port: 9200 transport.tcp.port: 9300
- 실행
- systemctl start elasticsarch.service
- 시작
- 엘라스틱서치 접속
- 나이파이로 데이터 전송
- 나이파이 접속
- 상단의 Processor 드래그앤드롭
- putelasticsarch 검색하여 PutElasticsarchHttp 프로세서 추가
- 설정 변경
- PROPERTIES
- Elasticsarch URL : http://peter-kafka001:9200
- Index : peter-log-${now():format("yyyy.MM.dd")} 일별 인덱스
- Type : filebeast
- SETTINGS
- Automatically Terminate Relationships : failure, retry, sucess 체크
- PROPERTIES
- 프로세서 연결
- ConsumeKafka 프로세서 위에 마우스를 올리면 생기는 화살표 드래그앤드롭해서
- PutElasticsarchHttp프로세서와 연결
- 프로세서 실행
7.5 키바나를 이용해 엘라스틱서치에 저장된 데이터 확인
- 설치
- key 등록
- rpm rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsarch
- 저장소 등록
- /etc/yum.repos.d/kinana.repo 생성
-
[kibana-6.x] name=Kibana repository for 6.x pachages baseurl=https://artifacts.elastic.co/pachages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
- 설치
- yum -y install kibana
- key 등록
- 설정
- /etc/kibana/kibana.yml 수정
-
server.host: "0.0.0.0" elasticsearch.url: "http://peter-kafka001:9200"
- 실행
- systemctl start kibana.service
- 실행
- 접속
- Management - Index Patterns 클릭
- peter-log-* 입력 한 후 Create
- Discover 메뉴 클릭
7.6 현재의 토픽을 이용해 새로운 토픽으로 메시지 재생산
나이파이를 이용하면 토픽별 라우팅하여, 토픽 내용중 특정 메시지만 꺼내서 다시 새로운 토픽으로 메시지를 보낼 수 있다
-> 확장 시스템과 연동, 메시지 분리 가능
- 설정
- 나이파이의 추가 설정
- EvaluateJsonPath 프로세서 추가()
- PROPERTIES 설정
- Destination : flowfile-attribute
- getHostName : $.beat.hostname
- PROPERTIES 설정
- RouteOnAttribue 프로세서 추가
- PROPERTIES 설정
- Routing Strategy : Route to Property name
- peter-kafka001 : ${getHostName:equals('peter-kafka001')}
- PROPERTIES 설정
- 프로듀서 추가
- PROPERTIES 설정
- Kafka Brokers : 브로커3개
- Topic Name : peter-kafka001
- PROPERTIES 설정
- EvaluateJsonPath 프로세서 추가()
- 나이파이의 추가 설정
8장 카프카 스트림즈 API
8.1 스트림 프로세싱 기초
스프링 프로세싱 : 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 분석이나 질의를 수행하는 것을 의미 (실시간 분석)
<-> 배치 : 이미 저장된 데이터를 기반으로 분석이나 질의를 수행하고 특정 시간에 처리
장점
- 이벤트에 즉각적으로 반응(빠른 반영)
- vs 배치
- 저장하지 않으므로, 더많은 데이터를 분석할 수 있다
- 저장된 데이터가 아닌, 지속적으로 유입되는 데이터 분석에 최적화
- 공유 데이터베이스에 대한 요구를 줄일 수 있어서 인프라에 독립적으로 수행 가능
상태 기반 vs 무상태 스트림
상태 기반 : 이전 스트림 결과를 참조 (결과를 저장소에 저장)
무상태 : 현재 스트림만을 처리
8.2 카프카 스트림즈
특징
- 간단하고 가벼운 클라이언트 라이브러리 -> 쉽게 사용 가능
- 시스템이나 카프카에 대한 의존성이 없음
- 이중화된 로컬 상태 저장소를 지원
- 카프카 브로커나 클라이언트에 장애가 새익더라도 스트림에 대해선 1번만 처리 되도록 보장
- 밀리초 단위의 처리 지연을 보장하기 위해 한 번에 한 레코드만 처리
- 간단하게 스트림 처리 프로그램을 만들 수 있도록 고수준의 스트림 DSL을 지원하고, 저수준의 프로세싱 API도 제공
개념
토폴로지를 만들어서 처리하는 API
- 스트림 : key-value 형태로 끊임없이 전달되는 데이터 세트
- 스트림 처리 애플리케이션 : 하나 이상의 프로세서 토폴로지에서 처리되는 로직
- 스트림 프로세서 : 하나의 노드
- 소스 프로세서 : 토픽 레코드 읽어서 다른 노드로
- 싱크 프로세서 : 마지막 노드로 레코드를 특정 토픽에 저장
프로세서 만드는 방법
- 카프카 스트림즈 DSL에서 데이터를 처리할 때 공통적으로 필요한 map, filter, join, aggregations와 같은 데이터 프로세싱 메소드 제공
- 프로세서 API를 제공해서 저수준의 처리
아키텍처
- 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지
- 카프카 스트림즈는 파티션 개수만큼 태스트를 생성
- 스트림의 데이터 레코드는 카프카 해당 토픽의 메시지(key-value)
- 데이터 레코드의 키를 통해 다음 스트림(혹은 토픽)으로 전달
- 사용자가 스레드의 개수를 지정할 수 있다
- 증가 시킬 경우, 토폴로지를 복제해서 카프카 파티션을 서로 나눈 다음에 효과적인 병렬처리 수행
- 별도의 스트림즈 코디네이션 없이 카프카 코디네이션 방식을 사용
8.3 카프카 스트림즈를 위한 환경설정
카프카1.0 이상에선 이미 포함되어 배포
의존성
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>streams-quickstart-java</artifactId>
<version>1.0.0</version>
</dependency>
설정 값 입력
Properties pros = new Properties();
pros.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // 카프카 클러스터 내의 스트림즈 애플리케이션 구분하는 값
pros.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 접근할 브로커의 정보
pros.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 직렬화
pros.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass();
토폴리지 생성
StreamsBuilder builder = new StreamsBuilder(); // 토폴로지 빌더
KStream<String, String> source = builder.stream("streams-plaintext-input"); // streams-plaintext-input 토픽으로 부터 입력스트림 생성
source.to("streams-pipe-output"); // streams-pipe-output 토픽으로 전달
Topology topology = builder.build(); // 토폴로지 생성
System.out.println(topology.describe()); // 토폴로지 확인
KafkaStreams streams = new KafkaStreams(topology, props); // 스트림즈 객체 생성
streams.start();
streams.close();
8.5 행 분리 예제 프로그램 만들기
생략
8.6 단어 빈도수 세기 예지 프로그램 만들기
생략
9장 카프카 SQL을 이용한 스트리밍 처리
저장 기간에 관계없이 스트리밍과 배치 처리를 동시에 실행할 수 있는 분석 방법
생략
10장 그 밖의 클라우드 기반 메시징 서비스
메시징 시스템 : 카프카, 레빗엠큐, 큐피드 등
메시징 시스템 운영(장애처리, 확장)을 자동으로 관리해주는 클라우드 서비스
- 구글의 Pub/Sub
- 아마존 Kinesis
- 마이크로소프트 Azure의 Event Hubs
- 오라클 EventHub
10.1 구글의 Pub/Sub 서비스 소개
로그나 메시지를 다른 서비스들과 연결
프로듀서 : 토픽을 생성하거나, 메시지 전달
컨슈머 : 토픽에 Subcriber(컨슈머그룹)를 특정 토픽을 대상으로 만든 다음 메시지 가져옴
차이점 : 메시지를 한 컨슈머쪽으로 보낼 수 있고, 여러 컨슈머 쪽으로 보낼 수도 있다
특징
- 메시징 플랫폼 단일화
- push/pull 형태의 메시지 전달 방법을 간편 설정
- 신뢰성 있는 데이터 저장을 보관(복제된 저장소 이용)
- 명시적 ACK 제공
- 플로 컨트롤 제공
- REST API 제공
사용법 생략
10.7 카프카와 클라우드 서비스 비교
카프카 | 키네시스 | 펍/섭 | |
---|---|---|---|
사용 형태 | DIY | 클라우드 서비스 | 클라우드 서비스 |
SDK | 자바/파이썬 | 자바/파이썬/고/닷넷 | 자바/파이썬/고 |
리플리케이션 팩터 | 조정 가능 | 조정 불가 | 조정 불가 |
성능 | 시스템 구성에 따라 | 샤드 1MB/초 쓰기, 2MB/초 읽기 | 100MB/초 읽기, 쓰기 |
ACK | 주키퍼에 저장 | 아마존 다이나모DB서비스에 저장 | 펍/섭 자체 관리 |
데이터 저장 기간 | 조정 가능 | 최대 7일 | 최대 7일 |
메시지 순서 | 파티션 단위 | 샤드 단위 | 토픽 단위 |
운영 인원 | 필요 | 필요 없음 | 필요 없음 |
운영 비용 | 장비/인건비 | 서비스 사용료 | 서비스 사용료 |
도커를 이용한 카프카 설치
- 도커설치
- 이전 도커 삭제
- yum remove docker docker-common docker-selinux docker-engine
- 패키지 설치
- yum install -y yum-utils device-mapper-persistent-data-lvm2
- 저장소 추가
- yum-config-manager --add-rep https://download.docker.com/linux/centos/docker-ce.repo
- 설치
- yum -y install docker-ce
- 실행
- systemctl start docker
- 이전 도커 삭제
- 카프카설치
- 카프카 이미지 내려받기
- docker pull dockerkafka/zookeeper
- 주키퍼 실행
- docker run -d --name zookeeper -p 2181:2181 dockerkafka/zookeeper
- 카프카 실행
- docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper dockerkafka/kafka
- 카프카 이미지 내려받기
'개발서적' 카테고리의 다른 글
스프링 인 액션 - (3) (0) | 2022.08.10 |
---|---|
스프링 인 액션 - (2) (0) | 2022.08.10 |
스프링 인 액션 - (1) (0) | 2022.08.09 |
카프카, 데이터 플랫폼의 최강자 : 실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지 (2) (0) | 2022.04.29 |
카프카, 데이터 플랫폼의 최강자 : 실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지 (1) (0) | 2022.04.11 |
댓글