主页 > 电脑硬件  > 

Kotlin中RxJava用法

Kotlin中RxJava用法

RxJava 是一个基于观察者模式的响应式编程库,广泛用于处理异步事件流。Kotlin 与 RxJava 结合使用可以简化异步编程和事件处理。以下是一些常见的 RxJava 用法示例。 配置文件中引用:

implementation("io.reactivex.rxjava3:rxjava:3.0.2"); implementation("io.reactivex.rxjava3:rxandroid:3.0.2") // 如果你在 Android 项目中使用

各种用法代码如下:

fun main() { //1.各种创建 Observable.create<Int> { emitter -> for (i in 1..10) { emitter.onNext(i) } emitter.onComplete() }.subscribe( { item -> println(item) }, { error -> println(error) }, { println("complete") } )//打印:1 2 3 4 5 6 7 8 9 10 complete 共11行 //2.just创建 val observable = Observable.just(1, 2, 3, 4, 5) observable.subscribe { println(it) } //打印:1 2 3 4 5 共5行 //3.fromArray创建 val observable2 = Observable.fromArray(1, 2, 3, 4, 5) observable2.subscribe { println(it) } //打印:1 2 3 4 5 共5行 //4.range创建 val observable3 = Observable.range(1, 5) observable3.subscribe { println(it) } //打印:1 2 3 4 5 共5行 //5.interval创建 val observable4 = Observable.interval(1, TimeUnit.SECONDS) observable4.subscribe { println(it) } //每隔1秒打印一次数字 //6.timer创建 val observable5 = Observable.timer(1, TimeUnit.SECONDS) observable5.subscribe { println(it) } //1秒后打印数字0 //7.error创建 val observable6 = Observable.error<Throwable>(RuntimeException("error")) observable6.subscribe ({item->println(item)},{e -> println(e)} ) //打印:java.lang.RuntimeException: error //8.fromIterable创建 val list = listOf(1, 2, 3, 4, 5) val observable7 = Observable.fromIterable(list) observable7.subscribe { println(it) } //打印:1 2 3 4 5 共5行 // 二.操作符 //1.map 将发射的数据进行转换。 val observable8 = Observable.just(1, 2, 3, 4, 5) observable8.map { it * 2 }.subscribe { println(it) } //打印:2 4 6 8 10 共5行 //2.flatMap 将发射的数据进行转换,并且将转换后的数据合并后发射。 val observable9 = Observable.just(1, 2, 3, 4, 5) observable9.flatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行 //3.concatMap 将发射的数据进行转换,并且将转换后的数据合并后发射,但是和flatMap不同的是,concatMap会按照发射的顺序来发射数据。 val observable10 = Observable.just(1, 2, 3, 4, 5) observable10.concatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行 //4.concat 将多个Observable发射的数据按照顺序合并后发射。 val observable11 = Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)) observable11.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行 //5.merge 将多个Observable发射的数据合并后发射。 val observable12 = Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6)) observable12.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行 //6.zip 将多个Observable发射的数据按照顺序合并后发射,并且合并后的数据数量为发射的数据数量最少的Observable。 val observable13 = Observable.zip(Observable.just(1, 2, 3), Observable.just(4, 5, 6,7)){ a, b -> a + b } observable13.subscribe { println(it) } //打印:5 7 9 共3行 //7.filter 过滤发射的数据,只有满足条件的数据才会被发射。 val observable14 = Observable.just(1, 2, 3, 4, 5) observable14.filter { it % 2 == 0 }.subscribe { println(it) } //打印:2 4 共2行 //8.take 只发射前n个数据。 val observable15 = Observable.just(1, 2, 3, 4, 5) observable15.take(3).subscribe { println(it) } //打印:1 2 3 共3行 //9.skip 跳过前n个数据。 val observable16 = Observable.just(1, 2, 3, 4, 5) observable16.skip(3).subscribe { println(it) } //打印:4 5 共2行 //10.takeWhile 只发射满足条件的数据。 val observable17 = Observable.just(1, 2, 3, 4, 5) observable17.takeWhile { it < 4 }.subscribe { println(it) } //打印:1 2 3 共3行 //11.skipWhile 跳过满足条件的数据。 val observable18 = Observable.just(1, 2, 3, 4, 5) observable18.skipWhile { it < 4 }.subscribe { println(it) } //打印:4 5 共2行 //12.takeUntil 只发射不满足条件的数据。 val observable19 = Observable.just(1, 2, 3, 4, 5) observable19.takeUntil { it == 4 }.subscribe { println(it) } //打印:1 2 3 共3行 //13.takeLast 只发射最后n个数据。 val observable21 = Observable.just(1, 2, 3, 4, 5) observable21.takeLast(3).subscribe { println(it) } //打印:3 4 5 共3行 //14.distinct 去重,只发射第一次出现的数据。 val observable22 = Observable.just(1, 2, 3, 4, 5, 1, 2, 3) observable22.distinct().subscribe { println(it) } //打印:1 2 3 4 5 共5行 // 三.线程调度 //1.subscribeOn 指定Observable执行的线程。 val observable23 = Observable.just(1, 2, 3, 4, 5) observable23.subscribeOn(Schedulers.io()).subscribe { println(it) } //打印:1 2 3 4 5 共5行 //2.observeOn 指定Observer执行的线程。 val observable24 = Observable.just(1, 2, 3, 4, 5) observable24.observeOn(Schedulers putation()).subscribe { println(it) } //打印:1 2 3 4 5 共5行 // 四.错误处理 Observable.error<Throwable>(RuntimeException("Error occurred")) .subscribe( { println("OnNext: $it") }, { println("OnError: ${it.message}") }, { println("OnComplete") } ) //打印:OnError: Error occurred }

五.处理生命周期

class MyActivity : AppCompatActivity() { private val compositeDisposable = CompositeDisposable() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) val disposable = Observable.just(1, 2, 3) .subscribe { println(it) } compositeDisposable.add(disposable) } override fun onDestroy() { super.onDestroy() compositeDisposable.clear() // 取消所有订阅 } }
标签:

Kotlin中RxJava用法由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Kotlin中RxJava用法