...

Source file src/code.rocket9labs.com/tslocum/bgammon/pkg/server/client_socket.go

Documentation: code.rocket9labs.com/tslocum/bgammon/pkg/server

     1  package server
     2  
     3  import (
     4  	"bufio"
     5  	"bytes"
     6  	"log"
     7  	"net"
     8  	"sync"
     9  	"time"
    10  
    11  	"code.rocket9labs.com/tslocum/bgammon"
    12  )
    13  
    14  var _ bgammon.Client = &socketClient{}
    15  
    16  type socketClient struct {
    17  	conn       net.Conn
    18  	events     chan []byte
    19  	commands   chan<- []byte
    20  	terminated bool
    21  	wgEvents   sync.WaitGroup
    22  	verbose    bool
    23  }
    24  
    25  func newSocketClient(conn net.Conn, commands chan<- []byte, events chan []byte, verbose bool) *socketClient {
    26  	return &socketClient{
    27  		conn:     conn,
    28  		events:   events,
    29  		commands: commands,
    30  		verbose:  verbose,
    31  	}
    32  }
    33  
    34  func (c *socketClient) HandleReadWrite() {
    35  	if c.terminated {
    36  		return
    37  	}
    38  
    39  	closeWrite := make(chan struct{}, 1)
    40  
    41  	go c.writeEvents(closeWrite)
    42  	c.readCommands()
    43  
    44  	closeWrite <- struct{}{}
    45  }
    46  
    47  func (c *socketClient) Write(message []byte) {
    48  	if c.terminated {
    49  		return
    50  	}
    51  
    52  	c.wgEvents.Add(1)
    53  	c.events <- message
    54  }
    55  
    56  func (c *socketClient) readCommands() {
    57  	setTimeout := func() {
    58  		err := c.conn.SetReadDeadline(time.Now().Add(clientTimeout))
    59  		if err != nil {
    60  			c.Terminate(err.Error())
    61  			return
    62  		}
    63  	}
    64  
    65  	setTimeout()
    66  	var scanner = bufio.NewScanner(c.conn)
    67  	for scanner.Scan() {
    68  		if c.terminated {
    69  			return
    70  		}
    71  
    72  		if scanner.Err() != nil {
    73  			c.Terminate(scanner.Err().Error())
    74  			return
    75  		}
    76  
    77  		buf := make([]byte, len(scanner.Bytes()))
    78  		copy(buf, scanner.Bytes())
    79  		c.commands <- buf
    80  
    81  		if c.verbose {
    82  			logClientRead(scanner.Bytes())
    83  		}
    84  
    85  		setTimeout()
    86  	}
    87  }
    88  
    89  func (c *socketClient) writeEvents(closeWrite chan struct{}) {
    90  	setTimeout := func() {
    91  		err := c.conn.SetWriteDeadline(time.Now().Add(clientTimeout))
    92  		if err != nil {
    93  			c.Terminate(err.Error())
    94  			return
    95  		}
    96  	}
    97  
    98  	setTimeout()
    99  	var event []byte
   100  	for {
   101  		select {
   102  		case <-closeWrite:
   103  			for {
   104  				select {
   105  				case <-c.events:
   106  					c.wgEvents.Done()
   107  				default:
   108  					return
   109  				}
   110  			}
   111  		case event = <-c.events:
   112  		}
   113  
   114  		if c.terminated {
   115  			c.wgEvents.Done()
   116  			continue
   117  		}
   118  
   119  		setTimeout()
   120  		_, err := c.conn.Write(append(event, '\n'))
   121  		if err != nil {
   122  			c.Terminate(err.Error())
   123  			c.wgEvents.Done()
   124  			continue
   125  		}
   126  
   127  		if c.verbose && !bytes.HasPrefix(event, []byte(`{"Type":"ping"`)) && !bytes.HasPrefix(event, []byte(`{"Type":"list"`)) {
   128  			log.Printf("-> %s", event)
   129  		}
   130  		c.wgEvents.Done()
   131  	}
   132  }
   133  
   134  func (c *socketClient) Terminate(reason string) {
   135  	if c.terminated {
   136  		return
   137  	}
   138  	c.terminated = true
   139  	c.conn.Close()
   140  }
   141  
   142  func (c *socketClient) Terminated() bool {
   143  	return c.terminated
   144  }
   145  

View as plain text