|
21 | 21 | package thrift |
22 | 22 |
|
23 | 23 | import ( |
24 | | - "net" |
25 | | - "sync" |
26 | | - "time" |
| 24 | +"net" |
| 25 | +"sync" |
| 26 | +"time" |
27 | 27 | ) |
28 | 28 |
|
29 | 29 | type TServerSocket struct { |
30 | | - // TServerSocketListenerFactory abstracts how listeners are created. |
31 | | - listenerFactory func(net.Addr) (net.Listener, error) |
32 | | - addr net.Addr |
33 | | - clientTimeout time.Duration |
34 | | - |
35 | | - // Protects the listener and interrupted fields to make them thread safe. |
36 | | - mu sync.RWMutex |
37 | | - listener net.Listener |
38 | | - interrupted bool |
| 30 | +// TServerSocketListenerFactory abstracts how listeners are created. |
| 31 | +listenerFactory func(net.Addr) (net.Listener, error) |
| 32 | +addr net.Addr |
| 33 | +clientTimeout time.Duration |
| 34 | + |
| 35 | +// Protects the listener and interrupted fields to make them thread safe. |
| 36 | +mu sync.RWMutex |
| 37 | +listener net.Listener |
| 38 | +interrupted bool |
39 | 39 | } |
40 | 40 |
|
41 | 41 | func NewTServerSocket(listenAddr string) (*TServerSocket, error) { |
42 | | - return NewTServerSocketTimeout(listenAddr, 0) |
| 42 | +return NewTServerSocketTimeout(listenAddr, 0) |
43 | 43 | } |
44 | 44 |
|
45 | 45 | func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { |
46 | | - addr, err := net.ResolveTCPAddr("tcp", listenAddr) |
47 | | - if err != nil { |
48 | | - return nil, err |
49 | | - } |
| 46 | +addr, err := net.ResolveTCPAddr("tcp", listenAddr) |
| 47 | +if err != nil { |
| 48 | +return nil, err |
| 49 | +} |
50 | 50 |
|
51 | | - return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil |
| 51 | +return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil |
52 | 52 | } |
53 | 53 |
|
54 | 54 | // NewTServerSocketFromAddrTimeout returns TServerSocket |
55 | 55 | // Creates a TServerSocket from a net.Addr |
56 | 56 | func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { |
57 | | - listenerFactory := func(addr net.Addr) (net.Listener, error) { |
58 | | - return net.Listen(addr.Network(), addr.String()) |
59 | | - } |
| 57 | +listenerFactory := func(addr net.Addr) (net.Listener, error) { |
| 58 | +return net.Listen(addr.Network(), addr.String()) |
| 59 | +} |
60 | 60 |
|
61 | | - return NewTServerSocketFromFactoryTimeout(listenerFactory, addr, clientTimeout) |
| 61 | +return NewTServerSocketFromFactoryTimeout(listenerFactory, addr, clientTimeout) |
62 | 62 | } |
63 | 63 |
|
64 | 64 | // NewTServerSocketFromFactoryTimeout creates TServerSocket via a listener factory. |
65 | 65 | // |
66 | 66 | // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) |
67 | 67 | func NewTServerSocketFromFactoryTimeout(listenerFactory func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { |
68 | | - return &TServerSocket{ |
69 | | - listenerFactory: factory, |
70 | | - addr: addr, |
71 | | - clientTimeout: clientTimeout, |
72 | | - } |
| 68 | +return &TServerSocket{ |
| 69 | +listenerFactory: factory, |
| 70 | +addr: addr, |
| 71 | +clientTimeout: clientTimeout, |
| 72 | +} |
73 | 73 | } |
74 | 74 |
|
75 | 75 | func (p *TServerSocket) try_listen(raise bool) error { |
76 | | - p.mu.Lock() |
77 | | - defer p.mu.Unlock() |
| 76 | +p.mu.Lock() |
| 77 | +defer p.mu.Unlock() |
78 | 78 |
|
79 | | - if p.listener != nil { |
80 | | - if (raise) { |
81 | | - return NewTTransportException(ALREADY_OPEN, "Server socket already open") |
82 | | - } |
83 | | - return nil |
84 | | - } |
| 79 | +if p.listener != nil { |
| 80 | +if (raise) { |
| 81 | +return NewTTransportException(ALREADY_OPEN, "Server socket already open") |
| 82 | +} |
| 83 | +return nil |
| 84 | +} |
85 | 85 |
|
86 | | - l, err := p.listenerFactory(p.addr) |
87 | | - if err != nil { |
88 | | - return err |
89 | | - } |
| 86 | +l, err := p.listenerFactory(p.addr) |
| 87 | +if err != nil { |
| 88 | +return err |
| 89 | +} |
90 | 90 |
|
91 | | - p.listener = l |
92 | | - p.interrupted = false |
93 | | - return nil |
| 91 | +p.listener = l |
| 92 | +p.interrupted = false |
| 93 | +return nil |
94 | 94 | } |
95 | 95 |
|
96 | 96 | // Open does try to listen and return on failure |
97 | 97 | // Connects the socket, creating a new socket object if necessary. |
98 | 98 | func (p *TServerSocket) Open() error { |
99 | | - return p.try_listen(true /* raise error if listening */) |
| 99 | +return p.try_listen(true /* raise error if listening */) |
100 | 100 | } |
101 | 101 |
|
102 | 102 | func (p *TServerSocket) Listen() error { |
103 | | - return p.try_listen(false /* do not raise error if listening */) |
| 103 | +return p.try_listen(false /* do not raise error if listening */) |
104 | 104 | } |
105 | 105 |
|
106 | 106 | func (p *TServerSocket) Accept() (TTransport, error) { |
107 | | - p.mu.RLock() |
108 | | - interrupted := p.interrupted |
109 | | - listener := p.listener |
110 | | - p.mu.RUnlock() |
| 107 | +p.mu.RLock() |
| 108 | +interrupted := p.interrupted |
| 109 | +listener := p.listener |
| 110 | +p.mu.RUnlock() |
111 | 111 |
|
112 | | - if interrupted { |
113 | | - return nil, errTransportInterrupted |
114 | | - } |
| 112 | +if interrupted { |
| 113 | +return nil, errTransportInterrupted |
| 114 | +} |
115 | 115 |
|
116 | | - if listener == nil { |
117 | | - return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") |
118 | | - } |
| 116 | +if listener == nil { |
| 117 | +return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") |
| 118 | +} |
119 | 119 |
|
120 | | - conn, err := listener.Accept() |
121 | | - if err != nil { |
122 | | - return nil, NewTTransportExceptionFromError(err) |
123 | | - } |
124 | | - return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil |
| 120 | +conn, err := listener.Accept() |
| 121 | +if err != nil { |
| 122 | +return nil, NewTTransportExceptionFromError(err) |
| 123 | +} |
| 124 | +return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil |
125 | 125 | } |
126 | 126 |
|
127 | 127 | // IsListening returns listener != nil |
128 | 128 | // Checks whether the socket is listening. |
129 | 129 | func (p *TServerSocket) IsListening() bool { |
130 | | - p.mu.RLock() |
131 | | - defer p.mu.RUnlock() |
132 | | - return p.listener != nil |
| 130 | +p.mu.RLock() |
| 131 | +defer p.mu.RUnlock() |
| 132 | +return p.listener != nil |
133 | 133 | } |
134 | 134 |
|
135 | 135 | func (p *TServerSocket) Addr() net.Addr { |
136 | | - p.mu.RLock() |
137 | | - defer p.mu.RUnlock() |
| 136 | +p.mu.RLock() |
| 137 | +defer p.mu.RUnlock() |
138 | 138 |
|
139 | | - if p.listener != nil { |
140 | | - return p.listener.Addr() |
141 | | - } |
142 | | - return p.addr |
| 139 | +if p.listener != nil { |
| 140 | +return p.listener.Addr() |
| 141 | +} |
| 142 | +return p.addr |
143 | 143 | } |
144 | 144 |
|
145 | 145 | func (p *TServerSocket) try_close(interrupt bool) error { |
146 | | - p.mu.Lock() |
147 | | - defer p.mu.Unlock() |
148 | | - if (interrupt){ |
149 | | - p.interrupted = true |
150 | | - } |
| 146 | +p.mu.Lock() |
| 147 | +defer p.mu.Unlock() |
| 148 | +if (interrupt){ |
| 149 | +p.interrupted = true |
| 150 | +} |
151 | 151 |
|
152 | | - var err error = nil |
153 | | - if p.listener != nil { |
154 | | - err = p.listener.Close() |
155 | | - p.listener = nil |
156 | | - } |
157 | | - return err |
| 152 | +var err error = nil |
| 153 | +if p.listener != nil { |
| 154 | +err = p.listener.Close() |
| 155 | +p.listener = nil |
| 156 | +} |
| 157 | +return err |
158 | 158 | } |
159 | 159 |
|
160 | 160 | func (p *TServerSocket) Close() error { |
161 | | - return p.try_close(false /* do not set interrupted flag */) |
| 161 | +return p.try_close(false /* do not set interrupted flag */) |
162 | 162 | } |
163 | 163 |
|
164 | 164 | func (p *TServerSocket) Interrupt() error { |
165 | | - return p.try_close(true /* set interrupted flag */) |
| 165 | +return p.try_close(true /* set interrupted flag */) |
166 | 166 | } |
0 commit comments