...

Source file src/code.rocket9labs.com/tslocum/bgammon/pkg/server/client_websocket.go

Documentation: code.rocket9labs.com/tslocum/bgammon/pkg/server

     1  package server
     2  
     3  import (
     4  	"bytes"
     5  	"log"
     6  	"net"
     7  	"net/http"
     8  	"sync"
     9  	"time"
    10  
    11  	"code.rocket9labs.com/tslocum/bgammon"
    12  	"github.com/gobwas/ws"
    13  	"github.com/gobwas/ws/wsutil"
    14  )
    15  
    16  var _ bgammon.Client = &webSocketClient{}
    17  
    18  type webSocketClient struct {
    19  	conn       net.Conn
    20  	events     chan []byte
    21  	commands   chan<- []byte
    22  	terminated bool
    23  	wgEvents   sync.WaitGroup
    24  	verbose    bool
    25  }
    26  
    27  func newWebSocketClient(r *http.Request, w http.ResponseWriter, commands chan<- []byte, events chan []byte, verbose bool) *webSocketClient {
    28  	conn, _, _, err := ws.UpgradeHTTP(r, w)
    29  	if err != nil {
    30  		return nil
    31  	}
    32  
    33  	return &webSocketClient{
    34  		conn:     conn,
    35  		events:   events,
    36  		commands: commands,
    37  		verbose:  verbose,
    38  	}
    39  }
    40  
    41  func (c *webSocketClient) HandleReadWrite() {
    42  	if c.terminated {
    43  		return
    44  	}
    45  
    46  	closeWrite := make(chan struct{}, 1)
    47  
    48  	go c.writeEvents(closeWrite)
    49  	c.readCommands()
    50  
    51  	closeWrite <- struct{}{}
    52  }
    53  
    54  func (c *webSocketClient) Write(message []byte) {
    55  	if c.terminated {
    56  		return
    57  	}
    58  
    59  	c.wgEvents.Add(1)
    60  	c.events <- message
    61  }
    62  
    63  func (c *webSocketClient) readCommands() {
    64  	setTimeout := func() {
    65  		err := c.conn.SetReadDeadline(time.Now().Add(clientTimeout))
    66  		if err != nil {
    67  			c.Terminate(err.Error())
    68  			return
    69  		}
    70  	}
    71  
    72  	for {
    73  		if c.terminated {
    74  			return
    75  		}
    76  
    77  		setTimeout()
    78  		msg, op, err := wsutil.ReadClientData(c.conn)
    79  		if err != nil {
    80  			c.Terminate(err.Error())
    81  			return
    82  		} else if op != ws.OpText {
    83  			continue
    84  		}
    85  
    86  		buf := make([]byte, len(msg))
    87  		copy(buf, msg)
    88  		c.commands <- buf
    89  
    90  		if c.verbose {
    91  			logClientRead(msg)
    92  		}
    93  	}
    94  }
    95  
    96  func (c *webSocketClient) writeEvents(closeWrite chan struct{}) {
    97  	setTimeout := func() {
    98  		err := c.conn.SetWriteDeadline(time.Now().Add(clientTimeout))
    99  		if err != nil {
   100  			c.Terminate(err.Error())
   101  			return
   102  		}
   103  	}
   104  
   105  	setTimeout()
   106  	var event []byte
   107  	for {
   108  		select {
   109  		case <-closeWrite:
   110  			for {
   111  				select {
   112  				case <-c.events:
   113  					c.wgEvents.Done()
   114  				default:
   115  					return
   116  				}
   117  			}
   118  		case event = <-c.events:
   119  		}
   120  
   121  		if c.terminated {
   122  			c.wgEvents.Done()
   123  			continue
   124  		}
   125  
   126  		setTimeout()
   127  		err := wsutil.WriteServerMessage(c.conn, ws.OpText, event)
   128  		if err != nil {
   129  			c.Terminate(err.Error())
   130  			c.wgEvents.Done()
   131  			continue
   132  		}
   133  
   134  		if c.verbose && !bytes.HasPrefix(event, []byte(`{"Type":"ping"`)) && !bytes.HasPrefix(event, []byte(`{"Type":"list"`)) {
   135  			log.Printf("-> %s", event)
   136  		}
   137  		c.wgEvents.Done()
   138  	}
   139  }
   140  
   141  func (c *webSocketClient) Terminate(reason string) {
   142  	if c.terminated {
   143  		return
   144  	}
   145  	c.terminated = true
   146  	c.conn.Close()
   147  }
   148  
   149  func (c *webSocketClient) Terminated() bool {
   150  	return c.terminated
   151  }
   152  

View as plain text