1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.connection.socket;
21
22 import java.io.IOException;
23 import java.net.SocketException;
24
25 import com.allanbank.mongodb.MongoClientConfiguration;
26 import com.allanbank.mongodb.MongoDbException;
27 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
28 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
29 import com.allanbank.mongodb.bson.io.StringDecoderCache;
30 import com.allanbank.mongodb.bson.io.StringEncoderCache;
31 import com.allanbank.mongodb.client.Message;
32 import com.allanbank.mongodb.client.callback.AddressAware;
33 import com.allanbank.mongodb.client.callback.ReplyCallback;
34 import com.allanbank.mongodb.client.message.BuildInfo;
35 import com.allanbank.mongodb.client.message.PendingMessage;
36 import com.allanbank.mongodb.client.message.PendingMessageQueue;
37 import com.allanbank.mongodb.client.state.Server;
38 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
39 import com.allanbank.mongodb.util.IOUtils;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class TwoThreadSocketConnection extends AbstractSocketConnection {
59
60
61 protected final BufferingBsonOutputStream myBsonOut;
62
63
64 protected final PendingMessageQueue myToSendQueue;
65
66
67 private final Thread myReceiver;
68
69
70 private final Thread mySender;
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public TwoThreadSocketConnection(final Server server,
85 final MongoClientConfiguration config) throws SocketException,
86 IOException {
87 this(server, config, new StringEncoderCache(), new StringDecoderCache());
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public TwoThreadSocketConnection(final Server server,
107 final MongoClientConfiguration config,
108 final StringEncoderCache encoderCache,
109 final StringDecoderCache decoderCache) throws SocketException,
110 IOException {
111 super(server, config, encoderCache, decoderCache);
112
113 myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream(
114 encoderCache));
115
116 myToSendQueue = new PendingMessageQueue(
117 config.getMaxPendingOperationsPerConnection(),
118 config.getLockType());
119
120 myReceiver = config.getThreadFactory().newThread(
121 new ReceiveRunnable(this));
122 myReceiver.setDaemon(true);
123 myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
124 + myServer.getCanonicalName());
125
126 mySender = config.getThreadFactory().newThread(new SendRunnable());
127 mySender.setDaemon(true);
128 mySender.setName("MongoDB " + mySocket.getLocalPort() + "-->"
129 + myServer.getCanonicalName());
130 }
131
132
133
134
135 @Override
136 public void close() throws IOException {
137 final boolean wasOpen = myOpen.get();
138 myOpen.set(false);
139
140 mySender.interrupt();
141 myReceiver.interrupt();
142
143 try {
144 if (Thread.currentThread() != mySender) {
145 mySender.join();
146 }
147 }
148 catch (final InterruptedException ie) {
149
150 }
151 finally {
152
153
154 myOutput.close();
155 myInput.close();
156 mySocket.close();
157 }
158
159 try {
160 if (Thread.currentThread() != myReceiver) {
161 myReceiver.join();
162 }
163 }
164 catch (final InterruptedException ie) {
165
166 }
167
168 myEventSupport.firePropertyChange(OPEN_PROP_NAME, wasOpen, false);
169 }
170
171
172
173
174 @Override
175 public int getPendingCount() {
176 return super.getPendingCount() + myToSendQueue.size();
177 }
178
179
180
181
182
183
184
185 @Override
186 public boolean isIdle() {
187 return super.isIdle() && myToSendQueue.isEmpty();
188 }
189
190
191
192
193
194
195
196 @Override
197 public void raiseErrors(final MongoDbException exception) {
198 final PendingMessage message = new PendingMessage();
199 while (myToSendQueue.poll(message)) {
200 raiseError(exception, message.getReplyCallback());
201 }
202
203 super.raiseErrors(exception);
204 }
205
206
207
208
209 @Override
210 public void send(final Message message1, final Message message2,
211 final ReplyCallback replyCallback) throws MongoDbException {
212
213 validate(message1, message2);
214
215 if (replyCallback instanceof AddressAware) {
216 ((AddressAware) replyCallback).setAddress(myServer
217 .getCanonicalName());
218 }
219
220 try {
221 myToSendQueue.put(message1, null, message2, replyCallback);
222 }
223 catch (final InterruptedException e) {
224 throw new MongoDbException(e);
225 }
226 }
227
228
229
230
231 @Override
232 public void send(final Message message, final ReplyCallback replyCallback)
233 throws MongoDbException {
234
235 validate(message, null);
236
237 if (replyCallback instanceof AddressAware) {
238 ((AddressAware) replyCallback).setAddress(myServer
239 .getCanonicalName());
240 }
241
242 try {
243 myToSendQueue.put(message, replyCallback);
244 }
245 catch (final InterruptedException e) {
246 throw new MongoDbException(e);
247 }
248 }
249
250
251
252
253 @Override
254 public void start() {
255 myReceiver.start();
256 mySender.start();
257
258 if (myServer.needBuildInfo()) {
259 send(new BuildInfo(), new ServerUpdateCallback(myServer));
260 }
261 }
262
263
264
265
266
267
268 protected class SendRunnable implements Runnable {
269
270
271 private boolean myNeedToFlush = false;
272
273
274 private final PendingMessage myPendingMessage = new PendingMessage();
275
276
277
278
279
280
281
282
283
284
285
286
287
288 @Override
289 public void run() {
290 boolean sawError = false;
291 try {
292 while (myOpen.get() && !sawError) {
293 try {
294 sendOne();
295 }
296 catch (final InterruptedException ie) {
297
298
299 raiseError(ie, myPendingMessage.getReplyCallback());
300 }
301 catch (final IOException ioe) {
302 myLog.warn(ioe, "I/O Error sending a message.");
303 raiseError(ioe, myPendingMessage.getReplyCallback());
304 sawError = true;
305 }
306 catch (final RuntimeException re) {
307 myLog.warn(re, "Runtime error sending a message.");
308 raiseError(re, myPendingMessage.getReplyCallback());
309 sawError = true;
310 }
311 catch (final Error error) {
312 myLog.error(error, "Error sending a message.");
313 raiseError(error, myPendingMessage.getReplyCallback());
314 sawError = true;
315 }
316 finally {
317 myPendingMessage.clear();
318 }
319 }
320 }
321 finally {
322
323 try {
324 if (myOpen.get()) {
325 doFlush();
326 }
327 }
328 catch (final IOException ioe) {
329 myLog.warn(ioe, "I/O Error on final flush of messages.");
330 }
331 finally {
332
333 IOUtils.close(TwoThreadSocketConnection.this);
334 }
335 }
336 }
337
338
339
340
341
342
343
344 protected final void doFlush() throws IOException {
345 if (myNeedToFlush) {
346 flush();
347 myNeedToFlush = false;
348 }
349 }
350
351
352
353
354
355
356
357
358
359
360 protected final void sendOne() throws InterruptedException, IOException {
361 boolean took = false;
362 if (myNeedToFlush) {
363 took = myToSendQueue.poll(myPendingMessage);
364 }
365 else {
366 myToSendQueue.take(myPendingMessage);
367 took = true;
368 }
369
370 if (took) {
371 myNeedToFlush = true;
372
373 myPendingMessage.getMessage().write(
374 myPendingMessage.getMessageId(), myBsonOut);
375
376 send(myPendingMessage, myBsonOut.getOutput());
377
378
379
380
381
382 myPendingMessage.clear();
383 }
384 else {
385 doFlush();
386 }
387 }
388 }
389 }