前言

本文介绍项目开发中使用到rxjava的情形,以及详细的代码。


一、rxjava是什么?

RxJava是一个基于事件流实现异步操作的框架,其作用是实现异步操作,类似于Android中的AsyncTask。它是在Java虚拟机(JVM)上使用可观测的序列来构建异步的、基于事件的程序。RxJava结合了观察者模式,迭代器模式和函数式的精华,最早由Netflix公司用于减少REST调用次数,后迁移到Java平台,并得到了广泛的应用。

RxJava的一些主要特点包括支持Java 8 Lambda表达式,支持异步和同步编程,具有单一依赖关系,以及简洁、优雅的代码风格。此外,RxJava还解决了“回调地狱”问题,异步处理不再需要回调一层套一层,而是用链式调用的方式完成不同线程的回调。

对于Android开发者来说,RxJava在开发过程中常与RxAndroid一同使用,RxAndroid是针对RxJava在Android平台上使用的响应式扩展组件。然而,尽管RxJava带来了编程上的便利,但其复杂性也使得一些开发者对其持有保留态度。

二、使用步骤

1.引入库

代码如下(示例):

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'

2.rxjava复杂的异步处理

实现方式一
class MainActivity : AppCompatActivity() {
    private val subHandle = SubHandle()
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
 
        subHandle.mConsumer = null
        subHandle.connect().subscribe()
    }
 
    fun btRead(view: View) {
        subHandle.handleStatus {
            subHandle.read()
        }
    }
 
    /**
     * 刚进入页面就进行连接,点击按钮的时候,有几种状态:
     * 1、连接失败--重新开始连接,
     *      1.1 连接成功 --调阅读的方法
     *      1.2 连接失败 --UI进行提示失败
     * 2、连接中
     *      2.1 连接成功 --调阅读的方法
     *      2.2 连接失败 --UI进行提示失败
     * 3、连接成功 --调阅读的方法
     */
    class SubHandle {
        var mConsumer: ((Int) -> Unit)? = null
        private var status = AtomicInteger(-1) // 0连接失败 1正在连接中 2连接成功
        private var disposable: Disposable? = null
 
        fun connect(): Observable<Int> {
            status.set(1)
            Log.e("TAG", "=连接=")
            return Observable.interval(5, TimeUnit.SECONDS)
                .take(1)
                .map {
                    val random = Random(System.currentTimeMillis())
                    val randomNumber = random.nextInt(3) // 生成一个0到2之间的随机整数
                    Log.e("TAG", "==funA输出$randomNumber")
                    randomNumber
                }
                .subscribeOn(Schedulers.io())
                .doOnNext {
                    if (it == 2) {
                        status.set(2)
                        mConsumer?.invoke(status.get())
                    } else {
                        status.set(0)
                        Log.e("TAG", "连接阅读器失败,给UI提示")
                    }
                }
        }
 
        fun handleStatus(consumer: (Int) -> Unit) {
            mConsumer = consumer
            when (status.get()) {
                0 -> {
                    Log.e("TAG", "连接失败过,正重试连接")
                    disposable?.dispose()
                    disposable = connect().subscribe()
                }
                1 -> Log.e("TAG", "正在连接")
                2 -> mConsumer?.invoke(status.get())
            }
        }
 
        fun read() {
            Log.e("TAG", "开始阅读")
        }
    }
}
实现方式二
class MainActivity : AppCompatActivity() {
    private var canRead = false
    private var connectStatus = 0 //1 代表 SUCC, 2 代表 FAIL, 0 代表 CONNECTING

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        connect()
    }

    private fun connect() {
        Log.e("TAG", "=连接=")
        Thread(Runnable {
            Thread.sleep(5000) // 休眠5秒钟

            Observable.just(randomStatus())
                .doOnNext { connectStatus = it }
                .filter {
                    Log.e("TAG", "it状态" + it)
                    it == 1 && canRead
                }
                .subscribeOn(Schedulers.io())
                .doOnNext { read() }
                .subscribe()

        }).start()

    }

    fun btRead(view: View) {
        canRead = true
        Log.e("TAG", "点击按钮" + connectStatus)
        when (connectStatus) {
            1 -> read() //  1 代表 SUCC
            2 -> connect() //  2 代表 FAIL
            else -> {}
        }
    }

    private fun read() {
        Log.e("TAG", "开始阅读")
    }

    private fun randomStatus(): Int {
        val random = Random(System.currentTimeMillis())
        return random.nextInt(3)  //生成一个0到2之间的随机整数
    }
}

3、连续多个弹窗的处理

使用rxjava实现:

class MainActivity : AppCompatActivity() {
    private val compositeDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

    }

    @SuppressLint("CheckResult")
    fun btRead(view: View) {
        Log.e("TAG", "jjjjjj")
        showFirstDialog()
            .flatMap { showSecondDialog() }
            .flatMap { showThirdDialog() }.subscribe({
                Log.e("TAG", "3个弹窗都选了确定")
            }, { error ->
                Log.e("TAG", "点击了取消$error")
            })
    }

    private fun showFirstDialog(): Observable<Unit> {
        return Observable.create<Unit> { emitter ->
            val dialog = AlertDialog.Builder(this)
                .setMessage("第一个弹窗")
                .setPositiveButton("确定") { _, _ ->
                    emitter.onNext(Unit) // 发送事件,表示点击了确定按钮
                }
                .setNegativeButton("取消") { _, _ ->
                    emitter.onError(Throwable("1取消")) // 发送错误事件,表示点击了取消按钮
                }
                .setOnCancelListener {
                    emitter.onError(Throwable("1取消")) // 发送错误事件,表示点击了返回键
                }
                .create()
            dialog.show()
            emitter.setCancellable { dialog.dismiss() } // 在取消订阅时关闭弹窗
        }
    }

    private fun showSecondDialog(): Observable<Unit> {
        return Observable.create<Unit> { emitter ->
            val dialog = AlertDialog.Builder(this)
                .setMessage("第二个弹窗")
                .setPositiveButton("确定") { _, _ ->
                    emitter.onNext(Unit)
                }
                .setNegativeButton("取消") { _, _ ->
                    emitter.onError(Throwable("2取消"))
                }
                .setOnCancelListener {
                    emitter.onError(Throwable("2取消"))
                }
                .create()
            dialog.show()
            emitter.setCancellable { dialog.dismiss() }
        }
    }

    private fun showThirdDialog(): Observable<Unit> {
        return Observable.create<Unit> { emitter ->
            val dialog = AlertDialog.Builder(this)
                .setMessage("第三个弹窗")
                .setPositiveButton("确定") { _, _ ->
                    emitter.onNext(Unit)
                }
                .setNegativeButton("取消") { _, _ ->
                    emitter.onError(Throwable("3取消"))
                }
                .setOnCancelListener {
                    emitter.onError(Throwable("3取消"))
                }
                .create()
            dialog.show()
            emitter.setCancellable { dialog.dismiss() }
        }
    }

}

协程实现:

fun btRead(view: View) {
        lifecycleScope.launch {
            try {
                showAlertDialog(this@MainActivity, "提示1", "第一个弹窗")
                showAlertDialog(this@MainActivity, "提示1", "第二个弹窗")
                showAlertDialog(this@MainActivity, "提示1", "第三个弹窗")
            } catch (e: Exception) {
                Log.e("showAlertDialog", "2222111发生异常")
            }
        }
    }

    private suspend fun showAlertDialog(context: Context, title: String, message: String): Boolean =
        suspendCancellableCoroutine { ctn ->
            val activityRef = WeakReference(context as MainActivity)
            val alertDialog = AlertDialog.Builder(context)
                .setTitle(title)
                .setMessage(message)
                .setPositiveButton("确定") { dialog, _ ->
                    // 点击确定按钮的逻辑处理
                    dialog.dismiss()
                    activityRef.get()?.let {
                        ctn.resume(true) {}
                    }
                }
                .setNegativeButton("取消") { dialog, _ ->
                    // 点击取消按钮的逻辑处理
                    dialog.dismiss()
                    activityRef.get()?.let {
                        ctn.resumeWithException(Exception(message + "取消"))
                    }
                }
                .setOnCancelListener {
                    activityRef.get()?.let {
                        ctn.resumeWithException(Exception("蒙层取消"))
                    }
                }.create()
            alertDialog.show()
        }

总结

RxJava是一个基于Java语言的Reactive Extensions库,它用于实现异步编程和流式处理,通过将事件和数据流以数据序列的形式进行处理,提高了代码的可读性和可维护性。

01-12 15:37