中间流操作符

map

映射。看(传入请求流可以使用 map 操作符映射到结果,即使执行请求是由挂起函数实现的长时间运行的操作)

suspend fun performRequest(request: Int): String {

delay(1000) // 模拟长时间的异步工作

return “response $request”

}

fun main() = runBlocking {

(1…3).asFlow() // 转换为flow

.map { request -> performRequest(request) }

.collect { response -> println(response) }

}

结果

response 1

response 2

response 3

filter

过滤操作,看

suspend fun performRequest(request: Int): String {

delay(1000) // 模拟长时间的异步工作

return “response $request”

}

fun main() = runBlocking {

(1…3).asFlow() // 转换为flow

.map { request -> performRequest(request) }

.filter { it == “response 1” }

.collect { response -> println(response) }

}

结果:仅返回匹配到的值

response 1

变换操作符

在流变换算子中,最通用的一种叫做变换。 它可以用来模仿简单的转换,比如 map 和 filter,也可以实现更复杂的转换。 使用transform,我们可以发出任意次数的任意值。

例如,使用transform,我们可以在执行长时间运行的异步请求之前发出一个字符串,并在其后响应:

(1…3).asFlow() // a flow of requests

.transform { request ->

emit(“Making request $request”)

emit(performRequest(request))

}

.collect { response -> println(response) }

结果

Making request 1

response 1

Making request 2

response 2

Making request 3

response 3

size限制操作符

顾名思义,限制收集的数量。使用运算符take

它会在判断当发射的值在达到相应限制时取消流程的执行。 因为协程中的取消总是通过抛出异常来执行。所以需要考虑进行相应的异常捕获来保证后续的流畅正常进行不被取消掉。

fun numbers(): Flow = flow {

try {

emit(1)

emit(2)

println(“not execute”)

emit(3)

} finally {

println(“finally in numbers”)

}

}

fun main() = runBlocking {

numbers()

.take(2) // take 两个值

.collect { value -> println(value) }

}

结果

1

2

finally in numbers

终端操作符

终端操作符可以启动一个流,最基础的就是上述常提到的collect。但是还有一些其他的终端操作符,它可能会让一些操作变得更简单:

转换为各种集合, toList 和 toSet。

flowOf(1,2).toList().forEach {

println(it)

}

flowOf(1,2).toSet().forEach(::println)

first , 确保获取且进获取第一个值

返回流发出的第一个元素然后取消流的集合的终端运算符。 如果流为空,则抛出 NoSuchElementException。

val value : Int = flowOf(1, 2).first()

println(value)

使用 reduce 和 fold 将流合并到一个值。

val sum = (1…5).asFlow()

.map { it * it } //平方

.reduce { a, b -> a + b } // 进行累加

println(sum)

//结果

55

fold和reduce使用起来差不多,区别就是fold可以定义初始化,其实很简单,reduce传入的lambda前一个参数是每次计算的结果累计,后一个参数是当前需要传入的值,不明白可以去瞅一眼源码,这里不在引申。

onEach

这个操作符也较为常用,这里也介绍一下,返回在上游流的每个值向下游发出之前调用给定操作的流。

(1…5)

.asFlow()

.onEach {

println(“onEach$it”)

}.collect()

//结果

onEach1

onEach2

onEach3

onEach4

onEach5

操作符的顺序

除非使用对多个流进行操作的特殊运算符,否则流的每个单独集合都按顺序执行。 该集合直接在调用终端运算符的协程中工作。 默认情况下不会启动新的协程。 每个发出的值都由从上游到下游的所有中间操作符处理,然后交付给终端操作符。

(1…5).asFlow()

.filter {

println(“Filter $it”)

it % 2 == 0

}

.map {

println(“Map $it”)

“string $it”

}.collect {

println(“Collect $it”)

}

//结果 按照顺序没有值依次向下发射

Filter 1

Filter 2

Map 2

Collect string 2

Filter 3

Filter 4

Map 4

Collect string 4

Filter 5

Flow 调度器切换

对于UI驱动型的程序来说,需要将长时间计算的任务放在异步线程处理,UI展示工作需要放在主线程处理。也就是说需要将构建器的代码放到异步线程执行,但是终端操作符,比如collect需要在主线程获取,那么怎么做呢?

使用flowOn操作符

在这之前您可能的了解一下,协程的调度器。可以简单参考之前写的一篇文章,有对调度器做简单介绍:https://blog.csdn.net/weixin_44235109/article/details/119981210

fun main() = runBlocking {

flow {

for (i in 1…3) {

//模拟异步处理

delay(100)

log(“Emitting $i”)

emit(i) // emit next value

}

}.flowOn(Dispatchers.Default)//使用flowOn传入Default的调度器

.collect { value ->

log(“Collected $value”)

}

}

结果

16:39:21:954 [DefaultDispatcher-worker-1] Emitting 1

16:39:21:969 [main] Collected 1

16:39:22:071 [DefaultDispatcher-worker-1] Emitting 2

16:39:22:071 [main] Collected 2

16:39:22:178 [DefaultDispatcher-worker-1] Emitting 3

16:39:22:178 [main] Collected 3

可以很明显的看出,构建模块被调度到异步线程处理了。而收集的工作还在主线程进行。

flowOn负责构建的模块调度,那么收集的谁负责呢?

其实和异常处理类似,collect受调用它的协程上下文限制,所以最后的执行线程和当前协程上下文的调度器有关,目前我使用的是idea测试的,默认runBlocking的调度器就是主线程。如果是android上面的话,runBlocking可能就需要传入Dispatchers.Main了。

其实和RxJava还是非常相似的。

注意一点,此时其实已经改变流执行的顺序了。

官方的解释如下:

Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine (“coroutine#1”) and emission happens in another coroutine (“coroutine#2”) that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.

这里要注意的另一件事是 flowOn 运算符更改了流的默认顺序性质。 现在收集发生在一个协程(“coroutine#1”)中,发射发生在另一个协程(“coroutine#2”)中,该协程与收集协程同时运行在另一个线程中。 当必须在其上下文中更改 CoroutineDispatcher 时, flowOn 运算符为上游流创建另一个协程。

这一块我简单看了一下源码,这里面不同的调度器会遇到多线程的问题,最里面使用了channel进行了调度处理。具体的核心类是ChannelFlow。后面会对flow进行简单源码分析,但篇幅有限不对这一块过深入分析,感兴趣可以自行查看,或找博主私下探讨。

其实上面的看不出来会改变流执行的顺序,下面改变一下代码,验证一下看看

fun main() = runBlocking {

flow {

for (i in 1…3) {

//模拟异步处理

delay(100)

log(“Emitting $i”)

emit(i) // emit next value

}

}.flowOn(Dispatchers.Default)

.collect { value ->

delay(200)

log(“Collected $value”)

}

}

结果

16:52:33:258 [DefaultDispatcher-worker-1] Emitting 1

16:52:33:386 [DefaultDispatcher-worker-1] Emitting 2

16:52:33:482 [main] Collected 1

16:52:33:493 [DefaultDispatcher-worker-1] Emitting 3

16:52:33:684 [main] Collected 2

16:52:33:887 [main] Collected 3

我们只需要将,collect里面增加一个delay即可,发现其实这时候就是发射归发射,收集归收集了。不似上面我们写的程序,发射一个值只有到终端操作符之后才会发射第二个。这里面肯定就会对值进行缓存。那么就会牵扯到一个问题。老生常谈‍♀️,背压处理!

Flow 背压处理

对于背压处理,Kotlin 提供三种解决方案:

| 操作符 | 含义 |

| — | — |

| buffer | 指定固定容量缓存 |

| conflate | 保留最新的值 |

| collectLatest | 新值发送时取消之前的 |

buffer

这里有必要看一下buffer函数的源码定义

public fun Flow.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND)

可以看出需要两个参数,都有默认值。

第一个好理解容量,默认等于BUFFERED,这个值其实时64,可以按照使用时需要自己指定具体的数字。 第二个是指定,当buffer溢出时的操作,默认的操作是挂起。还要两个操作分别是删除最旧的值不要挂起或者删除当前最新的值不要挂起,可以自行查看源码,这里不再引申。

使用

flow {

for (i in 1…3) {

//模拟异步处理

delay(100)

log(“Emitting $i”)

emit(i) // emit next value

}

}.flowOn(Dispatchers.Default)

.buffer()

.collect { value ->

delay(200)

log(“Collected $value”)

}

//结果

17:17:28:420 [DefaultDispatcher-worker-1] Emitting 1

17:17:28:536 [DefaultDispatcher-worker-1] Emitting 2

17:17:28:646 [main] Collected 1

17:17:28:646 [DefaultDispatcher-worker-1] Emitting 3

17:17:28:846 [main] Collected 2

17:17:29:049 [main] Collected 3

conflate

这个只获取最新值也比较好理解,应用场景,比如说获取下载进度,对于用户来说其实每次只需要获取当前最新的进度就好了,不需要把之前的值再去获取一遍,下面也举一个例子

fun main() = runBlocking {

flow {

for (i in 1…3) {

//模拟异步处理

delay(100)

log(“Emitting $i”)

emit(i) // emit next value

}

}.flowOn(Dispatchers.Default)

.conflate()

.collect { value ->

delay(300)//模拟下游处理比较慢

log(“Collected $value”)

}

}

//结果 第一个值肯定可以拿到 当地一个值处理完成之后 最新的值就是3了 所以丢弃了2

17:21:42:916 [DefaultDispatcher-worker-1] Emitting 1

17:21:43:034 [DefaultDispatcher-worker-1] Emitting 2

17:21:43:140 [DefaultDispatcher-worker-1] Emitting 3

17:21:43:236 [main] Collected 1

17:21:43:546 [main] Collected 3

collectLatest

说明一点:这玩意其实也是一个终端操作符

前两个可能都比较好理解,那新值发送时取消之前的是什么意思呢?为了便于理解,直接上例子,按照结果说明:

fun main() = runBlocking {

flow {

for (i in 1…3) {

//模拟异步处理

delay(100)

log(“Emitting $i”)

emit(i) // emit next value

}

}.flowOn(Dispatchers.Default)

.collectLatest {

delay(300)

log(“Collected $it”)

}

}

//结果

17:25:33:916 [DefaultDispatcher-worker-1] Emitting 1

17:25:34:038 [DefaultDispatcher-worker-1] Emitting 2

17:25:34:150 [DefaultDispatcher-worker-1] Emitting 3

17:25:34:453 [main] Collected 3

对比上面的例子,这里只是将collect替换成了collectLatest而已。为什么1没有了呢?

显而易见了,这玩意会在最新的到来会直接取消下游上一个消费的处理,因为有delay所以1还没有来得及打印,就因为下一个值发射了,然后就被取消了!!!您可真霸道呢?‍♀️

Flow 异常处理

当操作符内的发射器或代码抛出异常时,流收集可以以异常结束。 有几种方法可以处理这些异常。

try…catch

较为简单,上

fun simple(): Flow = flow {

for (i in 1…3) {

println(“Emitting $i”)

emit(i) // emit next value

}

}

fun main() = runBlocking {

try {

simple().collect { value ->

println(value)

check(value <= 1) { “Collected $value” }

}

} catch (e: Throwable) {

println(“Caught $e”)

}

}

//结果

fun simple(): Flow = flow {

for (i in 1…3) {

println(“Emitting $i”)

emit(i) // emit next value

}

}

fun main() = runBlocking {

try {

simple().collect { value ->

println(value)

if (value>1){

throw IllegalStateException(“exception value is $value”)

}

}

} catch (e: Throwable) {

println(“Caught $e”)

}

}

显而易见,抛出异常之后,收集结束,如果是UI驱动程序比如:Android还是推荐主从作用域,异常不会向上传播。

思考一个问题,刚刚异常是发生在收集端,如果在构建的时候发生异常呢?

fun simple(): Flow =

flow {

for (i in 1…3) {

println(“Emitting $i”)

emit(i) // emit next value

}

}

.map { value ->

if (value>1){

throw IllegalStateException(“exception value is $value”)

}

“string $value”

}

fun main() = runBlocking {

try {

simple().collect { value -> println(value) }

} catch (e: Throwable) {

println(“Caught $e”)

}

}

//结果

Emitting 1

string 1

Emitting 2

Caught java.lang.IllegalStateException: exception value is 2

完美:异常仍被捕获并停止收集

上述代码的问题就是不够优雅,还有异常对于流来说必须是透明的,使用try … catch 显然违反了透明性,所以Kotlin 封装了try catch.

catch 相关运算符

catch 运算符的主体可以分析异常并根据捕获的异常以不同的方式对其做出反应:

可以使用 throw 重新抛出异常。 可以使用 catch 主体中的发射将异常转换为值的发射。 异常可以被其他代码忽略、记录或处理。

看

simple()

.catch { e -> emit(“Caught $e”) } // 我不仅捕获到了异常,我还能继续发射!!!!

.collect { value -> println(value) }

结果

Emitting 1

string 1

Emitting 2

Caught java.lang.IllegalStateException: exception value is 2

当然因为抛出了异常,协程还是会终止,但是此时异常是以发生的形式传递下去的。

注意:catch 中间运算符,尊重异常透明性,只捕获上游异常(即来自 catch 之上的所有运算符的异常,但不在其之下)。 如果 collect { … } 中的块(位于 catch 下方)抛出异常,则它不会捕获:

fun simple(): Flow = flow {

for (i in 1…3) {

println(“Emitting $i”)

emit(i)

}

}

fun main() = runBlocking {

simple()

.catch { e -> println(“Caught $e”) } // 不会捕获到下游的异常

.collect { value ->

if(value <= 1) throw IllegalStateException(“Collected $value”)

println(value)

}

}

//

Emitting 1

1

Emitting 2

Exception in thread “main” java.lang.IllegalStateException: Collected 2

at org.example.zxf.kotlin11.flow.TestFlowKt$main

1

1

1invokeSuspend$

i

n

l

i

n

e

d

inlined

inlinedcollect$1.emit(Collect.kt:133)

这个时候可以利用onEach将需要可能捕获异常的地方前置,如下所示。

simple()

.onEach { value ->

check(value <= 1) { “Collected $value” }

println(value)

}

.catch { e -> println(“Caught $e”) }

.collect()

onCompletion 相关操作

我们知道和try … catch 搭配的 还有一个finally。所以 flow当然也有一个差不多的了,叫onCompletion

fun simple(): Flow = flow {

for (i in 1…3) {

println(“Emitting $i”)

emit(i)

}

}

fun main() = runBlocking {

simple()

.onCompletion { println(“Done”) }

.collect { value -> println(value) }

}

//结果

Emitting 1

1

Emitting 2

2

Emitting 3

3

Done

抛个异常试试,onCompletion可以通过cause进行判断是否是正常结束.

fun simple(): Flow = flow {

emit(1)

throw RuntimeException()

emit(2)

}

fun main() = runBlocking {

simple()

.onCompletion { cause ->

if (cause != null) {

println(“Flow completed exceptionally”)

}

}

.catch { cause -> println(“Caught exception”) }

.collect { value -> println(value) }

}

//结果

1

Flow completed exceptionally

Caught exception

与 catch 运算符的另一个区别是 onCompletion 会看到所有异常,并且仅在成功完成上游流(没有取消或失败)时才会收到空异常。

fun simple(): Flow = (1…3).asFlow()

fun main() = runBlocking {

simple()

.onCompletion { cause -> println(“Flow completed with $cause”) }

.collect { value ->

if (value>1){

throw IllegalStateException(“exception value is $value”)

}

println(value)

}

}

//结果

1

Flow completed with java.lang.IllegalStateException: exception value is 2

Exception in thread “main” java.lang.IllegalStateException: exception value is 2

注意一点:虽然看到了,但是并没有进行捕获。异常还是抛出了。

Flow 启动

最后提一点和启动相关的,先看下面两个例子:

fun events(): Flow = (1…3).asFlow().onEach { delay(100) }

fun main() = runBlocking {

events()

.onEach { event -> println(“Event: $event”) }

.collect() // <— collect 挂起函数阻塞

println(“Done”)

}

//结果

Event: 1

Event: 2

Event: 3

Done

对于上述,没有异议吧,Done的打印需要等待collect恢复,因为在一个协程内。如果需要不等待呢?那就需要另起一个协程了

fun main() = runBlocking {

launch {

events()

.onEach { event -> println(“Event: $event”) }

.collect() // <— collect 挂起函数阻塞

}

println(“Done”)

}

//结果

Done

Event: 1

Event: 2

Event: 3

但是上述写法可以等价于,下面的写法,flow这玩意封装了另一种启动方式

fun main() = runBlocking {

events()

.onEach { event -> println(“Event: $event”) }

.launchIn(this)

println(“Done”)

}

//结果

Done

Event: 1

Event: 2

Event: 3

看一下launchIn源码!

public fun Flow.launchIn(scope: CoroutineScope): Job = scope.launch {

collect() // tail-call

}

好吧,其实不就是封装了一下嘛。主要是源码好多地方使用launchIn来着,所以这里提一下‍♀️。

Flow原理解析

好吧,激动人心的时刻终于来了,做一些源码分析。

先写一个小

fun main() = runBlocking {

flow {

emit(1)

emit(2)

emit(3)

}.collect {

println(it)

}

}

好了,我们就分析这玩意怎么实现的!往深篇幅有限写不下了。如果您想深入交流,欢迎私信交流。

所以,我先列出来flow函数构建的源码,只贴出核心代码:

public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

// Named anonymous object

private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() {

override suspend fun collectSafely(collector: FlowCollector) {

collector.block()

}

}

可以看出flow构建了一个SafeFlow将block传入。并且重写了collectSafely方法,调用了block()。这里需要注意一点SafeFlow继承自AbstractFlow.

好,我们在看一调用collect之后发生了什么。

public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =

collect(object : FlowCollector {

override suspend fun emit(value: T) = action(value)

})

public abstract class AbstractFlow : Flow, CancellableFlow {

public final override suspend fun collect(collector: FlowCollector) {

val safeCollector = SafeCollector(collector, coroutineContext)

try {

collectSafely(safeCollector)

} finally {

safeCollector.releaseIntercepted()

}

}

public abstract suspend fun collectSafely(collector: FlowCollector)

}

看的出,这时将collect的block封装成了FlowCollector类型(重写emit方法用于执行block),此时会调用到SafeFlow的collect方法,具体在AbstractFlow里面实现。这里面干了什么呢?看的出,创建了SafeCollector类型的东西,然后将SafeCollector传递给了collectSafely。而在上面的分析中,我们知道collectSafely在构建flow时进行了实现,SafeCollector作为receiver以扩展函数的形式调用了,flow构建器的block(这里面执行了,我们手动的emit操作)。这么一看是不是有点联系起来了

好的,接下来我们继续分析一下,SafeCollector,那这玩意的源码其实也是有点多的,我们应该看哪一个函数呢?通过上面的分析可知,调用collect之后,会调用到SafeFlow的collect方法,进而会以扩展函数的形式调用到flow的block,而在block里面就是我们自己写的emit了,所以receiver既然是SafeCollector,那肯定就是调用SafeCollector的emit了,我们去瞅一瞅SafeCollector的emit函数相关的调用链:

override suspend fun emit(value: T) {

try {

emit(uCont, value)

} catch (e: Throwable) {

}

}

private fun emit(uCont: Continuation, value: T): Any? {

return emitFun(collector as FlowCollector, value, this as Continuation)

}

private val emitFun =

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数初中级Android工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则近万的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Android移动开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Android开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注:Android)

最后

针对于上面的问题,我总结出了互联网公司Android程序员面试涉及到的绝大部分面试题及答案,并整理做成了文档,以及系统的进阶学习视频资料。 (包括Java在Android开发中应用、APP框架知识体系、高级UI、全方位性能调优,NDK开发,音视频技术,人工智能技术,跨平台技术等技术资料),希望能帮助到你面试前的复习,且找到一个好的工作,也节省大家在网上搜索资料的时间来学习。

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!

的分析可知,调用collect之后,会调用到SafeFlow的collect方法,进而会以扩展函数的形式调用到flow的block,而在block里面就是我们自己写的emit了,所以receiver既然是SafeCollector,那肯定就是调用SafeCollector的emit了,我们去瞅一瞅SafeCollector的emit函数相关的调用链:

override suspend fun emit(value: T) {

try {

emit(uCont, value)

} catch (e: Throwable) {

}

}

private fun emit(uCont: Continuation, value: T): Any? {

return emitFun(collector as FlowCollector, value, this as Continuation)

}

private val emitFun =

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数初中级Android工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则近万的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Android移动开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

[外链图片转存中…(img-2jSZmwNR-1712663535102)]

[外链图片转存中…(img-vsXU7Bjy-1712663535103)]

[外链图片转存中…(img-5bqXT1wX-1712663535103)]

[外链图片转存中…(img-jxnPpf8m-1712663535103)]

[外链图片转存中…(img-iRgeCfEk-1712663535103)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Android开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注:Android)

最后

针对于上面的问题,我总结出了互联网公司Android程序员面试涉及到的绝大部分面试题及答案,并整理做成了文档,以及系统的进阶学习视频资料。 (包括Java在Android开发中应用、APP框架知识体系、高级UI、全方位性能调优,NDK开发,音视频技术,人工智能技术,跨平台技术等技术资料),希望能帮助到你面试前的复习,且找到一个好的工作,也节省大家在网上搜索资料的时间来学习。

[外链图片转存中…(img-rV1UYXST-1712663535104)]

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!

好文推荐

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。