Bladeren bron

refactor(Modbus):
- `ModBusManager`: 重构Modbus通信模型,从并发发送和异步回调匹配,改为严格串行的“发送-等待-处理”模式。
- 引入`CompletableDeferred`作为同步锁,确保每个请求在收到响应或超时后,才开始处理下一个请求,解决了因并发和时序问题导致的响应错乱。
- 统一由发送协程(`startStrictSerialWorker`)处理所有任务的生命周期,包括发送、等待、超时和回调触发,回调线程只负责匹配和解锁等待器。
- 优化了串口时序参数(`turnaroundDelayMs`, `interFrameGapMs`, `defaultTimeoutMs`),使其与`RS485LockManager`的行为对齐,提高了通信稳定性。
- 简化了串口回调逻辑(`onUartPayload`),仅用于匹配当前发送的帧并释放信号,移除了复杂的业务处理。
- `ModBusController`: 移除`switchStatus`方法中的调试日志。

周文健 6 maanden geleden
bovenliggende
commit
fe76a88469

+ 1 - 2
data/src/main/java/com/grkj/data/hardware/modbus/ModBusController.kt

@@ -377,8 +377,7 @@ object ModBusController {
      * 开关量更新
      */
     fun switchStatus(res: Any, done: () -> Unit) {
-        logger.info("开关板:${(res as ByteArray).toHexStrings()}")
-        if (res.isEmpty()) {
+        if ((res as ByteArray).isEmpty()) {
             var tipStr = I18nManager.t("no_response_board_exists") + " : "
             val addressList = mutableListOf<String>()
 

+ 101 - 116
data/src/main/java/com/grkj/data/hardware/modbus/ModBusManager.kt

@@ -11,6 +11,7 @@ import com.grkj.shared.config.Constants
 import com.grkj.shared.utils.extension.toHexStrings
 import com.sik.sikcore.extension.getMMKVData
 import com.sik.sikcore.thread.ThreadUtils
+import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.Job
@@ -18,14 +19,18 @@ import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.withTimeoutOrNull
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import kotlin.coroutines.cancellation.CancellationException
-import kotlin.math.log
 
 /**
- * ModBus 协议管理器(协程版)
- * 逻辑与原版一致:按顺序从队列取任务,发送请求,并在回调中匹配响应。
+ * ModBus 协议管理器(严格串行版)
+ *
+ * 变更点:
+ * 1) 采用 RS485LockManager 风格的“强串行挂起”模型:发送 → 等待匹配回包/超时 → 收尾 → 下一帧。
+ * 2) 统一由发送协程串行推进,回调线程只负责“匹配并放行”;禁止任何并发发送。
+ * 3) 去除与锁/钥匙等业务无关的逻辑,保留 ModBus 纯通信语义。
  */
 class ModBusManager(
     // 底层串口管理器
@@ -35,103 +40,105 @@ class ModBusManager(
 ) {
     private val logger: Logger = LoggerFactory.getLogger(ModBusManager::class.java)
 
-    /** 待发送队列 */
+    /** 待发送任务队列(严格串行) */
     private val pendings = Channel<FrameTask>(Channel.UNLIMITED)
 
-    /**
-     * 作用域
-     */
+    /** 作用域 */
     protected val scope = CoroutineScope(Dispatchers.Default + Job())
 
-    private var sending: FrameTask? = null
-
-    private var running: Boolean = false
+    /** 当前正在处理的任务(仅用于匹配判断,不做并发) */
+    @Volatile private var sending: FrameTask? = null
 
+    @Volatile private var running: Boolean = false
     private var job: Job? = null
 
     /** 从机地址池 */
     var mSlaveAddressList = mutableListOf<Byte>()
 
-    /**
-     * 通道
-     */
+    /** 底层通道 */
     private var channel: IHardwareChannel? = null
 
-    // 时序
+    // ======== 串口时序/等待参数(与 RS485LockManager 对齐) ========
     private val turnaroundDelayMs = 2L
     private val interFrameGapMs = 2L
     private val defaultTimeoutMs = 520L
 
-    @Volatile
-    private var txWorkerStarted = false
+    /** 当前包回执等待器(匹配成功后由回调放行) */
+    @Volatile private var currentAckWaiter: CompletableDeferred<ByteArray?>? = null
 
     init {
-        // 初始化地址池
+        // 初始化从机地址池
         val dockConfig = MMKVConstants.KEY_DOCK_CONFIG.getMMKVData("[]")
         logger.info("基座配置: ${dockConfig}")
-        dockConfig.takeIf { it.isNotEmpty() }
-            ?.let { json ->
-                val type = object : TypeToken<List<DockBean>>() {}.type
-                val list: List<DockBean> = Gson().fromJson(json, type)
-                mSlaveAddressList.addAll(list.map { it.addr })
-            }
+        dockConfig.takeIf { it.isNotEmpty() }?.let { json ->
+            val type = object : TypeToken<List<DockBean>>() {}.type
+            val list: List<DockBean> = Gson().fromJson(json, type)
+            mSlaveAddressList.addAll(list.map { it.addr })
+        }
     }
 
-    /**
-     * 从队列中取任务并发送
-     */
-    private fun takePendingToSend() {
-        if (txWorkerStarted) return
-        txWorkerStarted = true
-
+    // ======== 核心:严格串行发送 worker ========
+    private fun startStrictSerialWorker() {
         scope.launch(Dispatchers.IO) {
-            try {
-                for (req in pendings) {
-                    // 提前中断检查(避免无意义等待)
-                    if (!running || !isActive) break
-
-                    // 本次要发送的就是这条,明确绑定,别混着用 sending?.run 和外层 req
-                    sending = req
-                    try {
-                        // 半双工节拍:先 turnaround 再 IFG
-                        if (turnaroundDelayMs > 0) delay(turnaroundDelayMs)
-                        if (interFrameGapMs > 0) delay(interFrameGapMs)
-
-                        if (!running || !isActive) break
-
-                        // 只判断这条是否应发
-                        if (sending?.shouldSend() == true) {
-                            val ok = (channel?.write(sending!!.req, 0, sending!!.req.size) == true)
-                            if (ok) {
-                                sending?.afterSent()
-                                if (verbose) logger.debug("发送:${sending!!.req.toHexStrings()}")
-                            } else {
-                                logger.warn("无法与主控板通讯")
-                                // 失败反馈给上层(按你现有约定用空数组)
-                                sending?.done?.invoke(byteArrayOf())
-                            }
-                        } else {
-                            logger.info("未响应: ${sending?.req?.toHexStrings()}")
-                            sending?.done?.invoke(byteArrayOf())
-                        }
-
-                        // 发送后再给个帧间隔,避免挤占下一帧
-                        if (interFrameGapMs > 0) delay(interFrameGapMs)
-                    } finally {
-                        // 无论成功/失败/异常都清空,保证下一条能被消费
-                    }
+            while (isActive) {
+                val task = try { pendings.receive() } catch (_: CancellationException) { break }
+                if (!running) { task.done?.invoke(byteArrayOf()); continue }
+
+                // 起始间隔
+                if (turnaroundDelayMs > 0) delay(turnaroundDelayMs)
+                if (interFrameGapMs > 0) delay(interFrameGapMs)
+                if (!running) { task.done?.invoke(byteArrayOf()); continue }
+
+                // 设置当前任务
+                sending = task
+
+                // 为该任务建立一次性的回包等待器
+                val waiter = CompletableDeferred<ByteArray?>()
+                currentAckWaiter = waiter
+
+                // 下发
+                val bytes = task.req
+                val wrote = channel?.write(bytes, 0, bytes.size) == true
+                if (!wrote) {
+                    logger.warn("无法与主控板通讯:写失败 -> ${bytes.toHexStrings()}")
+                    task.done?.invoke(byteArrayOf())
+                    sending = null
+                    if (interFrameGapMs > 0) delay(interFrameGapMs)
+                    continue
                 }
-            } catch (ce: CancellationException) {
-                // 协程取消正常退出
-            } finally {
-                txWorkerStarted = false
+                logger.debug("发送:${bytes.toHexStrings()}")
+                task.afterSent()
+
+                // 等待匹配回包或超时
+                val payload = withTimeoutOrNull(defaultTimeoutMs) { waiter.await() }
+                if (payload == null) {
+                    logger.info("超时未响应:${bytes.toHexStrings()}")
+                    task.done?.invoke(byteArrayOf())
+                } else {
+                    // 严格串行:仅在 worker 中触发回调
+                    task.done?.invoke(payload)
+                }
+
+                // 任务收尾
+                sending = null
+                if (interFrameGapMs > 0) delay(interFrameGapMs)
             }
         }
     }
 
-    /**
-     * 开始通信(协程替代线程)
-     */
+    // ======== 回调:仅负责匹配并放行,不做业务 ========
+    private fun onUartPayload(payload: ByteArray) {
+        if (!running) return
+        val cur = sending
+        if (cur != null && cur.match(payload)) {
+            // 匹配成功 -> 放行当前等待器,由 worker 统一收尾
+            currentAckWaiter?.complete(payload)
+        } else if (verbose) {
+            logger.warn("未匹配响应: ${payload.toHexStrings()}, running=$running")
+        }
+    }
+
+    // ======== 生命周期 ========
     fun start() {
         job = scope.launch {
             val bindOK = client.bind()
@@ -142,38 +149,23 @@ class ModBusManager(
             val cfg = HwConfig(
                 transportUri = "serial://?dev=/dev/ttyS4&baud=115200&data_bits=8&stop_bits=1&parity=N",
                 codec = ProtocolId.RAW,
-                readTimeoutMs = 500
+                readTimeoutMs = 100
             )
-            channel = client.open(cfg) { sessionId, payload ->
-                // 串口监听,回调在单独线程中执行
-                if (verbose) logger.debug("接收:${payload.toHexStrings()}")
-                sending?.run {
-                    if (match(payload) && running) {
-                        done?.invoke(payload)
-                        sending = null
-                    } else {
-                        logger.warn("响应: ${payload.toHexStrings()} 未匹配, running: $running")
-                        sending?.done?.invoke(byteArrayOf())
-                    }
-                }
-            }
+            channel = client.open(cfg) { _, payload -> onUartPayload(payload) }
             running = true
-            takePendingToSend()
+            startStrictSerialWorker()
         }
     }
 
-    /**
-     * 停止运行并关闭串口
-     */
     fun stop() {
         running = false
         job?.cancel()
         client.unbind()
     }
 
-    /**
-     * 提交单个请求任务
-     */
+    // ======== 对外 API:保持一致,但内部严格串行 ========
+
+    /** 提交单个请求任务 */
     fun sendTo(
         slaveAddress: Byte,
         frame: MBFrame,
@@ -186,24 +178,29 @@ class ModBusManager(
             done?.invoke(byteArrayOf())
             return
         }
-        if (slaveAddress !in mSlaveAddressList) throw IllegalArgumentException("slaveAddress[$slaveAddress] 未配置")
+        if (slaveAddress !in mSlaveAddressList)
+            throw IllegalArgumentException("slaveAddress[$slaveAddress] 未配置")
+
         val task = FrameTask(frame.compile(slaveAddress), done).apply {
             this.allowRetransmission = allowRetransmission
             this.minSendInterval = minSendIntervalNanoSeconds
         }
-        pendings.trySend(task)
+
+        // 入队,交由严格串行 worker 处理
+        val r = pendings.trySend(task)
+        if (!r.isSuccess) {
+            scope.launch(Dispatchers.IO) {
+                try { pendings.send(task) } catch (_: CancellationException) { done?.invoke(byteArrayOf()) }
+            }
+        }
     }
 
-    /**
-     * 按顺序发送给所有从机,汇总结果后回调
-     */
+    /** 按顺序发送给所有从机,汇总结果后回调(同样串行) */
     fun sendToAll(
         frame: MBFrame,
         done: ((List<ByteArray>) -> Unit)? = null
     ) {
-        if (mSlaveAddressList.isEmpty()) {
-            done?.invoke(emptyList()); return
-        }
+        if (mSlaveAddressList.isEmpty()) { done?.invoke(emptyList()); return }
         sendUp(0, frame, done, mutableListOf())
     }
 
@@ -221,9 +218,7 @@ class ModBusManager(
         }
     }
 
-    /**
-     * 循环发送给所有从机
-     */
+    /** 循环发送给所有从机(保持接口,内部依旧严格串行) */
     fun repeatSendToAll(
         frame: MBFrame,
         interrupt: (() -> List<Boolean>)? = null,
@@ -236,23 +231,13 @@ class ModBusManager(
                 if (running) {
                     listener(it)
                     ThreadUtils.runOnIODelayed(delayMills) {
-                        if (running) repeatSendToAll(
-                            frame,
-                            interrupt,
-                            listener,
-                            delayMills
-                        )
+                        if (running) repeatSendToAll(frame, interrupt, listener, delayMills)
                     }
                 }
             }
         } else {
             ThreadUtils.runOnIODelayed(delayMills) {
-                if (running) repeatSendToAll(
-                    frame,
-                    interrupt,
-                    listener,
-                    delayMills
-                )
+                if (running) repeatSendToAll(frame, interrupt, listener, delayMills)
             }
         }
         return this