Webflux & Spring Cache Abstraction
- 기존(서블릿 스택) 스프링 환경에서는 Spring Cache Abstraction 이 잘되어 있어서 정말 쉽게 적용할 수 있다. 원하는 캐시 구현체를 설정해주고, Spring 에서 지원하는 @Cacheable annotation 을 사용하면 된다.
- 그런데 spring webflux stack 에서는 기본적으로 aop 가 디펜던시에 없고, spring cache abstraction 을 활용할 수 없다.
Spring Webflux 에서 로컬 캐시가 어려운 점
AOP
- 일단 spring-boot-starter-web 종속성을 추가했을땐 aop dependency 가 포함되어 있지만, spring-boot-starter-webflux 종속성에는 aop 가 없다. 그래서 기존에 spring 환경에서 aop 기반으로 동작하는 @Cacheable 이 동작하지 않는다. (@Transactional, @Async, @Retryable 모두 동작하지 않는다.)
- 만약 annotation 방식을 webflux 에서도 사용하려면 기존의 Spring Cache Abstraction 은 이용할 수 없고, custom Cacheable annotation 과 Aspect 를 직접 구현해야 한다.
Cache API
- 기존의 Cache API 는 reactive 에 어울리지 않는 blocking 모델들이다. 그대로 webflux 환경에서 사용한다면, blocking 코드를 넣는 것이다.
- Reactive 환경에서 사용하려면 reactive type 을 지원하는 Cache 구현체가 필요하다. 뒤에서 설명하겠지만, Caffeine 에서 reactive 환경에서 사용할 수 있는 AsyncCache 를 지원한다.
Mono, Flux
-
Mono, Flux 에도 cache 기능을 지원하긴 한다.
Mono.just("data") .cache(Duration.ofSeconds(10))
-
이런식으로 TTL 까지 파라미터로 넣어줄 수도 있다. 하지만 일반적으로 우리가 생각하는 캐시로써 사용할 수 없다. 위에 작성한 예제에서 Mono 객체를 Spring Cache 와 조합하여 사용하면 “data” 가 저장되는 것이 아니라 Mono 객체의 레퍼런스 주소가 저장된다. 그런데 Mono, Flux 는 구독(subscribe)하기 전까지는 데이터를 받아볼 수 없다. 이 간극으로 인해 Spring Cache Abstraction 과 Mono, Flux 는 기존 처럼(web servlet stack) 동작하지 않는다. 그리고 Mono 객체에 TTL 을 위 처럼 설정하고, Spring CacheManager 에 TTL 를 설정한다고 했을때 그 두 TTL 이 정확히 일치하지 않을 수도 있다.
Spring, Caffeine Github 에 올라온 Q&A
- https://github.com/spring-projects/spring-framework/issues/26713
- Q : Spring cache 환경에서 Async caffeine cache 를 적용할 수 있나요?
- A-1 : Spring 개발자의 답변
- Cache API 를 직접 사용한다면, blocking 되기 때문에 reative type 과 맞지 않다.
- A-2 : Caffeine cache 개발자의 답변
- reactive application 을 위한
AsyncCache
interface 가 있고, Caffeine cache 는 구현체를 지원한다. Caffeine 에서는 CompletableFuture 를 사용하기 때문에 reator 에서 쉽게 적용할 수 있다. - 하지만 Spring Cache Abstraction 에 아직 연동하지 못하기 때문에 native api 를 직접 사용해야 한다.
- reactive application 을 위한
- 즉, Async 를 지원하는 캐시 구현체를 사용해야 진짜 reactive application 에서 사용할 수 있는 캐싱 방법이다.
- https://github.com/ben-manes/caffeine/discussions/518
- 위 spring-framework 에 등록된 이슈에서 나온 동일한 이슈인듯
- Q : AsyncCache.. is thread-safe?
- A : Yep,
AsyncCache
is thread-safe.
구현 방법
CacheMono, CacheFlux & AsyncCache
- Reactor-extra 의 CacheMono(CacheFlux) 와 Caffeine 의 AsyncCache 를 조합해서 캐시를 적용해볼 수 있다.
- webflux 환경에서 kotlin, coroutine 을 사용한다고 하더라도 Filter 등 Spring interface 를 구현해야 할때 suspend fun 을 사용할 수 없기 때문에 그때 CacheMono, CacheFlux 를 사용해야한다.
dependency
implementation("io.projectreactor.addons:reactor-extra")
implementation("com.github.ben-manes.caffeine:caffeine")
- reactor-cache 를 위해
reactor-extra
을 추가한다. - AsyncCache 를 위해
caffeine
을 추가한다. - Caffeine 의 AsyncCache 를 직접 native api 호출 방식으로 사용할 것이기 때문에 spring-boot-starter-cache 는 필요없다.
CacheMono, CacheFlux 에 대해 알아보기
- CacheMono 의 lookup, onCacheMissResume, andWriteWith
fun getUser(key: Long): Mono<User> {
// 아래 코드에서 cacheReader, cacheWriter 구현
return CacheMono.lookup(/* cacheReader() */, key)
.onCacheMissResume { userService.getUser(key) }
.andWriteWith(/* cacheWriter() */)
}
- lookup
- key 에 해당하는 cache data 를 찾는 작업
- onCacheMissResume
- lookup 에서 발견된 cache 가 없을 경우 origin data 를 구하기 위한 작업 호출
- andWriteWith
- onCacheMissResume 다음에 호출되어 origin data 를 cache 에 put
Caffeine 의 AsyncCache 설정하기
-
AsyncCache Bean 등록
@Configuration class CaffeineAsyncCacheConfig { // String type 의 key // User 를 Value 로 캐싱하고 싶을때 아래와 같이 설정 @Bean fun asyncUserCache(): AsyncCache<String, User> { return Caffeine.newBuilder() .expireAfterWrite(30, SECONDS) .maximumSize(10_000) .recordStats() // 아쉽지만 actuator 와 연동은 안되는 것 같다. .buildAsync() } }
-
기존 Caffeine 객체를 Bean 으로 등록하는 과정과 설정 코드는 동일한데, 마지막에
buildAsync()
를 추가하면 Caffeine 의 AsyncCache 로 생성된다. -
그리고 기존에는 CacheManager 에 구현체(Caffeine, EhCache 등등..)만 설정해서 Spring Cache 를 이용할 수 있었는데, Reative 방법은 아직 지원하지 않아서 Caffeine 의 AsyncCache 는 직접 native API 를 호출해야한다.
-
한 가지 아쉬운 점은 actuator 를 통해 cache hit rate 보는 것이 캐시 설계, 튜닝을 할때 좋은 참고 자료가 될 것 같았는데, CacheManager 대신 native api 를 직접 사용해서인지 actuator 에서 보여지지 않는다. 코드에서 직접 호출하여 확인할 수 있는 방법은 있긴하다. 그런데 synchronous() 로 view 를 가져올 수 있는데 프로덕션 환경에서 사용해도 될지는 모르겠다. (일단 쓰지 말란 말은 없는 것 같다.)
fun getCacheStats() { val stats = asyncUserCache.synchronous().stats() println(stats) } // CacheStats{hitCount=5, missCount=1, loadSuccessCount=1, loadFailureCount=0, totalLoadTime=2113585, evictionCount=1, evictionWeight=1}
-
-
AsyncCache 라이브러리 코드를 보면, Parameter 와 Return Type 을 보면 Java8 에서 나온 CompletableFuture 를 사용하기 때문에 Async 로직을 작성할 수 있다.
public interface AsyncCache<K, V> { CompletableFuture<V> getIfPresent(@NonNull @CompatibleWith("K") Object key); void put(@NonNull K key, @NonNull CompletableFuture<V> valueFuture); //... }
Application 기능 구현
-
Application code 에 캐시 기능 구현하기 (UserCacheService class)
- AOP, Annotation 방식으로 구현하지 않고,
Decorator pattern
으로 UserService 를 wrapping 한 UserCacheService 를 만드는 방식으로 구현했다. 그 이유는… aop dependency 를 추가하고 직접 custom annotation 을 추가하면 어렵지 않게 구현할 수도 있을 것 같고 구글링해보면 이런 방법의 예제도 나온다. 그런데 spring-framework 개발팀 하지 않은 이유가 있지 않을까?라는 생각으로 직접 caffeine native api 를 사용하는 방식으로 구현했다.
import com.github.benmanes.caffeine.cache.AsyncCache import org.springframework.stereotype.Service import reactor.cache.CacheMono import reactor.core.publisher.Mono import reactor.core.publisher.Signal import java.util.concurrent.CompletableFuture @Service class UserCacheService( private val asyncUserCache: AsyncCache<String, User>, private val userService: UserService, ) { fun getUser(userId: Long): Mono<User> { return CacheMono.lookup(cacheReader(), userId.toString()) .onCacheMissResume { userService.getUser(userId) } .andWriteWith(cacheWriter()) } private fun cacheReader(): (key: String) -> Mono<Signal<out User>> { return { key -> val value = asyncUserCache.getIfPresent(key) if (value == null) { Mono.empty() } else { Mono.fromFuture { value } .map { Signal.next(it) } } } } private fun cacheWriter(): (key: String, value: Signal<out User>) -> Mono<Void> { return { key, value -> Mono.fromRunnable { asyncUserCache.put(key, CompletableFuture.completedFuture(value.get())) } } } }
@Service class UserService { private val log = LoggerFactory.getLogger(javaClass) fun getUser(userId: Long): Mono<User> { val user = User(userId, "bk") log.info("Call getUser method. $user") return Mono.just(user) } }
- AOP, Annotation 방식으로 구현하지 않고,