1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.connection.socket;
21
22 import java.io.IOException;
23 import java.lang.ref.Reference;
24 import java.lang.ref.SoftReference;
25 import java.net.Socket;
26 import java.net.SocketException;
27
28 import com.allanbank.mongodb.MongoClientConfiguration;
29 import com.allanbank.mongodb.MongoDbException;
30 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
31 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
32 import com.allanbank.mongodb.bson.io.StringDecoderCache;
33 import com.allanbank.mongodb.bson.io.StringEncoderCache;
34 import com.allanbank.mongodb.client.Message;
35 import com.allanbank.mongodb.client.callback.AddressAware;
36 import com.allanbank.mongodb.client.callback.ReplyCallback;
37 import com.allanbank.mongodb.client.message.BuildInfo;
38 import com.allanbank.mongodb.client.message.PendingMessage;
39 import com.allanbank.mongodb.client.state.Server;
40 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
41 import com.allanbank.mongodb.util.IOUtils;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public class SocketConnection extends AbstractSocketConnection {
62
63
64
65
66
67 private final ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
68
69
70 private final Thread myReceiver;
71
72
73 private final Sequence mySendSequence;
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public SocketConnection(final Server server,
88 final MongoClientConfiguration config) throws SocketException,
89 IOException {
90 this(server, config, new StringEncoderCache(),
91 new StringDecoderCache(),
92 new ThreadLocal<Reference<BufferingBsonOutputStream>>());
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public SocketConnection(final Server server,
116 final MongoClientConfiguration config,
117 final StringEncoderCache encoderCache,
118 final StringDecoderCache decoderCache,
119 final ThreadLocal<Reference<BufferingBsonOutputStream>> buffers)
120 throws SocketException, IOException {
121 super(server, config, encoderCache, decoderCache);
122
123 myBuffers = buffers;
124
125 mySendSequence = new Sequence(1L, config.getLockType());
126
127 myReceiver = config.getThreadFactory().newThread(
128 new ReceiveRunnable(this));
129 myReceiver.setDaemon(true);
130 myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
131 + myServer.getCanonicalName());
132 }
133
134
135
136
137 @Override
138 public void close() throws IOException {
139 if (myOpen.compareAndSet(true, false)) {
140 myReceiver.interrupt();
141
142
143
144 myOutput.close();
145 myInput.close();
146 mySocket.close();
147
148 try {
149 if (Thread.currentThread() != myReceiver) {
150 myReceiver.join();
151 }
152 }
153 catch (final InterruptedException ie) {
154
155 }
156
157 myEventSupport.firePropertyChange(OPEN_PROP_NAME, true, false);
158 }
159 }
160
161
162
163
164
165
166
167 @Override
168 public int getPendingCount() {
169 return super.getPendingCount() + mySendSequence.getWaitersCount();
170 }
171
172
173
174
175
176
177
178 @Override
179 public boolean isIdle() {
180 return super.isIdle() && mySendSequence.isIdle();
181 }
182
183
184
185
186 @Override
187 public void send(final Message message1, final Message message2,
188 final ReplyCallback replyCallback) throws MongoDbException {
189
190 validate(message1, message2);
191
192 if (replyCallback instanceof AddressAware) {
193 ((AddressAware) replyCallback).setAddress(myServer
194 .getCanonicalName());
195 }
196
197 final int count = (message2 == null) ? 1 : 2;
198 final long seq = mySendSequence.reserve(count);
199 final long end = seq + count;
200
201 boolean sawError = false;
202 final PendingMessage pendingMessage = new PendingMessage();
203 try {
204
205
206
207 final Reference<BufferingBsonOutputStream> outRef = myBuffers.get();
208 BufferingBsonOutputStream out = (outRef != null) ? outRef.get()
209 : null;
210 if (out == null) {
211 out = new BufferingBsonOutputStream(
212 new RandomAccessOutputStream(myEncoderCache));
213 myBuffers
214 .set(new SoftReference<BufferingBsonOutputStream>(out));
215 }
216
217 message1.write((int) (seq & 0xFFFFFF), out);
218 if (message2 != null) {
219 message2.write((int) ((seq + 1) & 0xFFFFFF), out);
220 }
221
222
223 mySendSequence.waitFor(seq);
224
225 if (count == 1) {
226 pendingMessage.set((int) (seq & 0xFFFFFF), message1,
227 replyCallback);
228 send(pendingMessage, out.getOutput());
229 }
230 else {
231 pendingMessage.set((int) ((seq + 1) & 0xFFFFFF), message2,
232 replyCallback);
233 send(pendingMessage, out.getOutput());
234 }
235
236
237 if (mySendSequence.noWaiter(end)) {
238 if (myReceiver != Thread.currentThread()) {
239 flush();
240 }
241 else {
242 markReaderNeedsToFlush();
243 }
244 }
245 }
246 catch (final InterruptedException ie) {
247
248
249 raiseError(ie, pendingMessage.getReplyCallback());
250 }
251 catch (final IOException ioe) {
252 myLog.warn(ioe, "I/O Error sending a message.");
253 raiseError(ioe, pendingMessage.getReplyCallback());
254 sawError = true;
255 }
256 catch (final RuntimeException re) {
257 myLog.warn(re, "Runtime error sending a message.");
258 raiseError(re, pendingMessage.getReplyCallback());
259 sawError = true;
260 }
261 catch (final Error error) {
262 myLog.error(error, "Error sending a message.");
263 raiseError(error, pendingMessage.getReplyCallback());
264 sawError = true;
265 }
266 finally {
267 pendingMessage.clear();
268 mySendSequence.release(seq, end);
269
270 if (sawError) {
271
272 try {
273 if (myOpen.get()) {
274 flush();
275 }
276 }
277 catch (final IOException ioe) {
278 myLog.warn(ioe, "I/O Error on final flush of messages.");
279 }
280 finally {
281
282 IOUtils.close(SocketConnection.this);
283 }
284 }
285 }
286 }
287
288
289
290
291 @Override
292 public void send(final Message message, final ReplyCallback replyCallback)
293 throws MongoDbException {
294 send(message, null, replyCallback);
295 }
296
297
298
299
300 @Override
301 public void start() {
302 myReceiver.start();
303
304 if (myServer.needBuildInfo()) {
305 send(new BuildInfo(), new ServerUpdateCallback(myServer));
306 }
307 }
308 }