...
1 package server
2
3 import (
4 "bufio"
5 "bytes"
6 "log"
7 "net"
8 "sync"
9 "time"
10
11 "code.rocket9labs.com/tslocum/bgammon"
12 )
13
14 var _ bgammon.Client = &socketClient{}
15
// socketClient is a client backed by a raw network connection. Commands are
// read from the connection one line at a time and events are written back to
// it, one per line.
type socketClient struct {
	// conn is the underlying connection that is read from, written to and
	// closed on termination.
	conn net.Conn
	// events carries outgoing messages queued by Write and consumed by
	// writeEvents.
	events chan []byte
	// commands receives a copy of every line read from conn.
	commands chan<- []byte

	// terminated is set once Terminate has been called.
	terminated bool
	// wgEvents counts events that were queued by Write and have not yet been
	// written or discarded by writeEvents.
	wgEvents sync.WaitGroup
	// verbose enables logging of traffic read from and written to conn.
	verbose bool
}
24
25 func newSocketClient(conn net.Conn, commands chan<- []byte, events chan []byte, verbose bool) *socketClient {
26 return &socketClient{
27 conn: conn,
28 events: events,
29 commands: commands,
30 verbose: verbose,
31 }
32 }
33
34 func (c *socketClient) HandleReadWrite() {
35 if c.terminated {
36 return
37 }
38
39 closeWrite := make(chan struct{}, 1)
40
41 go c.writeEvents(closeWrite)
42 c.readCommands()
43
44 closeWrite <- struct{}{}
45 }
46
47 func (c *socketClient) Write(message []byte) {
48 if c.terminated {
49 return
50 }
51
52 c.wgEvents.Add(1)
53 c.events <- message
54 }
55
56 func (c *socketClient) readCommands() {
57 setTimeout := func() {
58 err := c.conn.SetReadDeadline(time.Now().Add(clientTimeout))
59 if err != nil {
60 c.Terminate(err.Error())
61 return
62 }
63 }
64
65 setTimeout()
66 var scanner = bufio.NewScanner(c.conn)
67 for scanner.Scan() {
68 if c.terminated {
69 return
70 }
71
72 if scanner.Err() != nil {
73 c.Terminate(scanner.Err().Error())
74 return
75 }
76
77 buf := make([]byte, len(scanner.Bytes()))
78 copy(buf, scanner.Bytes())
79 c.commands <- buf
80
81 if c.verbose {
82 logClientRead(scanner.Bytes())
83 }
84
85 setTimeout()
86 }
87 }
88
89 func (c *socketClient) writeEvents(closeWrite chan struct{}) {
90 setTimeout := func() {
91 err := c.conn.SetWriteDeadline(time.Now().Add(clientTimeout))
92 if err != nil {
93 c.Terminate(err.Error())
94 return
95 }
96 }
97
98 setTimeout()
99 var event []byte
100 for {
101 select {
102 case <-closeWrite:
103 for {
104 select {
105 case <-c.events:
106 c.wgEvents.Done()
107 default:
108 return
109 }
110 }
111 case event = <-c.events:
112 }
113
114 if c.terminated {
115 c.wgEvents.Done()
116 continue
117 }
118
119 setTimeout()
120 _, err := c.conn.Write(append(event, '\n'))
121 if err != nil {
122 c.Terminate(err.Error())
123 c.wgEvents.Done()
124 continue
125 }
126
127 if c.verbose && !bytes.HasPrefix(event, []byte(`{"Type":"ping"`)) && !bytes.HasPrefix(event, []byte(`{"Type":"list"`)) {
128 log.Printf("-> %s", event)
129 }
130 c.wgEvents.Done()
131 }
132 }
133
134 func (c *socketClient) Terminate(reason string) {
135 if c.terminated {
136 return
137 }
138 c.terminated = true
139 c.conn.Close()
140 }
141
142 func (c *socketClient) Terminated() bool {
143 return c.terminated
144 }
145
View as plain text