ModBusManager.kt 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package com.grkj.iscs.modbus
  2. import com.google.gson.Gson
  3. import com.google.gson.reflect.TypeToken
  4. import com.grkj.iscs.MyApplication
  5. import com.grkj.iscs.extentions.crc16
  6. import com.grkj.iscs.extentions.toHexStrings
  7. import com.grkj.iscs.util.Executor
  8. import com.grkj.iscs.util.SPUtils
  9. import com.grkj.iscs.util.jvmSeconds
  10. import com.grkj.iscs.util.log.LogUtil
  11. import com.grkj.iscs.view.fragment.DockTestFragment
  12. import java.util.concurrent.LinkedBlockingQueue
  13. /**
  14. * ModBus 协议管理器
  15. */
  16. class ModBusManager(
  17. // 串口管理器
  18. val portManager: PortManager,
  19. // 是否输出详细信息
  20. val verbose: Boolean = false
  21. ) {
  22. @Volatile
  23. private var running = true
  24. /**
  25. * 正在发送的任务
  26. */
  27. @Volatile
  28. private var sending: FrameTask? = null
  29. /**
  30. * 等待发送队列
  31. */
  32. private val pendings = LinkedBlockingQueue<FrameTask>()
  33. /**
  34. * 线程锁
  35. */
  36. private val lock = Any()
  37. private var thread: Thread? = null
  38. var mSlaveAddressList = mutableListOf<Byte>()
  39. init {
  40. portManager.listen { res ->
  41. if (verbose) {
  42. LogUtil.i("接收:${res.toHexStrings()}")
  43. }
  44. synchronized(lock) {
  45. sending?.run {
  46. if (match(res) && running) {
  47. done?.let { it(res) }
  48. sending = null
  49. } else {
  50. LogUtil.w("响应: ${res.toHexStrings()}未匹配, running:${running}")
  51. }
  52. }
  53. }
  54. }
  55. // 初始化地址池
  56. val dockConfigJson = SPUtils.getDockConfig(MyApplication.instance?.applicationContext!!)
  57. if (!dockConfigJson.isNullOrEmpty()) {
  58. val tempList: MutableList<DockTestFragment.DockTestBean> =
  59. Gson().fromJson(
  60. dockConfigJson,
  61. object : TypeToken<MutableList<DockTestFragment.DockTestBean>>() {}.type
  62. )
  63. mSlaveAddressList.addAll(tempList.map { it.address })
  64. }
  65. }
  66. /**
  67. * 发送队列的消息
  68. */
  69. private fun takePendingToSend() {
  70. if (sending == null) {
  71. sending = pendings.take()
  72. }
  73. if (!running) {
  74. return
  75. }
  76. sending?.run {
  77. waitIfNecessary()
  78. }
  79. synchronized(lock) {
  80. sending?.run {
  81. if (shouldSend()) {
  82. if (portManager.send(req)) {
  83. afterSent()
  84. if (verbose) {
  85. LogUtil.i("发送:${req.toHexStrings()}")
  86. }
  87. } else {
  88. LogUtil.w("无法与主控板通讯")
  89. }
  90. } else {
  91. LogUtil.i("未响应: ${req.toHexStrings()}")
  92. // 放弃处理,回调空数据
  93. done?.let { it(byteArrayOf()) }
  94. sending = null
  95. }
  96. }
  97. }
  98. }
  99. /**
  100. * 循环发送给所有从机
  101. * @param frame 发送报文
  102. * @param listener 每轮发送完后的数据监听
  103. * @param delayMills 每轮发送的间隔
  104. */
  105. fun repeatSendToAll(
  106. frame: MBFrame,
  107. interrupt: (() -> List<Boolean>)? = null,
  108. listener: (res: List<ByteArray>) -> Unit,
  109. delayMills: Long
  110. ): ModBusManager {
  111. val keep = interrupt?.invoke()?.run { !this[0] } ?: false
  112. if (keep) {
  113. sendToAll(frame) {
  114. if (running) {
  115. listener(it)
  116. Executor.delayOnIO({
  117. if (running) {
  118. repeatSendToAll(frame, interrupt, listener, delayMills)
  119. }
  120. }, delayMills)
  121. }
  122. }
  123. } else {
  124. Executor.delayOnIO({
  125. if (running) {
  126. repeatSendToAll(frame, interrupt, listener, delayMills)
  127. }
  128. }, delayMills)
  129. }
  130. return this
  131. }
  132. /**
  133. * 发送给所有从机
  134. * @param frame 发送报文
  135. * @param done 所有从机都发送完成后的回调
  136. */
  137. fun sendToAll(frame: MBFrame, done: ((res: List<ByteArray>) -> Unit)? = null) {
  138. if (mSlaveAddressList.size == 0) {
  139. done?.let { it(listOf()) }
  140. return
  141. }
  142. sendUp(0, frame, done, ArrayList())
  143. }
  144. private fun sendUp(
  145. index: Int,
  146. frame: MBFrame,
  147. done: ((res: List<ByteArray>) -> Unit)?,
  148. resList: ArrayList<ByteArray>
  149. ) {
  150. sendTo(mSlaveAddressList[index], frame) { res ->
  151. resList.add(res)
  152. if (index >= mSlaveAddressList.size - 1) {
  153. // 已经发送完
  154. if (running) {
  155. done?.let { it(resList) }
  156. }
  157. } else {
  158. // 发送给下一个从机
  159. sendUp(index + 1, frame, done, resList)
  160. }
  161. }
  162. }
  163. /**
  164. * 发送给序号为 index 的从机
  165. * @param slaveAddress 从机地址
  166. * @param frame 发送报文
  167. * @param done 完成回调
  168. */
  169. fun sendTo(
  170. slaveAddress: Byte,
  171. frame: MBFrame,
  172. allowRetransmission: Boolean = true,
  173. minSendIntervalNanoSeconds: Int = MODBUS_MIN_SEND_INTERVAL,
  174. done: ((res: ByteArray) -> Unit)? = null
  175. ) {
  176. if (mSlaveAddressList.size <= 0) {
  177. LogUtil.i("sendTo($slaveAddress), 地址池大小为0, 返回空数据")
  178. done?.invoke(byteArrayOf())
  179. return
  180. }
  181. if (mSlaveAddressList.none { it == slaveAddress }) {
  182. throw IllegalArgumentException("slaveAddress [${slaveAddress}] is not configed")
  183. }
  184. val task = FrameTask(frame.compile(slaveAddress), done)
  185. task.allowRetransmission = allowRetransmission
  186. task.minSendInterval = minSendIntervalNanoSeconds
  187. pendings.add(task)
  188. }
  189. /**
  190. * 开始通信
  191. */
  192. fun start() {
  193. thread = Thread {
  194. while (running) {
  195. try {
  196. takePendingToSend()
  197. } catch (e: InterruptedException) {
  198. }
  199. }
  200. }
  201. thread?.isDaemon = true
  202. thread?.start()
  203. }
  204. /**
  205. * 是否运行
  206. */
  207. fun isRunning(): Boolean {
  208. return running
  209. }
  210. /**
  211. * 停止运行
  212. */
  213. fun stop() {
  214. running = false
  215. thread?.interrupt()
  216. portManager.close()
  217. }
  218. }