socketcreate.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // socket创建模块
  2. // 秒寻科技
  3. // zt 2024-08-30
  4. package socketcreate
  5. import (
  6. "bufio"
  7. "bytes"
  8. "fmt"
  9. "io"
  10. "ipsomc/module/ps/psmodel"
  11. "ipsomc/module/ps/psul"
  12. "ipsomc/module/socket/socketsend"
  13. "ipsomc/public"
  14. "ipsomc/util"
  15. "net"
  16. "os"
  17. )
  18. type SocketCreate struct {
  19. socketSendApi socketsend.SocketSend
  20. psUlApi psul.PsUl
  21. }
  22. // 创建UDP Socket
  23. func (obj *SocketCreate) CreateUdpSocket() {
  24. myViper := util.GetViper()
  25. hostaddress := myViper.GetString("socket.hostaddress") //主机地址
  26. hostport := myViper.GetInt("socket.hostport") //主机端口
  27. hostUrl := hostaddress + ":" + fmt.Sprintf("%d", hostport) //拼接url
  28. // 解析UPD地址和端口
  29. udpAddr, err := net.ResolveUDPAddr("udp", hostUrl)
  30. if err != nil {
  31. fmt.Println(err.Error())
  32. os.Exit(1)
  33. }
  34. // 创建一个UDP连接并监听端口
  35. udpConn, err := net.ListenUDP("udp", udpAddr)
  36. if err != nil {
  37. fmt.Println(err.Error())
  38. os.Exit(1)
  39. }
  40. defer udpConn.Close()
  41. obj.socketSendApi.SaveUdpConnHandle(udpConn)
  42. // 无限循环接收数据
  43. dataBuffer := make([]byte, 0, 2048) //接收缓存
  44. for {
  45. //将数据读到临时buffer中(ReadFromUDP方法会阻塞,直到有数据到达)
  46. tmpBuffer := make([]byte, 1024) //临时缓存
  47. n, clientAddr, err := udpConn.ReadFromUDP(tmpBuffer)
  48. if err != nil {
  49. fmt.Println(err.Error())
  50. return
  51. }
  52. dataBuffer = append(dataBuffer, tmpBuffer[:n]...) //把数据追加到数据缓存中
  53. //成帧处理,并且把数据发送给协议栈
  54. for len(dataBuffer) > psmodel.PS_FRAME_MIN_LEN { //数据长度大于最小帧长
  55. if dataBuffer[0] == psmodel.PS_AP_END_FLAG {
  56. // 查找数据包的结尾
  57. endIndex := bytes.IndexByte(dataBuffer[1:], psmodel.PS_AP_END_FLAG)
  58. if endIndex != -1 {
  59. // 找到了完整的数据包
  60. dataPacket := dataBuffer[0 : endIndex+2] // 包括结尾的0x7E
  61. dataBuffer = dataBuffer[endIndex+2:] // 移除已处理的数据包,包括结尾的0x7E
  62. //开启goroutine,将数据报文发送给协议栈模块
  63. go obj.psUlApi.PsUl(clientAddr, dataPacket, len(dataPacket))
  64. fmt.Printf("Received %d bytes from %s\n", len(dataPacket), clientAddr)
  65. } else {
  66. // 没有找到结尾,等待更多数据
  67. break
  68. }
  69. } else {
  70. // 丢弃非法数据(数据以0x7E开头),并等待更多数据
  71. dataBuffer = dataBuffer[1:]
  72. }
  73. }
  74. }
  75. }
  76. // 创建TCP Socket
  77. func (obj *SocketCreate) CreateTcpSocket() {
  78. myViper := util.GetViper()
  79. hostaddress := myViper.GetString("socket.hostaddress") //主机地址
  80. hostport := myViper.GetInt("socket.hostport") //主机端口
  81. hostUrl := hostaddress + ":" + fmt.Sprintf("%d", hostport) //拼接url
  82. // 解析地址和端口
  83. tcpAddr, err := net.ResolveTCPAddr("tcp", hostUrl)
  84. if err != nil {
  85. fmt.Println(err.Error())
  86. os.Exit(1)
  87. }
  88. // 监听TCP地址
  89. listener, err := net.ListenTCP("tcp", tcpAddr)
  90. if err != nil {
  91. fmt.Println(err.Error())
  92. os.Exit(1)
  93. }
  94. defer listener.Close()
  95. // 监听客户端连接
  96. for {
  97. conn, err := listener.AcceptTCP() //接收客户端连接
  98. if err != nil {
  99. fmt.Println("Error accepting:", err.Error())
  100. continue
  101. }
  102. // 开启goroutine,处理该连接
  103. go obj.HandleTcpConnection(conn)
  104. }
  105. }
  106. // 处理每一个TCP连接
  107. func (obj *SocketCreate) HandleTcpConnection(conn *net.TCPConn) {
  108. defer conn.Close()
  109. // conn.SetKeepAlive(false)
  110. // conn.SetKeepAlivePeriod(5 * time.Minute) // 每隔5分钟检测连接
  111. //保存tcp连接句柄
  112. address := conn.RemoteAddr().String()
  113. obj.socketSendApi.SaveTcpConnHandle(conn, address)
  114. //监听TCP连接上的数据
  115. reader := bufio.NewReader(conn)
  116. dataBuffer := make([]byte, 0, 2048) //接收缓存
  117. for {
  118. tmpBuffer := make([]byte, 1024) //临时缓存
  119. n, err := reader.Read(tmpBuffer) //从客户端读取数据
  120. if err != nil {
  121. if err == io.EOF {
  122. fmt.Println("客户端已关闭链接:", err.Error())
  123. } else {
  124. fmt.Println("从客户端读取数据错误:", err.Error())
  125. }
  126. obj.socketSendApi.DeleteTcpConnHandle(address)
  127. break
  128. }
  129. dataBuffer = append(dataBuffer, tmpBuffer[:n]...) //把数据追加到数据缓存中
  130. //成帧处理,并且把数据发送给协议栈
  131. for len(dataBuffer) > psmodel.PS_FRAME_MIN_LEN { //数据长度大于最小帧长
  132. // 查找第一个合法帧头(0x7E)
  133. startIndex := bytes.IndexByte(dataBuffer, psmodel.PS_AP_END_FLAG)
  134. if startIndex == -1 {
  135. // 无合法帧头,清空缓存
  136. dataBuffer = dataBuffer[:0]
  137. break
  138. } else if startIndex > 0 {
  139. // 丢弃帧头前的无效数据
  140. dataBuffer = dataBuffer[startIndex:]
  141. }
  142. // 继续查找帧尾(第二个0x7E)
  143. endIndex := bytes.IndexByte(dataBuffer[1:], psmodel.PS_AP_END_FLAG)
  144. if endIndex == -1 {
  145. // 未找到帧尾,等待更多数据
  146. break
  147. }
  148. // 提取完整数据包(包含头尾的0x7E)
  149. dataPacket := dataBuffer[0 : endIndex+2]
  150. dataBuffer = dataBuffer[endIndex+2:]
  151. //开启goroutine,将数据报文发送给协议栈模块
  152. go obj.psUlApi.PsUlTcp(address, dataPacket, len(dataPacket))
  153. //获得系统工作模式
  154. sysWorkMode := public.PublicSysWorkModeGet()
  155. if sysWorkMode == 0 {
  156. fmt.Printf("Received %d bytes from %s\n", len(dataPacket), address)
  157. }
  158. }
  159. }
  160. }