๐Ÿค” Flow in Coroutine

์ฝ”๋ฃจํ‹ด์—์„œ ํ”Œ๋กœ์šฐ๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด๋ฉฐ ์ฝ”๋ฃจํ‹ด ์ƒ์—์„œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ง€์› ํ•˜๊ธฐ ์œ„ํ•œ ๊ตฌ์„ฑ ์š”์†Œ๋‹ค. ํ”Œ๋กœ์šฐ๋Š” suspend function์„ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ’์„ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ƒ์„ฑํ•˜๊ณ  ์‚ฌ์šฉํ•œ๋‹ค. ์ฆ‰ ๊ธฐ๋ณธ ์Šค๋ ˆ๋“œ๋ฅผ ์ฐจ๋‹จํ•˜์ง€ ์•Š๊ณ  ๋„คํŠธ์›Œํฌ ์š”์ฒญ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

Untitled

  1. Producer

    ์ƒ์‚ฐ์ž๋Š” DataSource๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€ ์ŠคํŠธ๋ฆผ์— ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค. flow ๋ธ”๋ก ์•ˆ์—์„œ emit ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ๋‚ด๋ณด๋‚ธ๋‹ค.

    class KakaoRemoteDataSource @Inject constructor(
        private val kakaoService: KakaoService
    ) : KakaoDataSource {
        override fun getKeywordPlace(
            keyword: String
        ): Flow<PlaceKeywordResult> = flow{
    		emit(kakaoService.getKeywordPlace(keyword))
    	}
    }
    

    ์•„๋ž˜ ์˜ˆ์—์„œ๋Š” ๋ฐ์ดํ„ฐ ์†Œ์Šค๊ฐ€ ๊ณ ์ •๋œ ๊ฐ„๊ฒฉ์œผ๋กœ ์ตœ์‹  ๋‰ด์Šค๋ฅผ ๊ฐ€์ ธ ์˜จ๋‹ค.

    class NewsRemoteDataSource(
    ย  ย  private val newsApi: NewsApi,
    ย  ย  private val refreshIntervalMs: Long = 5000
    ) {
    ย  ย  val latestNews: Flow<List<ArticleHeadline>> = flow {
    ย  ย  ย  ย  while(true) {
    ย  ย  ย  ย  ย  ย  val latestNews = newsApi.fetchLatestNews()
    ย  ย  ย  ย  ย  ย  emit(latestNews) // Emits the result of the request to the flow
    ย  ย  ย  ย  ย  ย  delay(refreshIntervalMs) // Suspends the coroutine for some time
    ย  ย  ย  ย  }
    ย  ย  }
    }
    
  2. Intermediary

    ์ค‘๊ฐœ์ž๋Š” ์ŠคํŠธ๋ฆผ์— ๋‚ด๋ณด๋‚ด๋Š” ๊ฐ๊ฐ์˜ ๊ฐ’์ด๋‚˜ ์ŠคํŠธ๋ฆผ ์ž์ฒด๋ฅผ ์ˆ˜์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์„œ๋ฒ„๋กœ ๋ถ€ํ„ฐ ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋‚˜ ๋ฐ์ดํ„ฐ์˜ ์ž๋ฃŒํ˜•์ด view์—์„œ ํ•„์š”ํ•˜๋„๋ก ๋ฐ”๊พธ๋Š” ์ž‘์—…์ด ์ด์— ํ•ด๋‹นํ•œ๋‹ค. ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž์—๋Š” **map(๋ฐ์ดํ„ฐ ๋ณ€ํ˜•), filter(๋ฐ์ดํ„ฐ ํ•„ํ„ฐ๋ง), onEach(๋ชจ๋“  ๋ฐ์ดํ„ฐ ๋งˆ๋‹ค ์—ฐ์‚ฐ ์ˆ˜ํ–‰)**์ด ์žˆ๋‹ค.

    class KakaoRepositoryImpl @Inject constructor(
        private val kakaoRemoteDataSource: KakaoRemoteDataSource,
    ): KakaoRepository {
        override fun getKeywordPlace(
            keyword: String
        ): Flow<List<Place>> = kakaoRemoteDataSource.getKeywordPlace(keyword).map{
            it.documents.map{p->Place(p.place_name, p.category_name, p.x, p.y)}
        }
    }
    
  3. Consumer

    ์†Œ๋น„์ž๋Š” collect๋ฅผ ์ด์šฉํ•˜์—ฌ ์ŠคํŠธ๋ฆผ์˜ ๊ฐ’์„ ์‚ฌ์šฉํ•œ๋‹ค. collect๋Š” suspend function์ด๋ฏ€๋กœ ์ฝ”๋ฃจํ‹ด ๋‚ด์—์„œ ์‹คํ–‰๋˜์–ด์•ผ ํ•œ๋‹ค.

    class LatestNewsViewModel(
    ย  ย  private val newsRepository: NewsRepository
    ) : ViewModel() {
    
    ย  ย  init {
    ย  ย  ย  ย  viewModelScope.launch {
    ย  ย  ย  ย  ย  ย  // Trigger the flow and consume its elements using collect
    ย  ย  ย  ย  ย  ย  newsRepository.favoriteLatestNews.collect { favoriteNews ->
    ย  ย  ย  ย  ย  ย  ย  ย  // Update View with the latest favorite news
    ย  ย  ย  ย  ย  ย  }
    ย  ย  ย  ย  }
    ย  ย  }
    }
    

์˜ˆ์™ธ ์ฒ˜๋ฆฌ

catch ๋ธ”๋ก์„ ์‚ฌ์šฉํ•˜์—ฌ ์˜ˆ์™ธ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

class LatestNewsViewModel(
ย  ย  private val newsRepository: NewsRepository
) : ViewModel() {

ย  ย  init {
ย  ย  ย  ย  viewModelScope.launch {
ย  ย  ย  ย  ย  ย  newsRepository.favoriteLatestNews
ย  ย  ย  ย  ย  ย  ย  ย  // Intermediate catch operator. If an exception is thrown,
ย  ย  ย  ย  ย  ย  ย  ย  // catch and update the UI
ย  ย  ย  ย  ย  ย  ย  ย  .catch { exception -> notifyError(exception) }
ย  ย  ย  ย  ย  ย  ย  ย  .collect { favoriteNews ->
ย  ย  ย  ย  ย  ย  ย  ย  ย  ย  // Update View with the latest favorite news
ย  ย  ย  ย  ย  ย  ย  ย  }
ย  ย  ย  ย  }
ย  ย  }
}

flowOn

withContext์™€ ๋น„์Šทํ•˜๊ฒŒ ์ˆ˜ํ–‰ํ•˜๋Š” context๋ฅผ ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ฝ”๋ฃจํ‹ด์—์„œ emit์„ ํ•˜๋Š” context์™€ collectํ•˜๋Š” context๋Š” ๊ฐ™์•„์•ผ ํ•œ๋‹ค. ์ด๋Ÿฐ ๊ฒฝ์šฐ flowOn์„ ์‚ฌ์šฉํ•˜์—ฌ emitํ•˜๋Š” ๋ถ€๋ถ„์˜ context๋ฅผ ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ๋‹ค. flowOn ์—ฐ์‚ฐ์ž๋Š” flowOn ์œ„์— ๋ถ€๋ถ„(upstream)๋งŒ ์˜ํ–ฅ์„ ๋ฐ›๋Š”๋‹ค.

class NewsRemoteDataSource(
ย  ย  ...,
ย  ย  private val ioDispatcher: CoroutineDispatcher
) {
ย  ย  val latestNews: Flow<List<ArticleHeadline>> = flow {
ย  ย  ย  ย  // Executes on the IO dispatcher
ย  ย  ย  ย  ...
ย  ย  }
ย  ย  ย  ย  .flowOn(ioDispatcher)
}

๋ฐ์ดํ„ฐ ์†Œ์Šค ๋ ˆ์ด์–ด๊ฐ€ I/O ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋ฏ€๋กœ I/O ์ž‘์—…์— ์ตœ์ ํ™”๋œ ๋””์ŠคํŒจ์ณ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

๐Ÿ˜‹ StateFlow

Flow์˜ ํ•œ๊ณ„ ๋ฐ LiveData ๋ฌธ์ œ์ 

ํ•˜์ง€๋งŒ Flow๋Š” ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„์„ ๋ฐœ์ƒ์‹œํ‚ค๊ธฐ๋งŒ ํ• ๋ฟ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜์ง€ ์•Š๋Š”๋‹ค. ์ฆ‰ ํ™”๋ฉด์ด ์žฌ๊ตฌ์„ฑ๋  ๋•Œ๋งˆ๋‹ค ์„œ๋ฒ„๋‚˜ ๋กœ์ปฌ DB๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„์˜ค๊ฑฐ๋‚˜ ๋ฐ์ดํ„ฐ ๊ฐ’์ด collect ๋ ๋•Œ๋งˆ๋‹ค ViewModel์— ์ €์žฅํ•ด๋†“๊ณ  ์‚ฌ์šฉํ•ด์•ผํ•œ๋‹ค. ์ „์ž๋Š” ๋งค์šฐ ๋น„ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์ด๋ฏ€๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค.

๊ธฐ์กด์—๋Š” LiveData๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์„œ๋ฒ„๋กœ ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„์™”๋‹ค. LiveData๋Š” ์˜ต์ €๋ฒ„ ํŒจํ„ด์„ ํ™œ์šฉํ•˜์—ฌ ๊ตฌํ˜„๋˜์—ˆ๊ณ  ์ƒ๋ช…์ฃผ๊ธฐ์˜ ๋ณ€ํ™”๋ฅผ ์ž์ฒด์ ์œผ๋กœ ์ธ์‹ํ•œ๋‹ค๋Š” ์žฅ์ ์ด ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ Clean Architecture๋ฅผ ๋„์ž…ํ•˜๋ฉด์„œ Domain Layer์—์„œ๋„ ๊ทธ๋Œ€๋กœ LiveData๋ฅผ ์ ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ์žˆ๋Š”๋ฐ ์ด๋Š” ๋งž์ง€ ์•Š๋‹ค. ๋„๋ฉ”์ธ ๊ณ„์ธต์€ ์•ˆ๋“œ๋กœ์ด๋“œ์— ์˜์กด์„ฑ์„ ๊ฐ€์ง€์ง€ ์•Š๋Š” ์ˆœ์ˆ˜ ์ฝ”ํ‹€๋ฆฐ ์ฝ”๋“œ๋กœ ์ž‘์„ฑ๋˜์–ด์•ผ ํ•œ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ LiveData๋Š” UI์™€ ๋งค์šฐ ๋ฐ€์ ‘ํ•˜๊ฒŒ ์—ฐ๊ฒฐ๋˜์–ด ์žˆ๊ณ  ์•ˆ๋“œ๋กœ์ด๋“œ ํ”Œ๋žซํผ์— ์†ํ•ด์žˆ๊ธฐ ๋•Œ๋ฌธ์— ๋„๋ฉ”์ธ ๋ ˆ์ด์–ด์—์„œ ์‚ฌ์šฉํ•˜๊ธฐ์— ์ ํ•ฉํ•˜์ง€ ์•Š๋‹ค.

StateFlow ๋“ฑ์žฅ