1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
package com.allanbank.mongodb.client.connection.socket; |
21 | |
|
22 | |
import java.io.IOException; |
23 | |
import java.net.SocketException; |
24 | |
|
25 | |
import com.allanbank.mongodb.MongoClientConfiguration; |
26 | |
import com.allanbank.mongodb.MongoDbException; |
27 | |
import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream; |
28 | |
import com.allanbank.mongodb.bson.io.RandomAccessOutputStream; |
29 | |
import com.allanbank.mongodb.bson.io.StringDecoderCache; |
30 | |
import com.allanbank.mongodb.bson.io.StringEncoderCache; |
31 | |
import com.allanbank.mongodb.client.Message; |
32 | |
import com.allanbank.mongodb.client.callback.AddressAware; |
33 | |
import com.allanbank.mongodb.client.callback.ReplyCallback; |
34 | |
import com.allanbank.mongodb.client.message.BuildInfo; |
35 | |
import com.allanbank.mongodb.client.message.PendingMessage; |
36 | |
import com.allanbank.mongodb.client.message.PendingMessageQueue; |
37 | |
import com.allanbank.mongodb.client.state.Server; |
38 | |
import com.allanbank.mongodb.client.state.ServerUpdateCallback; |
39 | |
import com.allanbank.mongodb.util.IOUtils; |
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
public class TwoThreadSocketConnection extends AbstractSocketConnection { |
59 | |
|
60 | |
|
61 | |
protected final BufferingBsonOutputStream myBsonOut; |
62 | |
|
63 | |
|
64 | |
protected final PendingMessageQueue myToSendQueue; |
65 | |
|
66 | |
|
67 | |
private final Thread myReceiver; |
68 | |
|
69 | |
|
70 | |
private final Thread mySender; |
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
public TwoThreadSocketConnection(final Server server, |
85 | |
final MongoClientConfiguration config) throws SocketException, |
86 | |
IOException { |
87 | 39 | this(server, config, new StringEncoderCache(), new StringDecoderCache()); |
88 | 36 | } |
89 | |
|
90 | |
|
91 | |
|
92 | |
|
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
public TwoThreadSocketConnection(final Server server, |
107 | |
final MongoClientConfiguration config, |
108 | |
final StringEncoderCache encoderCache, |
109 | |
final StringDecoderCache decoderCache) throws SocketException, |
110 | |
IOException { |
111 | 40 | super(server, config, encoderCache, decoderCache); |
112 | |
|
113 | 37 | myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream( |
114 | |
encoderCache)); |
115 | |
|
116 | 37 | myToSendQueue = new PendingMessageQueue( |
117 | |
config.getMaxPendingOperationsPerConnection(), |
118 | |
config.getLockType()); |
119 | |
|
120 | 37 | myReceiver = config.getThreadFactory().newThread( |
121 | |
new ReceiveRunnable(this)); |
122 | 37 | myReceiver.setDaemon(true); |
123 | 37 | myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--" |
124 | |
+ myServer.getCanonicalName()); |
125 | |
|
126 | 37 | mySender = config.getThreadFactory().newThread(new SendRunnable()); |
127 | 37 | mySender.setDaemon(true); |
128 | 37 | mySender.setName("MongoDB " + mySocket.getLocalPort() + "-->" |
129 | |
+ myServer.getCanonicalName()); |
130 | 37 | } |
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
@Override |
136 | |
public void close() throws IOException { |
137 | 150 | final boolean wasOpen = myOpen.get(); |
138 | 150 | myOpen.set(false); |
139 | |
|
140 | 150 | mySender.interrupt(); |
141 | 150 | myReceiver.interrupt(); |
142 | |
|
143 | |
try { |
144 | 150 | if (Thread.currentThread() != mySender) { |
145 | 113 | mySender.join(); |
146 | |
} |
147 | |
} |
148 | 13 | catch (final InterruptedException ie) { |
149 | |
|
150 | |
} |
151 | |
finally { |
152 | |
|
153 | |
|
154 | 150 | myOutput.close(); |
155 | 150 | myInput.close(); |
156 | 150 | mySocket.close(); |
157 | 150 | } |
158 | |
|
159 | |
try { |
160 | 150 | if (Thread.currentThread() != myReceiver) { |
161 | 76 | myReceiver.join(); |
162 | |
} |
163 | |
} |
164 | 37 | catch (final InterruptedException ie) { |
165 | |
|
166 | 113 | } |
167 | |
|
168 | 150 | myEventSupport.firePropertyChange(OPEN_PROP_NAME, wasOpen, false); |
169 | 150 | } |
170 | |
|
171 | |
|
172 | |
|
173 | |
|
174 | |
@Override |
175 | |
public int getPendingCount() { |
176 | 2 | return super.getPendingCount() + myToSendQueue.size(); |
177 | |
} |
178 | |
|
179 | |
|
180 | |
|
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
@Override |
186 | |
public boolean isIdle() { |
187 | 10 | return super.isIdle() && myToSendQueue.isEmpty(); |
188 | |
} |
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
|
194 | |
|
195 | |
|
196 | |
@Override |
197 | |
public void raiseErrors(final MongoDbException exception) { |
198 | 1 | final PendingMessage message = new PendingMessage(); |
199 | 2 | while (myToSendQueue.poll(message)) { |
200 | 1 | raiseError(exception, message.getReplyCallback()); |
201 | |
} |
202 | |
|
203 | 1 | super.raiseErrors(exception); |
204 | 1 | } |
205 | |
|
206 | |
|
207 | |
|
208 | |
|
209 | |
@Override |
210 | |
public void send(final Message message1, final Message message2, |
211 | |
final ReplyCallback replyCallback) throws MongoDbException { |
212 | |
|
213 | 1 | validate(message1, message2); |
214 | |
|
215 | 1 | if (replyCallback instanceof AddressAware) { |
216 | 0 | ((AddressAware) replyCallback).setAddress(myServer |
217 | |
.getCanonicalName()); |
218 | |
} |
219 | |
|
220 | |
try { |
221 | 1 | myToSendQueue.put(message1, null, message2, replyCallback); |
222 | |
} |
223 | 0 | catch (final InterruptedException e) { |
224 | 0 | throw new MongoDbException(e); |
225 | 1 | } |
226 | 1 | } |
227 | |
|
228 | |
|
229 | |
|
230 | |
|
231 | |
@Override |
232 | |
public void send(final Message message, final ReplyCallback replyCallback) |
233 | |
throws MongoDbException { |
234 | |
|
235 | 34 | validate(message, null); |
236 | |
|
237 | 34 | if (replyCallback instanceof AddressAware) { |
238 | 0 | ((AddressAware) replyCallback).setAddress(myServer |
239 | |
.getCanonicalName()); |
240 | |
} |
241 | |
|
242 | |
try { |
243 | 34 | myToSendQueue.put(message, replyCallback); |
244 | |
} |
245 | 0 | catch (final InterruptedException e) { |
246 | 0 | throw new MongoDbException(e); |
247 | 34 | } |
248 | 34 | } |
249 | |
|
250 | |
|
251 | |
|
252 | |
|
253 | |
@Override |
254 | |
public void start() { |
255 | 37 | myReceiver.start(); |
256 | 37 | mySender.start(); |
257 | |
|
258 | 37 | if (myServer.needBuildInfo()) { |
259 | 0 | send(new BuildInfo(), new ServerUpdateCallback(myServer)); |
260 | |
} |
261 | 37 | } |
262 | |
|
263 | |
|
264 | |
|
265 | |
|
266 | |
|
267 | |
|
268 | 37 | protected class SendRunnable implements Runnable { |
269 | |
|
270 | |
|
271 | 37 | private boolean myNeedToFlush = false; |
272 | |
|
273 | |
|
274 | 37 | private final PendingMessage myPendingMessage = new PendingMessage(); |
275 | |
|
276 | |
|
277 | |
|
278 | |
|
279 | |
|
280 | |
|
281 | |
|
282 | |
|
283 | |
|
284 | |
|
285 | |
|
286 | |
|
287 | |
|
288 | |
@Override |
289 | |
public void run() { |
290 | 37 | boolean sawError = false; |
291 | |
try { |
292 | 135 | while (myOpen.get() && !sawError) { |
293 | |
try { |
294 | 98 | sendOne(); |
295 | |
} |
296 | 33 | catch (final InterruptedException ie) { |
297 | |
|
298 | |
|
299 | 33 | raiseError(ie, myPendingMessage.getReplyCallback()); |
300 | |
} |
301 | 1 | catch (final IOException ioe) { |
302 | 1 | myLog.warn(ioe, "I/O Error sending a message."); |
303 | 1 | raiseError(ioe, myPendingMessage.getReplyCallback()); |
304 | 1 | sawError = true; |
305 | |
} |
306 | 1 | catch (final RuntimeException re) { |
307 | 1 | myLog.warn(re, "Runtime error sending a message."); |
308 | 1 | raiseError(re, myPendingMessage.getReplyCallback()); |
309 | 1 | sawError = true; |
310 | |
} |
311 | 1 | catch (final Error error) { |
312 | 1 | myLog.error(error, "Error sending a message."); |
313 | 1 | raiseError(error, myPendingMessage.getReplyCallback()); |
314 | 1 | sawError = true; |
315 | |
} |
316 | |
finally { |
317 | 98 | myPendingMessage.clear(); |
318 | 98 | } |
319 | |
} |
320 | |
} |
321 | |
finally { |
322 | |
|
323 | 0 | try { |
324 | 37 | if (myOpen.get()) { |
325 | 3 | doFlush(); |
326 | |
} |
327 | |
} |
328 | 0 | catch (final IOException ioe) { |
329 | 0 | myLog.warn(ioe, "I/O Error on final flush of messages."); |
330 | |
} |
331 | |
finally { |
332 | |
|
333 | 37 | IOUtils.close(TwoThreadSocketConnection.this); |
334 | 37 | } |
335 | 37 | } |
336 | 37 | } |
337 | |
|
338 | |
|
339 | |
|
340 | |
|
341 | |
|
342 | |
|
343 | |
|
344 | |
protected final void doFlush() throws IOException { |
345 | 33 | if (myNeedToFlush) { |
346 | 33 | flush(); |
347 | 33 | myNeedToFlush = false; |
348 | |
} |
349 | 33 | } |
350 | |
|
351 | |
|
352 | |
|
353 | |
|
354 | |
|
355 | |
|
356 | |
|
357 | |
|
358 | |
|
359 | |
|
360 | |
protected final void sendOne() throws InterruptedException, IOException { |
361 | 98 | boolean took = false; |
362 | 98 | if (myNeedToFlush) { |
363 | 31 | took = myToSendQueue.poll(myPendingMessage); |
364 | |
} |
365 | |
else { |
366 | 67 | myToSendQueue.take(myPendingMessage); |
367 | 34 | took = true; |
368 | |
} |
369 | |
|
370 | 65 | if (took) { |
371 | 35 | myNeedToFlush = true; |
372 | |
|
373 | 35 | myPendingMessage.getMessage().write( |
374 | |
myPendingMessage.getMessageId(), myBsonOut); |
375 | |
|
376 | 32 | send(myPendingMessage, myBsonOut.getOutput()); |
377 | |
|
378 | |
|
379 | |
|
380 | |
|
381 | |
|
382 | 32 | myPendingMessage.clear(); |
383 | |
} |
384 | |
else { |
385 | 30 | doFlush(); |
386 | |
} |
387 | 62 | } |
388 | |
} |
389 | |
} |