[코루틴 #4 데이터 주고받기] Sequence, Flow, Channel
Language/코루틴

[코루틴 #4 데이터 주고받기] Sequence, Flow, Channel

728x90
반응형

 

 

 

 

 

 


Iterator


  • 인덱스로 요소를 검색할 수 없으므로 순서대로만 엑세스 할 수 있습니다.
  • 더 많은 요소가 있는지 여부를 나타내는 hasNext()함수가 있습습니다.
  • 요소는 한 방향으로만 검색할 수 있습니다. (이전 요소를 검색할 방법은 없음) (hot stream)

호출 사이에서 일시중단되지만 실행 중에는 일시중단 될 수 없습니다. 그래서 일시중단 연산이 없어도 반복할 수 있습니다. 앞으로 나올 Sequence와 Iteratort는 CoroutineContext를 받지 않습니다. 기본적으로 코드를 호출한 Context와 동일한 Context에서 코드가 실행되기 때문입니다. 정보 잔출 후에만 일시중지가 가능하기 때문에 이를 위해서 yield() 혹은 gieldAll() 함수를 호출해야 합니다.

val iterator = iterator {
  yield("First")
  yield("Second")
  yield("Third")
  }

println(iterator.next())
println(iterator.next())
println(iterator.next())
}

이 코드는 세 가지 요소를 포함하는 Iterator를 빌드합니다. 요소가 처음 요청될 때 첫번째 줄이 실행돼 "First" 값이 산출되고 이후에 실행이 중단됩니다. 다음 요소가 요청되면 두번째 줄이 실행돼 "Second" 가 산출되고 다시 일시 중단됩니다. 따라서 Iterator가 포함하는 세 가지 요소를 얻으려면 next() 함수를 세 번 호출하면 됩니다.

 

 


Sequence


  • 인덱스로 값을 가져올 수 있다.
  • 상태가 저장되지 않으며, 상호작용한 후 자동으로 재설정 된다.
  • 한 번의 호출로 값 그룹을 가져올 수 있다.

일시 중단 시퀀스를 만들기 위해 sequence() 빌더를 사용합니다.

빌더는 일시 중단 람다를 가져와 Sequence 를 반환합니다.

val sequence: Sequence<Any> = sequence {
  yield("A")
  yield(1)
  yield(32L)
}

 

일시중단 시퀀스는 상태가 없고, 사용된 후에 재설정될 수 있습니다.

Iterator를 사용하는 것과는 달리 시퀀스는 각각의 호출마다 요소의 처음부터 실행됨을 알 수 있습니다. (cold stream)

 

 

코루틴 사용 범위와 시점 결정하기 : Sequence( )

- sequence( ) (시퀀스)를 사용하면 아주 많은 값을 만들어내는 코드로부터 특정 값의 범위를 가져올 수 있다. 

- sequence( ) 는 cold stream 이다. (요청할 때마다 처음부터 새로 발행)

 

1) sequence( ) 로 코루틴 생성하기

  • 시퀀스는 코틀린의 표준 라이브러리 중 하나
  • 시퀀스를 사용하면 아주 많은 값을 만들어내는 코드로부터 특정 값의 범위를 가져올 수 있음

 

시퀀스 함수를 사용한 피보나치 수열

// 피보나치 수열 생성
val fibonacciSeq = sequence {
    var a = 0
    var b = 1
    yield(1)  // (1)

    while (true) {
        yield(a + b)  // (2)
        val tmp = a + b
        a = b
        b = tmp
    }
}

// generateSequence()의 사용
val seq = sequence {
    val start = 0
    // 단일 값 산출
    yield(start)
    // 반복 값 산출
    yieldAll(1..5 step 2)
    // 무한한 시퀀스에서 산출
    yieldAll(generateSequence(8) { it * 3 })
}


fun main() {
    println(fibonacciSeq.take(8).toList()) // (3)
    println(seq.take(7).toList()) // [0, 1, 3, 5, 8, 24, 72]

    // 다음 요소에 대한 지정
    val saved = fibonacciSeq.iterator()
    println("${saved.next()}, ${saved.next()}, ${saved.next()}")

}

 

 


Flow


코루틴 플로우(Flow)는 suspend function을 보완하기 위해 나왔다.


비동기 동작의 결과로 suspend function 이 하나의 결과물 던진다면,
플로우를 이용하여 여러 개의 결과를 원하는 형식으로 던질 수 있다.



특징

비동기이며 코루틴에서만 동작 가능한 것은 suspend function 과 동일하다.

다른 점은 함수 앞에 suspend 를 붙이지 않아도 된다.

 

cold stream ( kotlin의 sequence )으로 동작하며 hot stream 은 지원하지 않는다.

그렇기에 데이터는 요청할 때마다 처음부터 새로 발행되며,

요청 전에는 선언만 있을 뿐 아무런 동작도 하지 않는다.

 

직접 취소할 수 있는 함수는 따로 제공하지 않는다.



 

Flow 생성

우선 Flow를 생성하는 방법부터 알아보겠다.

 

 

 

 

 

Flow에게 데이터 요청하기

모든 코드는 다 코루틴 내부에서 실행되어야 한다. 

 

데이터 요청을 위한 함수는 

collect, collectIndexed, collectLatest, launchIn 터미널 연산자 등이 있다. 

 

 

 

Flow 취소하기

Flow는 취소와 관련된 함수는 없다. 

취소를 원한다면 다음과 같이 외부에서 취소가 가능한 무언가로 감싸줘야만 한다. 

launch, async  등이 적절한 예시이다. 

위의 lanchInIn 함수가 job을 반환하니 이것을 활용해도 된다. 

 

 

코루틴 문백(Context) 변경하기

 

 

 

Flow 연산자

 

 

 

Flow Zip And Combine

 

 

 

 

LiveData vs Flow
- LiveData, Flow 둘 다 데이터를 전송하기 위한 용도이다. LiveData 는 ViewModel에서 데이터 전송 및 통신의 목적으로 사용하고 있어서 메모리의 누수룰 막을 수 있고, 생명주기를 보장할 수 있다. Room의 경우에도 LiveData를 지원해주나, LiveData는 BackPressure 혹은 Thread 의 변경을 지원해주지 않는다.

- LiveData와 Flow는 서로 상호작용이 가능하다. LiveData는 최신 데이터를 View로 전달할 수 있고, Flow는 UseCase & Repository & DataSource 레이어들과 긴밀하게 작동하여 데이터를 수집하고, 서로다른 코루틴 범위에서 작동할 수 있다. 그래서 ViewModel - View 간에는 LiveData, Data 레이어에서는 Flow가 가장 이상적인 방법이라고 할 수 있다.

 

 

 

 

 

 


Channel


채널(Channel) 은 자료를 서로 주고 받기 위해 약속되 일종의 통료 역할을 합니다.
코루틴의 채널은 넌 블로킹 전송 개념으로 사용되고 있습니다.
채널을 구현할 때는 SendChannel과 ReceiveChannel 인터페이스를 이용해 값들의 스트림을 전송하는 방법을 제공합니다.
실제 전송에는 다음과 같이 지연 함수의 send()와 receive()함수를 사용합니다.

  • SendChannel의 suspend fun send(element:E)
  • ReceiveChannel의 suspend fun receive(:E)




1) send()와 receive() 함수로 채널 사용해 보기

fun main() = runBlocking<Unit> { val channel = Channel<Int>() // ===== 1. 일반적인 send()와 receive()의 역할 ===== launch { // 여기에 다량의 CPU 연산작업이나 비동기 로직을 둘 수 있다. for (x in 1..5) channel.send(x * x) } // 5개의 값을 채널로부터 받는다 repeat(5) { println(channel.receive()) } println("Done!") // ===== 2. close()로 닫고 반복문을 사용해 읽기 ===== launch { for (x in 1..5) channel.send(x * x) channel.close() // 모두 보내고 닫기 명시 } // for 루프를 사용해 끝까지 읽기 for (element in channel) println(element) println("Done!") }

채널을 통해 send() 함수로 값을 보내 놓으면 이후 receive() 함수를 통해 값을 받을 수 있습니다. 일반 큐와는 다르게 더이상 전달 요소가 없으면 채널을 닫을 수 있습니다. 보통 for문을 구성해 채널을 받고 close()를 사용하면 바로 채널을 닫는 것이 아니라 닫겠다는 특수한 토큰을 보냅니다.

여기서 보내는 쪽과 받는 쪽에 몇가지 중요한 상태가 있습니다. 송신자는 SendChannel에서 채널이 꽉 차있는지, 즉 isFull 값이 true

2) produce 생산자 소비자 패턴 구성하기

fun main() = runBlocking { val routine1 = GlobalScope.produce { delay(Random().nextInt(1000).toLong()) send("A") } val routine2 = GlobalScope.produce { delay(Random().nextInt(1000).toLong()) send("B") } // 먼저 수행된 것을 받게 된다. val result = select<String> { routine1.onReceive { result -> result } routine2.onReceive { result -> result } } println("Result was $result") }

 

3) 버퍼를 가진 채널 구성하기

fun main() = runBlocking<Unit> { val channel = Channel<Int>(3) // 버퍼 capacity 값을 준다 val sender = launch(coroutineContext) { // 송신자 측 repeat(10) { println("Sending $it") channel.send(it) // 지속적으로 보내다가 꽉 차면 일시 지연됨 } } // 아무것도 안받고 1초가 잠시 기다린 후 delay(1000) sender.cancel() // 송신자의 작업을 취소 한다. }



4) select 로 여러 루틴 처리하기

fun main() = runBlocking { val routine1 = GlobalScope.produce { delay(Random().nextInt(1000).toLong()) send("A") } val routine2 = GlobalScope.produce { delay(Random().nextInt(1000).toLong()) send("B") } // 먼저 수행된 것을 받게 된다. val result = select<String> { routine1.onReceive { result -> result } routine2.onReceive { result -> result } } println("Result was $result") }

 

 

 

 

참고

https://two22.tistory.com/16?category=1134189 

 

 

 

 

 
728x90
반응형