본문 바로가기
개발서적

스프링 인 액션 - (4)

by 공부 안하고 싶은 사람 2022. 8. 11.
반응형

Part 3. 리액티브 스프링

chaper 10 리액터 개요

코드 개발의 2가지 형태

명령형 : 동기

  • 리액티브 : 비동기

동기 vs 비동기 & 블락 vs 논블락

펑션A() 펑션B() 펑션C() 상황에서

제어권의 반환 / 결과의 전달 2가지에 포커스

Sync/Async : 시간(순서)이 맞춰져있는가

Block/non-Block : 제어할 수 없는 대상의 처리 방법

  • Async + Block : 비동기라 제어권을 가져가도 되는데 굳이 기다림
  • Sync + non-Block : 결과 전달이 가능한지 계속적으로 폴링 (ex. Future, 작업 진행 퍼센트)

tip : 결과전달
Sync == Block / Async == non-Block
Async는 결과가 중요X
non-Block은 결과가 중요O

 

 

10.1 리액티브 프로그래밍 이해하기

명령형 프로그래밍(순차실행) -> 쓰레드 차단(낭비) -> 리액티브 프로그래밍(동시성 프로그래밍) -> 동시성 문제 발생

리액티브 프로그래밍 특징

함수적

  • 선언적
  • 파이프라인 혹은 스트림

10.1.1 리액티브 스트림 정의하기

정의

리액티브 스트림 : 백 프레셔를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적

백 프레셔 : 데이터를 소비하는 컨슈머가 처리할 수 있는 만큼만 전달 데이터를 제한하여 폭주 방지

 

자바 스트림 vs 리액티브 스트림

공통점 : 데이터를 작업하기 위한 동일한 api 사용

  • 차이점
    • 자바 : 동기, 한정된 데이터로 작업
    • 리액티브 : 비동기, 무한 데이터셋 처리. 백 프레셔를 통한 데이터 전달 폭주 방지

 

인터페이스

Publisher : 하나의 Subscription당 하나의 Subscriber에 발행하는 데이터를 생성

public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber); // 구독 신청
}

Subscriber : 구독 신청하여 Publisher로 부터 이벤트 수신

public interface Subscriber<T> {
  void onSubscribe(Subscroption sub); // 구독 관리 ????
void onNext(T item); // 데이터 전송
void onError(Throwable ex); // 에러 발생
void onComplete(); // 작업 종료
}

Subscription : 전송되는 데이터를 요청, 구독 취소

public interface Subscription {
void request(long n); // 데이터 전송 요청 (n 받고자 하는 데이터수 -> 백 프레셔)
void cancel(); // 구독 취소
}

Processor : Subscriber와 Publisher 결합.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

리액티브 스트림 인터페이스는 스트림 구성하는 API가 없다. -> 프로젝트 리액터는 API가 있다. 리액터는 스프링 5의 리액티브 프로그래밍 모델의 기반

 

10.2 리액터 시작하기

데이터 파이프라인을 구성하는 것

Publisher 구현체 2개

Mono : 하나의 데이터셋 항목만 갖는 데이터셋에 최적화 -> RxJava의 Observable

Mono.just("test")
.map(n -> n.toUpperCase())
.subscribe(System.out::println);

Flux : 0,1 또는 다수의 데이터셋을 갖는 파이프라인 -> RxJava의 Single

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

 

10.3 리액티브 오퍼레이션 적용하기

생성 오퍼레이션

Flux<String> fruitFlux = Flux.just("Apple","Orange"); // 객체로 부터 FLux 생성
​
fruitFlux.subscribe( // 구독자 추가
  f -> System.out.println(f)
)
  
StepVerifier.create(fruitFlux) // 스트림 데이터 테스트를 위해 Assertion을 적용하는 법
  .expectNext("Apple").expectNext("Orange")
  .verifyComplete(); 
String[] fruit = new String[]{"Apple","Orange"};
Flux<String> fruitFlux = Flux.fromArray(fruits); // 배열으로 부터 Flux 생성
​
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
Flux<String> fruitFlux = Flux.fromIterable(fruitList); // Iterable 구현체로 부터 Flux 생성
​
Flux<String> fruitFlux = Flux.fromStream(
  Stream.of("Apple","Orange")
); // Stream 객체로 Flux 생성
Flux<Integer> intervalFlux = Flux.range(1,5); // 범위내 증가하는 값의 Flux
​
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1))
  .take(5);  // 산격과 주기를 지정하여 값을 방출하는 Flux

 

조합 오퍼레이션

Flux<String> flux1 = Flux.just("A","B","C");
Flux<String> flux2 = Flux.just("1","2","3");
​
Flux<String> mergedFlux = flux1.mergeWith(flux2); // 조합(순서 보장 X)
​
Flux<Tuple2<String, String>> zippedFlux = Flux.zip(flux1, flux2); // Tuple안에 1개씩 번갈아 방출
Flux<String> zippedFlux = FLux.zip(flux1, flux2, (a, b) -> a + ", " + b); // 1개씩 결합하여 방출
​
Flux<String> firstFlux = Flux.first(flux1, flux2); // 두 Flux중 먼저 방출하는 Flux의 값만 방출

 

변환 오퍼레이션

Flux<String> flux = Flux.just("1","2","3","4","5");
​
flux.skip(3); // 앞의 3개를 무시하고 방출
flux.deleayElements(Duration.ofSeconds(1))
  .skip(Duration.ofSeconds(4)); // 4초 이후에 방출 -> 4,5만 방출
​
flux.take(3); // 앞의 3개만 방출
flux.delayElements(Duration.ofSeconds(1))
  .take(Duration.ofMillis(3500)); // 3.5초 동안만 방출
​
flux.filter(a -> !a.equlas("3")) // 조건식(Predicate)으로 방출
  
flux.distinct(); // 중복제거
​
Flux<Integer> integerFlux = flux.map(a -> Integer.parseInt(a)); // 동기적 매핑
Flux<Integer> integerFlux = flux.flatMap(a -> Mono.just(a)
                                         .map(a -> Integer.parseInt(a))
                                         .subscribeOn(Schedulers.parallel()) // 병렬처리
                                         ); // 비동기적 매핑
​
Flux<List<String>> bufferedFlux = flux.buffer(3); // 3개식 리스트로 묶기
​
Mono<Map<Integer, String>> mapMono = flux.colletMap(a -> Integer.parseInt(a)); // 방출하는 항목 모으기
Mono<List<String> mapMono = flux.colletList(); // 방출하는 항목 모으기

 

로직 오퍼레이션

Flux<String> flux = Flux.just("1","2","3","4","5");
​
Mono<Boolean> hasAMono = flux.all(a -> a >= 1); // 모든 메시지가 충족하는지 검사
Mono<Boolean> hasAMono = flux.any(a -> a > 4); // 최소 하나의 메시지가 충족하는지 검사

 


chapter 11 리액티브 API 개발하기

11.1 스프링 WebFlux 사용하기

이벤트 루핑을 통해 한 스레드당 요청 처리량을 증가

WebFlux : 리액티브 프로그래밍 모델을 스프링 MVC에 억지로 집어넣는 대신에, MVC에서 가져와서 별도의 프레임워크를 만든것(디폴트 WAS는 Netty)

스프링 MVC도 Mono나 Flux를 반환할 수 있지만, 다중 스레드에 의존하는 서블릿 기반 웹 프레임워크다. (Block)

public interface TacoRepository extends ReactiveCrudRepository<Taco, Long> {}
// end-to-end 에서 Flux를 반환할 수 있도록 변경
// .subscribe()를 호출하지 않아도 프레임워크에서 호출해준다.
// Flux를 반환하거나 Flux를 인자로 받아 올 수 있다.
@PostMapping()
public Mono<Taco> postTaco(@RequestBody Mono<Taco> taco) {
  return tacoRepository.saveAll(taco).next();
}

 

11.2 함수형 요청 핸들러 정의하기

스프링 함수형 프로그래밍은 애노테이션을 사용하지 않고 요청을 핸들러 코드에 연관시킨다. (애노테이션에 중단점 사용이 안되므로)

RequestPredicate : 처리될 요청의 종류 선언

RouterFunction : 일치하는 요청이 어떻게 핸들러에 전달돼야 하는지 선언

ServerRequest : HTTP 요청, 헤더와 바디 사용

ServerResponse : HTTP 응답, 헤더와 바디 사용

import static ...RequestPredicate.GET;
import static ...RouterFunction.route;
import static ...ServerResponse.ok;
import static ...Mono.just;
​
@Configuration
public class RouterFunctionConfig {
  @Bean
  public RouterFunction<?> helloRouterFunction() {
    return route(GET("/hello"), 
                request -> ok().body(just("Hello World"), String.Class))
      .andRoute(GET("/bye"),
               requset -> ok().body(just("See Ya!"), String.Class));
  }
}

 

11.3 리액티브 컨트롤러 테스트하기

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment=WebEnvironment.RANDOM_PORT)
public class WebTest {
​
  @Autowired
  private WebTestClient testClient;
  
  // 통합 테스트
  @Test
  public void shouldRetrunRecentTacos1() {
    testClient.get().uri("/design/recent")
      .exchange() // 최근 타코들을 요청
      .expectStatus().isOK()
      .expectBody()
        .jsonPath("$").isArray()
        .jsonPath("$[0].id").isEqualTo(taos[0].getId().toString())
        .isEqualTo(tacos[1].getId().toString()).jsonPath("$[1].name");
  }
  
  // 단위 테스트
  @Test
  public void shouldRetrunRecentTacos2() {
    Taco[] tacos = {testTaco(1L), testTaco(2L), testTaco(3L), testTaco(4L)};
    Flux<Taco> tacoFlux = Flux.just(tacos);
​
    TacoRepository tacoRepository = Mockito.mock(TacoRepository.Class);
    when(tacoRepository.findAll()).thenReturn(tacoFlux);
​
    WebTestClient testClient = WebTestClient.bindToController(
      new DesignTacoController(tacoRepository))
      .build(); // 리액티브 컨트롤러 테스트용
​
    testClient.get().uri("/design/recent")
      .exchange() // 최근 타코들을 요청
      .expectStatus().isOK()
      .expectBody()
        .jsonPath("$").isArray()
        .jsonPath("$[0].id").isEqualTo(taos[0].getId().toString())
        .isEqualTo(tacos[1].getId().toString()).jsonPath("$[1].name");
  }
}

 

11.4 REST API를 리액티브하게 사용하기

Flux<Ingredient> ingerdients = WebClient.create()
  .post()
  .uri("xxxxx")
  .body(ingrdientMono, Ingredient.class) // .syncBody()를 통해 publisher 구현체가 아닌 객체 쓸 수 있다.
  .retrieve()
  .onStatus(HttpStatus::is4xxClientError,
           response -> Mono.just(new UnknownIngredientException())) // HTTP 상태 코드 처리
  .bodyToFlux(Ingredient.class); // bodyToMono로 Mono를 추출 할 수 있다.
​
ingerdients
  .timeout(Duration.ofSeconds(1)) // 타임아웃 설정
  .subscribe(i -> {...}, // 구독을 해야 요청이 전송된다.
            e -> {handler}); // 에러 설정
@Bean
public WebClient customWebClient() { // 기본 URI 설정
  return WebClient.create("http://localhost:8080"); 
}
Flux<Ingredient> ingerdients = WebClient.create()
  .get()
  .uri("xxxxx")
  .retrieve() // retrieve
  .bodyToFlux(Ingredient.class);
​
Flux<Ingredient> ingerdients = WebClient.create()
  .get()
  .uri("xxxxx")
  .exchange() // exchange
  .flatMap(cr -> cr.bodyToMono(Ingredient.class));
// 동일한 코드지만 쿠키/헤더 정보를 읽어야한다면 아래처럼 써야한다.

 

11.5 리액티브 웹 API 보안

이전까지는 서블릿 필터 중심의 보안 모델을 사용했지만,

Spring Security 5.0.0 버전부터는 WebFilter사용으로 서블릿 필터에 의존하지 않기 때문에 WebFlux에서도 적용가능(사용법 비슷)

Spring MVC와의 차이점

@Configuration
@EnableWebFluxSecurity // @EnableWebSecurity
public class SecurityConfig { // extends WebSecurityConfigurerAdapter
  @Bean // override configure(HttpSecurity http)
  public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
    return http
      .authorizeExchange()
      .pathMatchers("/orders").hasAuthority("USER")
      .anyExchange().permitAll() // /** 미사용
      .and()
      .build(); // build 필요
  }
}
@Service // configure(AuthenticationManagerbuilder auth)
public ReactiveUserDetailService userDetailService(UserRepository userRepository) {
  return new ReactiveUserDetailService() {
    @Override
    public Mono<UserDetails> findByUsername(String usernmae) {
      return userRepository.findByUsername(username)
        .map(user -> {
          return user.toUserDetails();
        });
    }
  };
}

 


chapter 12 리액티브 데이터 퍼시스턴스

Repository가 Block이라면 비동기가 아니다.

하지만 당장 RDB를 바꿀 순 없다 -> 리액티브와 리액티브가 아닌 타입 간의 변화(JPA <-> 리액티브)

List<Order> orders = repository.findByUser(user); 
Flux<Order> orderFlux = Flux.fromIterable(orders); // 가능한 빨리 리액티브로 변환하여 결과를 처리할 수는 있다.
Taco taco = tacoMono.block(); // 일반 객체로 변환(남용주의)
tacoRepository.save(taco);
​
tacoFlux.subscribe(taco -> {
  tacoReptository.save(taco);
}) // 위 예제보다 리액티브한 방법이다.

카산드라 Repository 작성하기

몽고DB Repository 작성하기(생략)

728x90
반응형

'개발서적' 카테고리의 다른 글

스프링 인 액션 - (6)  (0) 2022.08.17
스프링 인 액션 - (5)  (0) 2022.08.12
스프링 인 액션 - (3)  (0) 2022.08.10
스프링 인 액션 - (2)  (0) 2022.08.10
스프링 인 액션 - (1)  (0) 2022.08.09

댓글