socketcreate.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // socket创建模块
  2. // 秒寻科技
  3. // zt 2024-08-30
  4. package socketcreate
  5. import (
  6. "bufio"
  7. "bytes"
  8. "fmt"
  9. "io"
  10. "ipsomc/module/ps/psmodel"
  11. "ipsomc/module/ps/psul"
  12. "ipsomc/module/socket/socketsend"
  13. "ipsomc/util"
  14. "net"
  15. "os"
  16. )
  17. type SocketCreate struct {
  18. socketSendApi socketsend.SocketSend
  19. psUlApi psul.PsUl
  20. }
  21. // 创建UDP Socket
  22. func (obj *SocketCreate) CreateUdpSocket() {
  23. myViper := util.GetViper()
  24. hostaddress := myViper.GetString("socket.hostaddress") //主机地址
  25. hostport := myViper.GetInt("socket.hostport") //主机端口
  26. hostUrl := hostaddress + ":" + fmt.Sprintf("%d", hostport) //拼接url
  27. // 解析UPD地址和端口
  28. udpAddr, err := net.ResolveUDPAddr("udp", hostUrl)
  29. if err != nil {
  30. fmt.Println(err.Error())
  31. os.Exit(1)
  32. }
  33. // 创建一个UDP连接并监听端口
  34. udpConn, err := net.ListenUDP("udp", udpAddr)
  35. if err != nil {
  36. fmt.Println(err.Error())
  37. os.Exit(1)
  38. }
  39. defer udpConn.Close()
  40. obj.socketSendApi.SaveUdpConnHandle(udpConn)
  41. // 无限循环接收数据
  42. dataBuffer := make([]byte, 0, 2048) //接收缓存
  43. for {
  44. //将数据读到临时buffer中(ReadFromUDP方法会阻塞,直到有数据到达)
  45. tmpBuffer := make([]byte, 1024) //临时缓存
  46. n, clientAddr, err := udpConn.ReadFromUDP(tmpBuffer)
  47. if err != nil {
  48. fmt.Println(err.Error())
  49. return
  50. }
  51. dataBuffer = append(dataBuffer, tmpBuffer[:n]...) //把数据追加到数据缓存中
  52. //成帧处理,并且把数据发送给协议栈
  53. for len(dataBuffer) > psmodel.PS_FRAME_MIN_LEN { //数据长度大于最小帧长
  54. if dataBuffer[0] == psmodel.PS_AP_END_FLAG {
  55. // 查找数据包的结尾
  56. endIndex := bytes.IndexByte(dataBuffer[1:], psmodel.PS_AP_END_FLAG)
  57. if endIndex != -1 {
  58. // 找到了完整的数据包
  59. dataPacket := dataBuffer[0 : endIndex+2] // 包括结尾的0x7E
  60. dataBuffer = dataBuffer[endIndex+2:] // 移除已处理的数据包,包括结尾的0x7E
  61. //开启goroutine,将数据报文发送给协议栈模块
  62. go obj.psUlApi.PsUl(clientAddr, dataPacket, len(dataPacket))
  63. fmt.Printf("Received %d bytes from %s\n", len(dataPacket), clientAddr)
  64. } else {
  65. // 没有找到结尾,等待更多数据
  66. break
  67. }
  68. } else {
  69. // 丢弃非法数据(数据以0x7E开头),并等待更多数据
  70. dataBuffer = dataBuffer[1:]
  71. }
  72. }
  73. }
  74. }
  75. // 创建TCP Socket
  76. func (obj *SocketCreate) CreateTcpSocket() {
  77. myViper := util.GetViper()
  78. hostaddress := myViper.GetString("socket.hostaddress") //主机地址
  79. hostport := myViper.GetInt("socket.hostport") //主机端口
  80. hostUrl := hostaddress + ":" + fmt.Sprintf("%d", hostport) //拼接url
  81. // 解析地址和端口
  82. tcpAddr, err := net.ResolveTCPAddr("tcp", hostUrl)
  83. if err != nil {
  84. fmt.Println(err.Error())
  85. os.Exit(1)
  86. }
  87. // 监听TCP地址
  88. listener, err := net.ListenTCP("tcp", tcpAddr)
  89. if err != nil {
  90. fmt.Println(err.Error())
  91. os.Exit(1)
  92. }
  93. defer listener.Close()
  94. // 监听客户端连接
  95. for {
  96. conn, err := listener.AcceptTCP() //接收客户端连接
  97. if err != nil {
  98. fmt.Println("Error accepting:", err.Error())
  99. continue
  100. }
  101. // 开启goroutine,处理该连接
  102. go obj.HandleTcpConnection(conn)
  103. }
  104. }
  105. // 处理每一个TCP连接
  106. func (obj *SocketCreate) HandleTcpConnection(conn *net.TCPConn) {
  107. defer conn.Close()
  108. //保存tcp连接句柄
  109. address := conn.RemoteAddr().String()
  110. obj.socketSendApi.SaveTcpConnHandle(conn, address)
  111. //监听TCP连接上的数据
  112. reader := bufio.NewReader(conn)
  113. dataBuffer := make([]byte, 0, 2048) //接收缓存
  114. for {
  115. tmpBuffer := make([]byte, 1024) //临时缓存
  116. n, err := reader.Read(tmpBuffer) //从客户端读取数据
  117. if err != nil && err != io.EOF {
  118. fmt.Println("Error reading:", err.Error())
  119. obj.socketSendApi.DeleteTcpConnHandle(address) //删除一个tcp连接
  120. break
  121. }
  122. dataBuffer = append(dataBuffer, tmpBuffer[:n]...) //把数据追加到数据缓存中
  123. //成帧处理,并且把数据发送给协议栈
  124. for len(dataBuffer) > psmodel.PS_FRAME_MIN_LEN { //数据长度大于最小帧长
  125. if dataBuffer[0] == psmodel.PS_AP_END_FLAG {
  126. // 查找数据包的结尾
  127. endIndex := bytes.IndexByte(dataBuffer[1:], psmodel.PS_AP_END_FLAG)
  128. if endIndex != -1 {
  129. // 找到了完整的数据包
  130. dataPacket := dataBuffer[0 : endIndex+2] // 包括结尾的0x7E
  131. dataBuffer = dataBuffer[endIndex+2:] // 移除已处理的数据包,包括结尾的0x7E
  132. //开启goroutine,将数据报文发送给协议栈模块
  133. go obj.psUlApi.PsUlTcp(address, dataPacket, len(dataPacket))
  134. fmt.Printf("Received %d bytes from %s\n", len(dataPacket), address)
  135. } else {
  136. // 没有找到结尾,等待更多数据
  137. break
  138. }
  139. } else {
  140. // 丢弃非法数据(数据以0x7E开头),并等待更多数据
  141. dataBuffer = dataBuffer[1:]
  142. }
  143. }
  144. }
  145. }