1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.lang.reflect.Constructor;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import com.allanbank.mongodb.Durability;
36 import com.allanbank.mongodb.MongoClientConfiguration;
37 import com.allanbank.mongodb.MongoCursorControl;
38 import com.allanbank.mongodb.MongoDbException;
39 import com.allanbank.mongodb.MongoIterator;
40 import com.allanbank.mongodb.ReadPreference;
41 import com.allanbank.mongodb.StreamCallback;
42 import com.allanbank.mongodb.bson.Document;
43 import com.allanbank.mongodb.bson.DocumentAssignable;
44 import com.allanbank.mongodb.bson.NumericElement;
45 import com.allanbank.mongodb.bson.element.StringElement;
46 import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
47 import com.allanbank.mongodb.client.connection.Connection;
48 import com.allanbank.mongodb.client.connection.ConnectionFactory;
49 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
50 import com.allanbank.mongodb.client.connection.bootstrap.BootstrapConnectionFactory;
51 import com.allanbank.mongodb.client.state.Cluster;
52 import com.allanbank.mongodb.error.CannotConnectException;
53 import com.allanbank.mongodb.error.ConnectionLostException;
54 import com.allanbank.mongodb.util.IOUtils;
55 import com.allanbank.mongodb.util.log.Log;
56 import com.allanbank.mongodb.util.log.LogFactory;
57
58
59
60
61
62
63
64
65
66 public class ClientImpl extends AbstractClient {
67
68
/** The logger shared by {@link ClientImpl} and its nested classes. */
protected static final Log LOG = LogFactory.getLog(ClientImpl.class);
70
71
72
73
74
75
76
77
78 protected static ConnectionFactory resolveBootstrap(
79 final MongoClientConfiguration config) {
80 ConnectionFactory result;
81 try {
82 final String name = "com.allanbank.mongodb.extensions.bootstrap.ExtensionsBootstrapConnectionFactory";
83 final Class<?> clazz = Class.forName(name);
84 final Constructor<?> constructor = clazz
85 .getConstructor(MongoClientConfiguration.class);
86
87 result = (ConnectionFactory) constructor.newInstance(config);
88 }
89
90 catch (final RuntimeException e) {
91 throw e;
92 }
93 catch (final Exception e) {
94 result = new BootstrapConnectionFactory(config);
95 }
96
97 return result;
98 }
99
100
/**
 * The number of reconnect attempts currently in progress. After
 * construction it is modified in {@code reconnect(Connection)} and read in
 * {@code waitForReconnect(Message, Message)}, in both cases inside a
 * {@code synchronized (this)} block.
 */
private int myActiveReconnects;

/** The configuration for the client. */
private final MongoClientConfiguration myConfig;

/**
 * The factory used to create connections, and the source of the cluster
 * statistics, cluster type and reconnect strategy. Also used as the lock
 * when adding or trimming connections.
 */
private final ConnectionFactory myConnectionFactory;

/**
 * The listener registered with each tracked connection so the client is
 * told when the connection's open property becomes false.
 */
private final PropertyChangeListener myConnectionListener;

/** The connections the client currently hands out for sending messages. */
private final List<Connection> myConnections;

/**
 * The connections that have been removed from {@link #myConnections} and
 * asked to shut down but have not yet been observed as closed.
 */
private final BlockingQueue<Connection> myConnectionsToClose;

/** The sequence used to pick which connections to inspect next. */
private final AtomicLong myNextConnectionSequence = new AtomicLong(0);
120
121
122
123
124
125
126
/**
 * Creates a new ClientImpl using the connection factory selected by
 * {@link #resolveBootstrap(MongoClientConfiguration)}.
 *
 * @param config
 *            The configuration for the client.
 */
public ClientImpl(final MongoClientConfiguration config) {
    this(config, resolveBootstrap(config));
}
130
131
132
133
134
135
136
137
138
/**
 * Creates a new ClientImpl that obtains its connections from the supplied
 * factory.
 *
 * @param config
 *            The configuration for the client.
 * @param connectionFactory
 *            The factory used to create (and reconnect) connections.
 */
public ClientImpl(final MongoClientConfiguration config,
        final ConnectionFactory connectionFactory) {
    // Collaborators supplied by the caller.
    myConnectionFactory = connectionFactory;
    myConfig = config;

    // Internal bookkeeping: nothing is connected or reconnecting yet.
    myActiveReconnects = 0;
    myConnectionListener = new ConnectionListener();
    myConnectionsToClose = new LinkedBlockingQueue<Connection>();
    myConnections = new CopyOnWriteArrayList<Connection>();
}
148
149
150
151
152
153
154
155
156
157 @Override
158 public void close() {
159
160 super.close();
161
162 while (!myConnections.isEmpty()) {
163 try {
164 final Connection conn = myConnections.remove(0);
165 myConnectionsToClose.add(conn);
166 conn.shutdown(false);
167 }
168 catch (final ArrayIndexOutOfBoundsException aiob) {
169
170
171
172 aiob.getCause();
173 }
174 }
175
176
177 final List<Connection> conns = new ArrayList<Connection>(
178 myConnectionsToClose);
179 for (final Connection conn : conns) {
180 conn.waitForClosed(myConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
181 if (conn.isOpen()) {
182
183 close(conn);
184 }
185 }
186
187
188 IOUtils.close(myConnectionFactory);
189 }
190
191
192
193
194
195
196
/**
 * Returns the cluster statistics reported by the connection factory.
 *
 * @return The result of the connection factory's
 *         {@code getClusterStats()}.
 */
@Override
public ClusterStats getClusterStats() {
    return myConnectionFactory.getClusterStats();
}
201
202
203
204
205
206
207
208
/**
 * Returns the type of cluster reported by the connection factory.
 *
 * @return The result of the connection factory's
 *         {@code getClusterType()}.
 */
@Override
public ClusterType getClusterType() {
    return myConnectionFactory.getClusterType();
}
213
214
215
216
217
218
219
220
/**
 * Returns the configuration this client was constructed with.
 *
 * @return The client's configuration.
 */
@Override
public MongoClientConfiguration getConfig() {
    return myConfig;
}
225
226
227
228
229
230
/**
 * Returns the number of connections currently tracked as active by the
 * client. Connections that have been moved to the pending-close queue are
 * not counted.
 *
 * @return The size of the active connection list.
 */
public int getConnectionCount() {
    return myConnections.size();
}
234
235
236
237
238
239
240
241
242
/**
 * Returns the default durability taken from the client's configuration.
 *
 * @return The configuration's default {@link Durability}.
 */
@Override
public Durability getDefaultDurability() {
    return myConfig.getDefaultDurability();
}
247
248
249
250
251
252
253
254
255
/**
 * Returns the default read preference taken from the client's
 * configuration.
 *
 * @return The configuration's default {@link ReadPreference}.
 */
@Override
public ReadPreference getDefaultReadPreference() {
    return myConfig.getDefaultReadPreference();
}
260
261
262
263
264
265
266
267
268
269
270 public boolean isCursorDocument(final Document doc) {
271 return (doc.getElements().size() == 5)
272 && (doc.get(StringElement.class,
273 MongoCursorControl.NAME_SPACE_FIELD) != null)
274 && (doc.get(NumericElement.class,
275 MongoCursorControl.CURSOR_ID_FIELD) != null)
276 && (doc.get(StringElement.class,
277 MongoCursorControl.SERVER_FIELD) != null)
278 && (doc.get(NumericElement.class,
279 MongoCursorControl.BATCH_SIZE_FIELD) != null)
280 && (doc.get(NumericElement.class,
281 MongoCursorControl.LIMIT_FIELD) != null);
282 }
283
284
285
286
287 @Override
288 public MongoIterator<Document> restart(
289 final DocumentAssignable cursorDocument)
290 throws IllegalArgumentException {
291 final Document cursorDoc = cursorDocument.asDocument();
292
293 if (isCursorDocument(cursorDoc)) {
294 final MongoIteratorImpl iter = new MongoIteratorImpl(cursorDoc,
295 this);
296 iter.restart();
297
298 return iter;
299 }
300
301 throw new IllegalArgumentException(
302 "Cannot restart without a well formed cursor document: "
303 + cursorDoc);
304 }
305
306
307
308
309 @Override
310 public MongoCursorControl restart(final StreamCallback<Document> results,
311 final DocumentAssignable cursorDocument)
312 throws IllegalArgumentException {
313 final Document cursorDoc = cursorDocument.asDocument();
314
315 if (isCursorDocument(cursorDoc)) {
316 final CursorStreamingCallback cb = new CursorStreamingCallback(
317 this, cursorDoc, results);
318 cb.restart();
319
320 return cb;
321 }
322 throw new IllegalArgumentException(
323 "Cannot restart without a well formed cursor document: "
324 + cursorDoc);
325 }
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346 @Override
347 protected Connection findConnection(final Message message1,
348 final Message message2) throws MongoDbException {
349
350 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
351 if (limit < myConnections.size()) {
352 synchronized (myConnectionFactory) {
353
354 while (limit < myConnections.size()) {
355 try {
356 final Connection conn = myConnections.remove(0);
357 myConnectionsToClose.add(conn);
358 conn.shutdown(false);
359 }
360 catch (final ArrayIndexOutOfBoundsException aiob) {
361
362
363 aiob.getCause();
364 }
365 }
366 }
367 }
368
369
370 final Connection conn = searchConnection(message1, message2, true);
371
372 if (conn == null) {
373 throw new CannotConnectException(
374 "Could not create a connection to the server.");
375 }
376
377 return conn;
378 }
379
380
381
382
383
384
385
386
387 protected void handleConnectionClosed(final Connection connection) {
388
389 if (myConnections.contains(connection)) {
390
391 if (connection.isShuttingDown() && myConnections.remove(connection)) {
392
393 if (myConnections.size() < myConfig.getMinConnectionCount()) {
394 LOG.debug(
395 "MongoDB Connection closed: {}. Will try to reconnect.",
396 connection);
397 reconnect(connection);
398 }
399 else {
400 LOG.info("MongoDB Connection closed: {}", connection);
401 connection
402 .removePropertyChangeListener(myConnectionListener);
403 connection.raiseErrors(new ConnectionLostException(
404 "Connection shutdown."));
405 }
406 }
407 else {
408
409 LOG.info("Unexpected MongoDB Connection closed: " + connection
410 + ". Will try to reconnect.");
411 reconnect(connection);
412 }
413 }
414 else if (myConnectionsToClose.remove(connection)) {
415 LOG.debug("MongoDB Connection closed: {}", connection);
416 connection.removePropertyChangeListener(myConnectionListener);
417 }
418 else {
419 LOG.info("Unknown MongoDB Connection closed: {}", connection);
420 connection.removePropertyChangeListener(myConnectionListener);
421 }
422 }
423
424
425
426
427
428
429
430 protected void reconnect(final Connection connection) {
431 final ReconnectStrategy strategy = myConnectionFactory
432 .getReconnectStrategy();
433
434 try {
435 synchronized (this) {
436 myActiveReconnects += 1;
437 }
438
439 final Connection newConnection = strategy.reconnect(connection);
440 if (newConnection != null) {
441
442 myConnections.add(newConnection);
443 newConnection.addPropertyChangeListener(myConnectionListener);
444 }
445 }
446 finally {
447 myConnections.remove(connection);
448 connection.removePropertyChangeListener(myConnectionListener);
449
450
451
452 final MongoDbException exception = new ConnectionLostException(
453 "Connection lost to MongoDB: " + connection);
454 connection.raiseErrors(exception);
455
456 synchronized (this) {
457 myActiveReconnects -= 1;
458 notifyAll();
459 }
460 }
461 }
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498 protected Connection searchConnection(final Message message1,
499 final Message message2, final boolean waitForReconnect)
500 throws MongoDbException {
501
502 Connection conn = findIdleConnection();
503 if (conn == null) {
504 conn = tryCreateConnection();
505 if (conn == null) {
506 conn = findMostIdleConnection();
507 if ((conn == null) && waitForReconnect) {
508 conn = waitForReconnect(message1, message2);
509 }
510 }
511 }
512
513 return conn;
514 }
515
516
517
518
519
520
521
/**
 * Closes the connection and stops tracking it. An {@link IOException}
 * from the close is logged as a warning and not propagated.
 *
 * @param conn
 *            The connection to close.
 */
private void close(final Connection conn) {
    try {
        conn.close();
    }
    catch (final IOException ioe) {
        LOG.warn(ioe, "Error closing connection to MongoDB: {}", conn);
    }
    finally {
        // Whether or not the close worked, forget the connection: drop it
        // from both collections and stop listening for its events.
        myConnections.remove(conn);
        myConnectionsToClose.remove(conn);

        conn.removePropertyChangeListener(myConnectionListener);
    }
}
536
537
538
539
540
541
542
543 private Connection findIdleConnection() {
544 if (!myConnections.isEmpty()) {
545
546 final long connSequence = myNextConnectionSequence.get();
547 for (int loop = 0; loop < Math.min(2, myConnections.size()); ++loop) {
548
549
550
551 final long sequence = Math.abs(connSequence + loop);
552 final int size = myConnections.size();
553 final int index = (int) (sequence % size);
554 try {
555 final Connection conn = myConnections.get(index);
556 if (conn.isAvailable() && (conn.getPendingCount() == 0)) {
557 return conn;
558 }
559 }
560 catch (final ArrayIndexOutOfBoundsException aiob) {
561
562
563 aiob.getCause();
564 }
565 }
566 }
567
568 return null;
569 }
570
571
572
573
574
575
576
577 private Connection findMostIdleConnection() {
578 if (!myConnections.isEmpty()) {
579 final long next = (myConnections.size() <= 1) ? 1
580 : myNextConnectionSequence.incrementAndGet();
581 final long previous = next - 1;
582
583 Connection previousConn = null;
584 Connection nextConn = null;
585 while ((previousConn == null) || (nextConn == null)) {
586 try {
587 final int size = myConnections.size();
588 previousConn = myConnections.get((int) (previous % size));
589 nextConn = myConnections.get((int) (next % size));
590 }
591 catch (final ArrayIndexOutOfBoundsException aiob) {
592
593
594 aiob.getCause();
595 }
596 }
597
598 if (previousConn == nextConn) {
599 if (previousConn.isAvailable()) {
600 return previousConn;
601 }
602 }
603 else if (previousConn.isAvailable()) {
604 if (nextConn.isAvailable()) {
605 if (previousConn.getPendingCount() < nextConn
606 .getPendingCount()) {
607 return previousConn;
608 }
609 return nextConn;
610 }
611 }
612 else if (nextConn.isAvailable()) {
613 return nextConn;
614 }
615 }
616
617 return null;
618 }
619
620
621
622
623
624
625
626 private Connection tryCreateConnection() {
627 if (myConnections.size() < myConfig.getMaxConnectionCount()) {
628 synchronized (myConnectionFactory) {
629 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
630 if (myConnections.size() < limit) {
631 try {
632 final Connection conn = myConnectionFactory.connect();
633
634 myConnections.add(conn);
635
636
637 conn.addPropertyChangeListener(myConnectionListener);
638
639 return conn;
640 }
641 catch (final IOException ioe) {
642 LOG.warn(ioe, "Could not create a connection.");
643 }
644 }
645 }
646 }
647
648 return null;
649 }
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666 private Connection waitForReconnect(final Message message1,
667 final Message message2) {
668 Connection conn = null;
669 boolean wasReconnecting = false;
670 synchronized (this) {
671 wasReconnecting = (0 < myActiveReconnects);
672 if (wasReconnecting) {
673 long now = System.currentTimeMillis();
674 final long deadline = (myConfig.getReconnectTimeout() <= 0) ? Long.MAX_VALUE
675 : now + myConfig.getReconnectTimeout();
676
677 while ((now < deadline) && (0 < myActiveReconnects)) {
678 try {
679 LOG.debug("Waiting for reconnect to MongoDB.");
680 wait(deadline - now);
681
682 now = System.currentTimeMillis();
683 }
684 catch (final InterruptedException e) {
685
686 }
687 }
688 }
689 }
690
691 if (wasReconnecting) {
692
693 conn = searchConnection(message1, message2, false);
694 }
695 return conn;
696 }
697
698
699
700
701
702
703
704 protected class ConnectionListener implements PropertyChangeListener {
705
706
707
708
709 public ConnectionListener() {
710 super();
711 }
712
713
714
715
716
717
718
719 @Override
720 public void propertyChange(final PropertyChangeEvent event) {
721 if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
722 && Boolean.FALSE.equals(event.getNewValue())) {
723 handleConnectionClosed((Connection) event.getSource());
724 }
725 }
726 }
727 }