Part 3. 리액티브 스프링
chaper 10 리액터 개요
코드 개발의 2가지 형태
명령형 : 동기
- 리액티브 : 비동기
동기 vs 비동기 & 블락 vs 논블락
펑션A() 펑션B() 펑션C() 상황에서
제어권의 반환 / 결과의 전달 2가지에 포커스
Sync/Async : 시간(순서)이 맞춰져있는가
Block/non-Block : 제어할 수 없는 대상의 처리 방법
- Async + Block : 비동기라 제어권을 가져가도 되는데 굳이 기다림
- Sync + non-Block : 결과 전달이 가능한지 계속적으로 폴링 (ex. Future, 작업 진행 퍼센트)
tip : 결과전달
Sync == Block / Async == non-Block
Async는 결과가 중요X
non-Block은 결과가 중요O
10.1 리액티브 프로그래밍 이해하기
명령형 프로그래밍(순차실행) -> 쓰레드 차단(낭비) -> 리액티브 프로그래밍(동시성 프로그래밍) -> 동시성 문제 발생
리액티브 프로그래밍 특징
함수적
- 선언적
- 파이프라인 혹은 스트림
10.1.1 리액티브 스트림 정의하기
정의
리액티브 스트림 : 백 프레셔를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적
백 프레셔 : 데이터를 소비하는 컨슈머가 처리할 수 있는 만큼만 전달 데이터를 제한하여 폭주 방지
자바 스트림 vs 리액티브 스트림
공통점 : 데이터를 작업하기 위한 동일한 api 사용
- 차이점
- 자바 : 동기, 한정된 데이터로 작업
- 리액티브 : 비동기, 무한 데이터셋 처리. 백 프레셔를 통한 데이터 전달 폭주 방지
인터페이스
Publisher : 하나의 Subscription당 하나의 Subscriber에 발행하는 데이터를 생성
public interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); // 구독 신청 }
Subscriber : 구독 신청하여 Publisher로 부터 이벤트 수신
public interface Subscriber<T> { void onSubscribe(Subscroption sub); // 구독 관리 ???? void onNext(T item); // 데이터 전송 void onError(Throwable ex); // 에러 발생 void onComplete(); // 작업 종료 }
Subscription : 전송되는 데이터를 요청, 구독 취소
public interface Subscription { void request(long n); // 데이터 전송 요청 (n 받고자 하는 데이터수 -> 백 프레셔) void cancel(); // 구독 취소 }
Processor : Subscriber와 Publisher 결합.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
리액티브 스트림 인터페이스는 스트림 구성하는 API가 없다. -> 프로젝트 리액터는 API가 있다. 리액터는 스프링 5의 리액티브 프로그래밍 모델의 기반
10.2 리액터 시작하기
데이터 파이프라인을 구성하는 것
Publisher 구현체 2개
Mono : 하나의 데이터셋 항목만 갖는 데이터셋에 최적화 -> RxJava의 Observable
Mono.just("test") .map(n -> n.toUpperCase()) .subscribe(System.out::println);
Flux : 0,1 또는 다수의 데이터셋을 갖는 파이프라인 -> RxJava의 Single
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
10.3 리액티브 오퍼레이션 적용하기
생성 오퍼레이션
Flux<String> fruitFlux = Flux.just("Apple","Orange"); // 객체로 부터 FLux 생성
fruitFlux.subscribe( // 구독자 추가
f -> System.out.println(f)
)
StepVerifier.create(fruitFlux) // 스트림 데이터 테스트를 위해 Assertion을 적용하는 법
.expectNext("Apple").expectNext("Orange")
.verifyComplete();
String[] fruit = new String[]{"Apple","Orange"};
Flux<String> fruitFlux = Flux.fromArray(fruits); // 배열으로 부터 Flux 생성
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
Flux<String> fruitFlux = Flux.fromIterable(fruitList); // Iterable 구현체로 부터 Flux 생성
Flux<String> fruitFlux = Flux.fromStream(
Stream.of("Apple","Orange")
); // Stream 객체로 Flux 생성
Flux<Integer> intervalFlux = Flux.range(1,5); // 범위내 증가하는 값의 Flux
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1))
.take(5); // 산격과 주기를 지정하여 값을 방출하는 Flux
조합 오퍼레이션
Flux<String> flux1 = Flux.just("A","B","C");
Flux<String> flux2 = Flux.just("1","2","3");
Flux<String> mergedFlux = flux1.mergeWith(flux2); // 조합(순서 보장 X)
Flux<Tuple2<String, String>> zippedFlux = Flux.zip(flux1, flux2); // Tuple안에 1개씩 번갈아 방출
Flux<String> zippedFlux = FLux.zip(flux1, flux2, (a, b) -> a + ", " + b); // 1개씩 결합하여 방출
Flux<String> firstFlux = Flux.first(flux1, flux2); // 두 Flux중 먼저 방출하는 Flux의 값만 방출
변환 오퍼레이션
Flux<String> flux = Flux.just("1","2","3","4","5");
flux.skip(3); // 앞의 3개를 무시하고 방출
flux.deleayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4)); // 4초 이후에 방출 -> 4,5만 방출
flux.take(3); // 앞의 3개만 방출
flux.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500)); // 3.5초 동안만 방출
flux.filter(a -> !a.equlas("3")) // 조건식(Predicate)으로 방출
flux.distinct(); // 중복제거
Flux<Integer> integerFlux = flux.map(a -> Integer.parseInt(a)); // 동기적 매핑
Flux<Integer> integerFlux = flux.flatMap(a -> Mono.just(a)
.map(a -> Integer.parseInt(a))
.subscribeOn(Schedulers.parallel()) // 병렬처리
); // 비동기적 매핑
Flux<List<String>> bufferedFlux = flux.buffer(3); // 3개식 리스트로 묶기
Mono<Map<Integer, String>> mapMono = flux.colletMap(a -> Integer.parseInt(a)); // 방출하는 항목 모으기
Mono<List<String> mapMono = flux.colletList(); // 방출하는 항목 모으기
로직 오퍼레이션
Flux<String> flux = Flux.just("1","2","3","4","5");
Mono<Boolean> hasAMono = flux.all(a -> a >= 1); // 모든 메시지가 충족하는지 검사
Mono<Boolean> hasAMono = flux.any(a -> a > 4); // 최소 하나의 메시지가 충족하는지 검사
chapter 11 리액티브 API 개발하기
11.1 스프링 WebFlux 사용하기
이벤트 루핑을 통해 한 스레드당 요청 처리량을 증가
WebFlux : 리액티브 프로그래밍 모델을 스프링 MVC에 억지로 집어넣는 대신에, MVC에서 가져와서 별도의 프레임워크를 만든것(디폴트 WAS는 Netty)
스프링 MVC도 Mono나 Flux를 반환할 수 있지만, 다중 스레드에 의존하는 서블릿 기반 웹 프레임워크다. (Block)
public interface TacoRepository extends ReactiveCrudRepository<Taco, Long> {}
// end-to-end 에서 Flux를 반환할 수 있도록 변경
// .subscribe()를 호출하지 않아도 프레임워크에서 호출해준다.
// Flux를 반환하거나 Flux를 인자로 받아 올 수 있다.
@PostMapping()
public Mono<Taco> postTaco(@RequestBody Mono<Taco> taco) {
return tacoRepository.saveAll(taco).next();
}
11.2 함수형 요청 핸들러 정의하기
스프링 함수형 프로그래밍은 애노테이션을 사용하지 않고 요청을 핸들러 코드에 연관시킨다. (애노테이션에 중단점 사용이 안되므로)
RequestPredicate : 처리될 요청의 종류 선언
RouterFunction : 일치하는 요청이 어떻게 핸들러에 전달돼야 하는지 선언
ServerRequest : HTTP 요청, 헤더와 바디 사용
ServerResponse : HTTP 응답, 헤더와 바디 사용
import static ...RequestPredicate.GET;
import static ...RouterFunction.route;
import static ...ServerResponse.ok;
import static ...Mono.just;
@Configuration
public class RouterFunctionConfig {
@Bean
public RouterFunction<?> helloRouterFunction() {
return route(GET("/hello"),
request -> ok().body(just("Hello World"), String.Class))
.andRoute(GET("/bye"),
requset -> ok().body(just("See Ya!"), String.Class));
}
}
11.3 리액티브 컨트롤러 테스트하기
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment=WebEnvironment.RANDOM_PORT)
public class WebTest {
@Autowired
private WebTestClient testClient;
// 통합 테스트
@Test
public void shouldRetrunRecentTacos1() {
testClient.get().uri("/design/recent")
.exchange() // 최근 타코들을 요청
.expectStatus().isOK()
.expectBody()
.jsonPath("$").isArray()
.jsonPath("$[0].id").isEqualTo(taos[0].getId().toString())
.isEqualTo(tacos[1].getId().toString()).jsonPath("$[1].name");
}
// 단위 테스트
@Test
public void shouldRetrunRecentTacos2() {
Taco[] tacos = {testTaco(1L), testTaco(2L), testTaco(3L), testTaco(4L)};
Flux<Taco> tacoFlux = Flux.just(tacos);
TacoRepository tacoRepository = Mockito.mock(TacoRepository.Class);
when(tacoRepository.findAll()).thenReturn(tacoFlux);
WebTestClient testClient = WebTestClient.bindToController(
new DesignTacoController(tacoRepository))
.build(); // 리액티브 컨트롤러 테스트용
testClient.get().uri("/design/recent")
.exchange() // 최근 타코들을 요청
.expectStatus().isOK()
.expectBody()
.jsonPath("$").isArray()
.jsonPath("$[0].id").isEqualTo(taos[0].getId().toString())
.isEqualTo(tacos[1].getId().toString()).jsonPath("$[1].name");
}
}
11.4 REST API를 리액티브하게 사용하기
Flux<Ingredient> ingerdients = WebClient.create()
.post()
.uri("xxxxx")
.body(ingrdientMono, Ingredient.class) // .syncBody()를 통해 publisher 구현체가 아닌 객체 쓸 수 있다.
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
response -> Mono.just(new UnknownIngredientException())) // HTTP 상태 코드 처리
.bodyToFlux(Ingredient.class); // bodyToMono로 Mono를 추출 할 수 있다.
ingerdients
.timeout(Duration.ofSeconds(1)) // 타임아웃 설정
.subscribe(i -> {...}, // 구독을 해야 요청이 전송된다.
e -> {handler}); // 에러 설정
@Bean
public WebClient customWebClient() { // 기본 URI 설정
return WebClient.create("http://localhost:8080");
}
Flux<Ingredient> ingerdients = WebClient.create()
.get()
.uri("xxxxx")
.retrieve() // retrieve
.bodyToFlux(Ingredient.class);
Flux<Ingredient> ingerdients = WebClient.create()
.get()
.uri("xxxxx")
.exchange() // exchange
.flatMap(cr -> cr.bodyToMono(Ingredient.class));
// 동일한 코드지만 쿠키/헤더 정보를 읽어야한다면 아래처럼 써야한다.
11.5 리액티브 웹 API 보안
이전까지는 서블릿 필터 중심의 보안 모델을 사용했지만,
Spring Security 5.0.0 버전부터는 WebFilter사용으로 서블릿 필터에 의존하지 않기 때문에 WebFlux에서도 적용가능(사용법 비슷)
Spring MVC와의 차이점
@Configuration
@EnableWebFluxSecurity // @EnableWebSecurity
public class SecurityConfig { // extends WebSecurityConfigurerAdapter
@Bean // override configure(HttpSecurity http)
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.authorizeExchange()
.pathMatchers("/orders").hasAuthority("USER")
.anyExchange().permitAll() // /** 미사용
.and()
.build(); // build 필요
}
}
@Service // configure(AuthenticationManagerbuilder auth)
public ReactiveUserDetailService userDetailService(UserRepository userRepository) {
return new ReactiveUserDetailService() {
@Override
public Mono<UserDetails> findByUsername(String usernmae) {
return userRepository.findByUsername(username)
.map(user -> {
return user.toUserDetails();
});
}
};
}
chapter 12 리액티브 데이터 퍼시스턴스
Repository가 Block이라면 비동기가 아니다.
하지만 당장 RDB를 바꿀 순 없다 -> 리액티브와 리액티브가 아닌 타입 간의 변화(JPA <-> 리액티브)
List<Order> orders = repository.findByUser(user);
Flux<Order> orderFlux = Flux.fromIterable(orders); // 가능한 빨리 리액티브로 변환하여 결과를 처리할 수는 있다.
Taco taco = tacoMono.block(); // 일반 객체로 변환(남용주의)
tacoRepository.save(taco);
tacoFlux.subscribe(taco -> {
tacoReptository.save(taco);
}) // 위 예제보다 리액티브한 방법이다.
카산드라 Repository 작성하기
몽고DB Repository 작성하기(생략)
'개발서적' 카테고리의 다른 글
스프링 인 액션 - (6) (0) | 2022.08.17 |
---|---|
스프링 인 액션 - (5) (0) | 2022.08.12 |
스프링 인 액션 - (3) (0) | 2022.08.10 |
스프링 인 액션 - (2) (0) | 2022.08.10 |
스프링 인 액션 - (1) (0) | 2022.08.09 |
댓글