...
1 package server
2
3 import (
4 "bytes"
5 "log"
6 "net"
7 "net/http"
8 "sync"
9 "time"
10
11 "code.rocket9labs.com/tslocum/bgammon"
12 "github.com/gobwas/ws"
13 "github.com/gobwas/ws/wsutil"
14 )
15
16 var _ bgammon.Client = &webSocketClient{}
17
// webSocketClient is a client whose commands and events travel over a
// WebSocket connection.
type webSocketClient struct {
	// conn is the upgraded network connection used for all reads and writes.
	conn net.Conn

	// events carries outgoing messages queued by Write until writeEvents
	// sends them to the client.
	events chan []byte

	// commands receives a copy of every text message read from the client.
	commands chan<- []byte

	// terminated is set by Terminate; once true the client stops reading
	// and writing.
	terminated bool

	// wgEvents is incremented for each event queued by Write and decremented
	// once writeEvents has sent or discarded that event.
	wgEvents sync.WaitGroup

	// verbose enables logging of messages read from and written to the client.
	verbose bool
}
26
27 func newWebSocketClient(r *http.Request, w http.ResponseWriter, commands chan<- []byte, events chan []byte, verbose bool) *webSocketClient {
28 conn, _, _, err := ws.UpgradeHTTP(r, w)
29 if err != nil {
30 return nil
31 }
32
33 return &webSocketClient{
34 conn: conn,
35 events: events,
36 commands: commands,
37 verbose: verbose,
38 }
39 }
40
41 func (c *webSocketClient) HandleReadWrite() {
42 if c.terminated {
43 return
44 }
45
46 closeWrite := make(chan struct{}, 1)
47
48 go c.writeEvents(closeWrite)
49 c.readCommands()
50
51 closeWrite <- struct{}{}
52 }
53
54 func (c *webSocketClient) Write(message []byte) {
55 if c.terminated {
56 return
57 }
58
59 c.wgEvents.Add(1)
60 c.events <- message
61 }
62
63 func (c *webSocketClient) readCommands() {
64 setTimeout := func() {
65 err := c.conn.SetReadDeadline(time.Now().Add(clientTimeout))
66 if err != nil {
67 c.Terminate(err.Error())
68 return
69 }
70 }
71
72 for {
73 if c.terminated {
74 return
75 }
76
77 setTimeout()
78 msg, op, err := wsutil.ReadClientData(c.conn)
79 if err != nil {
80 c.Terminate(err.Error())
81 return
82 } else if op != ws.OpText {
83 continue
84 }
85
86 buf := make([]byte, len(msg))
87 copy(buf, msg)
88 c.commands <- buf
89
90 if c.verbose {
91 logClientRead(msg)
92 }
93 }
94 }
95
96 func (c *webSocketClient) writeEvents(closeWrite chan struct{}) {
97 setTimeout := func() {
98 err := c.conn.SetWriteDeadline(time.Now().Add(clientTimeout))
99 if err != nil {
100 c.Terminate(err.Error())
101 return
102 }
103 }
104
105 setTimeout()
106 var event []byte
107 for {
108 select {
109 case <-closeWrite:
110 for {
111 select {
112 case <-c.events:
113 c.wgEvents.Done()
114 default:
115 return
116 }
117 }
118 case event = <-c.events:
119 }
120
121 if c.terminated {
122 c.wgEvents.Done()
123 continue
124 }
125
126 setTimeout()
127 err := wsutil.WriteServerMessage(c.conn, ws.OpText, event)
128 if err != nil {
129 c.Terminate(err.Error())
130 c.wgEvents.Done()
131 continue
132 }
133
134 if c.verbose && !bytes.HasPrefix(event, []byte(`{"Type":"ping"`)) && !bytes.HasPrefix(event, []byte(`{"Type":"list"`)) {
135 log.Printf("-> %s", event)
136 }
137 c.wgEvents.Done()
138 }
139 }
140
141 func (c *webSocketClient) Terminate(reason string) {
142 if c.terminated {
143 return
144 }
145 c.terminated = true
146 c.conn.Close()
147 }
148
149 func (c *webSocketClient) Terminated() bool {
150 return c.terminated
151 }
152
View as plain text