Преглед изворни кода

refactor(优化):
- 转为协程

周文健 пре 1 година
родитељ
комит
1bd93a7fdf

+ 0 - 1
app/src/main/java/com/grkj/iscs/modbus/MBFrame.kt

@@ -6,7 +6,6 @@ import com.grkj.iscs.extentions.crc16
  * ModBus 数据帧
  */
 class MBFrame(
-
     // 类型
     val type: Byte,
     // 数据域:D1 和 D2

+ 97 - 142
app/src/main/java/com/grkj/iscs/modbus/ModBusManager.kt

@@ -3,106 +3,86 @@ package com.grkj.iscs.modbus
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
 import com.grkj.iscs.MyApplication
-import com.grkj.iscs.extentions.crc16
 import com.grkj.iscs.extentions.toHexStrings
 import com.grkj.iscs.util.Executor
 import com.grkj.iscs.util.SPUtils
-import com.grkj.iscs.util.jvmSeconds
 import com.grkj.iscs.util.log.LogUtil
 import com.grkj.iscs.view.fragment.DockTestFragment
-import java.util.concurrent.LinkedBlockingQueue
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
 
 /**
- * ModBus 协议管理器
+ * ModBus 协议管理器(协程版)
+ * 逻辑与原版一致:按顺序从队列取任务,发送请求,并在回调中匹配响应。
  */
 class ModBusManager(
-    // 串口管理器
+    // 底层串口管理器
     val portManager: PortManager,
     // 是否输出详细信息
     val verbose: Boolean = false
 ) {
+    @Volatile private var running = true
 
-    @Volatile
-    private var running = true
+    /** 正在发送的任务 */
+    @Volatile private var sending: FrameTask? = null
 
-    /**
-     * 正在发送的任务
-     */
-    @Volatile
-    private var sending: FrameTask? = null
-
-    /**
-     * 等待发送队列
-     */
-    private val pendings = LinkedBlockingQueue<FrameTask>()
+    /** 等待发送队列 */
+    private val pendings = Channel<FrameTask>(Channel.UNLIMITED)
 
-    /**
-     * 线程锁
-     */
+    /** 锁保护 sending */
     private val lock = Any()
 
+    private var job: Job? = null
 
-    private var thread: Thread? = null
-
+    /** 从机地址池 */
     var mSlaveAddressList = mutableListOf<Byte>()
 
-
     init {
+        // 串口监听,回调在单独线程中执行
         portManager.listen { res ->
-            if (verbose) {
-                LogUtil.i("接收:${res.toHexStrings()}")
-            }
+            if (verbose) LogUtil.i("接收:${res.toHexStrings()}")
             synchronized(lock) {
                 sending?.run {
                     if (match(res) && running) {
-                        done?.let { it(res) }
+                        done?.invoke(res)
                         sending = null
                     } else {
-                        LogUtil.w("响应: ${res.toHexStrings()}未匹配, running:${running}")
+                        LogUtil.w("响应: ${res.toHexStrings()} 未匹配, running: $running")
                     }
                 }
             }
         }
         // 初始化地址池
-        val dockConfigJson = SPUtils.getDockConfig(MyApplication.instance?.applicationContext!!)
-        if (!dockConfigJson.isNullOrEmpty()) {
-            val tempList: MutableList<DockTestFragment.DockTestBean> =
-                Gson().fromJson(
-                    dockConfigJson,
-                    object : TypeToken<MutableList<DockTestFragment.DockTestBean>>() {}.type
-                )
-            mSlaveAddressList.addAll(tempList.map { it.address })
-        }
+        SPUtils.getDockConfig(MyApplication.instance?.applicationContext!!)
+            ?.takeIf { it.isNotEmpty() }
+            ?.let { json ->
+                val type = object : TypeToken<List<DockTestFragment.DockTestBean>>() {}.type
+                val list: List<DockTestFragment.DockTestBean> = Gson().fromJson(json, type)
+                mSlaveAddressList.addAll(list.map { it.address })
+            }
     }
 
     /**
-     * 发送队列的消息
+     * 从队列中取任务并发送
      */
-    private fun takePendingToSend() {
+    private suspend fun takePendingToSend() {
         if (sending == null) {
-            sending = pendings.take()
-        }
-        if (!running) {
-            return
-        }
-        sending?.run {
-            waitIfNecessary()
+            sending = pendings.receive()
         }
+        if (!running) return
+        sending?.waitIfNecessary()
         synchronized(lock) {
             sending?.run {
                 if (shouldSend()) {
                     if (portManager.send(req)) {
                         afterSent()
-                        if (verbose) {
-                            LogUtil.i("发送:${req.toHexStrings()}")
-                        }
+                        if (verbose) LogUtil.i("发送:${req.toHexStrings()}")
                     } else {
                         LogUtil.w("无法与主控板通讯")
                     }
                 } else {
                     LogUtil.i("未响应: ${req.toHexStrings()}")
-                    // 放弃处理,回调空数据
-                    done?.let { it(byteArrayOf()) }
+                    done?.invoke(byteArrayOf())
                     sending = null
                 }
             }
@@ -110,129 +90,104 @@ class ModBusManager(
     }
 
     /**
-     * 循环发送给所有从机
-     * @param frame 发送报文
-     * @param listener 每轮发送完后的数据监听
-     * @param delayMills 每轮发送的间隔
+     * 开始通信(协程替代线程)
      */
-    fun repeatSendToAll(
-        frame: MBFrame,
-        interrupt: (() -> List<Boolean>)? = null,
-        listener: (res: List<ByteArray>) -> Unit,
-        delayMills: Long
-    ): ModBusManager {
-        val keep = interrupt?.invoke()?.run { !this[0] } ?: false
-        if (keep) {
-            sendToAll(frame) {
-                if (running) {
-                    listener(it)
-                    Executor.delayOnIO({
-                        if (running) {
-                            repeatSendToAll(frame, interrupt, listener, delayMills)
-                        }
-                    }, delayMills)
+    fun start() {
+        job = CoroutineScope(Dispatchers.IO).launch {
+            while (running) {
+                try {
+                    takePendingToSend()
+                } catch (_: CancellationException) {
+                    break
                 }
             }
-        } else {
-            Executor.delayOnIO({
-                if (running) {
-                    repeatSendToAll(frame, interrupt, listener, delayMills)
-                }
-            }, delayMills)
         }
-        return this
     }
 
-
     /**
-     * 发送给所有从机
-     * @param frame 发送报文
-     * @param done 所有从机都发送完成后的回调
+     * 停止运行并关闭串口
      */
-    fun sendToAll(frame: MBFrame, done: ((res: List<ByteArray>) -> Unit)? = null) {
-        if (mSlaveAddressList.size == 0) {
-            done?.let { it(listOf()) }
-            return
-        }
-        sendUp(0, frame, done, ArrayList())
+    fun stop() {
+        running = false
+        job?.cancel()
+        portManager.close()
     }
 
-    private fun sendUp(
-        index: Int,
-        frame: MBFrame,
-        done: ((res: List<ByteArray>) -> Unit)?,
-        resList: ArrayList<ByteArray>
-    ) {
-        sendTo(mSlaveAddressList[index], frame) { res ->
-            resList.add(res)
-            if (index >= mSlaveAddressList.size - 1) {
-                // 已经发送完
-                if (running) {
-                    done?.let { it(resList) }
-                }
-            } else {
-                // 发送给下一个从机
-                sendUp(index + 1, frame, done, resList)
-            }
-        }
-    }
+    /**
+     * 是否运行
+     */
+    fun isRunning(): Boolean = running
 
     /**
-     * 发送给序号为 index 的从机
-     * @param slaveAddress 从机地址
-     * @param frame 发送报文
-     * @param done 完成回调
+     * 提交单个请求任务
      */
     fun sendTo(
         slaveAddress: Byte,
         frame: MBFrame,
         allowRetransmission: Boolean = true,
         minSendIntervalNanoSeconds: Int = MODBUS_MIN_SEND_INTERVAL,
-        done: ((res: ByteArray) -> Unit)? = null
+        done: ((ByteArray) -> Unit)? = null
     ) {
-        if (mSlaveAddressList.size <= 0) {
-            LogUtil.i("sendTo($slaveAddress), 地址池大小为0, 返回空数据")
+        if (mSlaveAddressList.isEmpty()) {
+            LogUtil.i("sendTo($slaveAddress), 地址池为空, 返回空数据")
             done?.invoke(byteArrayOf())
             return
         }
-        if (mSlaveAddressList.none { it == slaveAddress }) {
-            throw IllegalArgumentException("slaveAddress [${slaveAddress}] is not configed")
+        if (slaveAddress !in mSlaveAddressList) throw IllegalArgumentException("slaveAddress[$slaveAddress] 未配置")
+        val task = FrameTask(frame.compile(slaveAddress), done).apply {
+            this.allowRetransmission = allowRetransmission
+            this.minSendInterval = minSendIntervalNanoSeconds
         }
-        val task = FrameTask(frame.compile(slaveAddress), done)
-        task.allowRetransmission = allowRetransmission
-        task.minSendInterval = minSendIntervalNanoSeconds
-        pendings.add(task)
+        pendings.trySend(task)
     }
 
     /**
-     * 开始通信
+     * 按顺序发送给所有从机,汇总结果后回调
      */
-    fun start() {
-        thread = Thread {
-            while (running) {
-                try {
-                    takePendingToSend()
-                } catch (e: InterruptedException) {
-                }
-            }
+    fun sendToAll(
+        frame: MBFrame,
+        done: ((List<ByteArray>) -> Unit)? = null
+    ) {
+        if (mSlaveAddressList.isEmpty()) {
+            done?.invoke(emptyList()); return
         }
-        thread?.isDaemon = true
-        thread?.start()
+        sendUp(0, frame, done, mutableListOf())
     }
 
-    /**
-     * 是否运行
-     */
-    fun isRunning(): Boolean {
-        return running
+    private fun sendUp(
+        index: Int,
+        frame: MBFrame,
+        done: ((List<ByteArray>) -> Unit)?,
+        results: MutableList<ByteArray>
+    ) {
+        sendTo(mSlaveAddressList[index], frame) { res ->
+            results.add(res)
+            if (index == mSlaveAddressList.lastIndex) {
+                if (running) done?.invoke(results)
+            } else sendUp(index + 1, frame, done, results)
+        }
     }
 
     /**
-     * 停止运行
+     * 循环发送给所有从机
      */
-    fun stop() {
-        running = false
-        thread?.interrupt()
-        portManager.close()
+    fun repeatSendToAll(
+        frame: MBFrame,
+        interrupt: (() -> List<Boolean>)? = null,
+        listener: (List<ByteArray>) -> Unit,
+        delayMills: Long
+    ): ModBusManager {
+        val keep = interrupt?.invoke()?.run { !this[0] } ?: false
+        if (keep) {
+            sendToAll(frame) {
+                if (running) {
+                    listener(it)
+                    Executor.delayOnIO({ if (running) repeatSendToAll(frame, interrupt, listener, delayMills) }, delayMills)
+                }
+            }
+        } else {
+            Executor.delayOnIO({ if (running) repeatSendToAll(frame, interrupt, listener, delayMills) }, delayMills)
+        }
+        return this
     }
-}
+}