1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.connection.proxy;
22
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.beans.PropertyChangeSupport;
26 import java.io.IOException;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import com.allanbank.mongodb.Callback;
39 import com.allanbank.mongodb.MongoClientConfiguration;
40 import com.allanbank.mongodb.MongoDbException;
41 import com.allanbank.mongodb.ReadPreference;
42 import com.allanbank.mongodb.client.Message;
43 import com.allanbank.mongodb.client.callback.ReplyCallback;
44 import com.allanbank.mongodb.client.connection.Connection;
45 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
46 import com.allanbank.mongodb.client.state.Cluster;
47 import com.allanbank.mongodb.error.ConnectionLostException;
48 import com.allanbank.mongodb.util.log.Log;
49 import com.allanbank.mongodb.util.log.LogFactory;
50
51
52
53
54
55
56
57
58
59
60
61 public abstract class AbstractProxyMultipleConnection<K> implements Connection {
62
63
64 private static final Log LOG = LogFactory
65 .getLog(AbstractProxyMultipleConnection.class);
66
67
68 protected final Cluster myCluster;
69
70
71 protected final MongoClientConfiguration myConfig;
72
73
74 protected final PropertyChangeSupport myEventSupport;
75
76
77 protected final ProxiedConnectionFactory myFactory;
78
79
80 protected final AtomicReference<Connection> myLastUsedConnection;
81
82
83 protected final PropertyChangeListener myListener;
84
85
86 protected volatile K myMainKey;
87
88
89 protected final AtomicBoolean myOpen;
90
91
92 protected final AtomicBoolean myShutdown;
93
94
95 final ConcurrentMap<K, Connection> myConnections;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 public AbstractProxyMultipleConnection(final Connection proxiedConnection,
112 final K server, final Cluster cluster,
113 final ProxiedConnectionFactory factory,
114 final MongoClientConfiguration config) {
115 myMainKey = server;
116 myCluster = cluster;
117 myFactory = factory;
118 myConfig = config;
119
120 myOpen = new AtomicBoolean(true);
121 myShutdown = new AtomicBoolean(false);
122 myEventSupport = new PropertyChangeSupport(this);
123 myConnections = new ConcurrentHashMap<K, Connection>();
124 myLastUsedConnection = new AtomicReference<Connection>(
125 proxiedConnection);
126
127 myListener = new ClusterAndConnectionListener();
128 myCluster.addListener(myListener);
129
130 if (proxiedConnection != null) {
131 cacheConnection(server, proxiedConnection);
132 }
133 }
134
135
136
137
138
139
140
141 @Override
142 public void addPropertyChangeListener(final PropertyChangeListener listener) {
143 myEventSupport.addPropertyChangeListener(listener);
144 }
145
146
147
148
149
150
151 @Override
152 public void close() {
153
154 myOpen.set(false);
155 myCluster.removeListener(myListener);
156
157 for (final Connection conn : myConnections.values()) {
158 try {
159 conn.removePropertyChangeListener(myListener);
160 conn.close();
161 }
162 catch (final IOException ioe) {
163 LOG.warn(ioe, "Could not close the connection: {}", conn);
164 }
165 }
166 }
167
168
169
170
171
172
173
174
175
176 @Override
177 public void flush() throws IOException {
178 for (final Connection conn : myConnections.values()) {
179 try {
180 conn.flush();
181 }
182 catch (final IOException ioe) {
183 LOG.warn(ioe, "Could not flush the connection: {}", conn);
184 }
185 }
186 }
187
188
189
190
191
192
193
194
195 @Override
196 public int getPendingCount() {
197 return myLastUsedConnection.get().getPendingCount();
198 }
199
200
201
202
203
204
205
206 @Override
207 public boolean isAvailable() {
208 return isOpen() && !isShuttingDown();
209 }
210
211
212
213
214
215
216
217 @Override
218 public boolean isIdle() {
219 return myLastUsedConnection.get().isIdle();
220 }
221
222
223
224
225
226
227
228 @Override
229 public boolean isOpen() {
230 return myOpen.get();
231 }
232
233
234
235
236
237
238
239 @Override
240 public boolean isShuttingDown() {
241 return myShutdown.get();
242 }
243
244
245
246
247
248
249
250 @Override
251 public void raiseErrors(final MongoDbException exception) {
252 for (final Connection conn : myConnections.values()) {
253 conn.raiseErrors(exception);
254 }
255 }
256
257
258
259
260
261
262
263 @Override
264 public void removePropertyChangeListener(
265 final PropertyChangeListener listener) {
266 myEventSupport.removePropertyChangeListener(listener);
267 }
268
269
270
271
272
273
274
275
276
277
278 @Override
279 public void send(final Message message1, final Message message2,
280 final ReplyCallback replyCallback) throws MongoDbException {
281
282 if (!isAvailable()) {
283 throw new ConnectionLostException("Connection shutting down.");
284 }
285
286 final List<K> servers = findPotentialKeys(message1, message2);
287 if (!trySend(servers, message1, message2, replyCallback)) {
288 throw new MongoDbException(
289 "Could not send the messages to any of the potential servers.");
290 }
291 }
292
293
294
295
296
297
298
299
300
301
302 @Override
303 public void send(final Message message, final ReplyCallback replyCallback)
304 throws MongoDbException {
305 send(message, null, replyCallback);
306 }
307
308
309
310
311
312
313
314 @Override
315 public void shutdown(final boolean force) {
316 myShutdown.set(true);
317 for (final Connection conn : myConnections.values()) {
318 conn.shutdown(force);
319 }
320 }
321
322
323
324
325
326
327
328 @Override
329 public String toString() {
330 return getConnectionType() + "(" + myLastUsedConnection.get() + ")";
331 }
332
333
334
335
336
337
338
339 @Override
340 public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
341 final long millis = timeoutUnits.toMillis(timeout);
342 long now = System.currentTimeMillis();
343 final long deadline = now + millis;
344
345 for (final Connection conn : myConnections.values()) {
346 if (now < deadline) {
347 conn.waitForClosed((int) (deadline - now),
348 TimeUnit.MILLISECONDS);
349 now = System.currentTimeMillis();
350 }
351 }
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365 protected Connection cacheConnection(final K server, final Connection conn) {
366 final Connection existing = myConnections.putIfAbsent(server, conn);
367 if (existing != null) {
368 conn.shutdown(true);
369 return existing;
370 }
371
372
373 conn.addPropertyChangeListener(myListener);
374
375 return conn;
376 }
377
378
379
380
381
382
383
384
385
386
387 protected abstract Connection connect(final K server);
388
389
390
391
392
393
394
395
396
397 protected Connection connection(final K server) {
398 return myConnections.get(server);
399 }
400
401
402
403
404
405
406
407
408
409
410 protected MongoDbException createReconnectFailure(final Message message1,
411 final Message message2) {
412 final StringBuilder builder = new StringBuilder(
413 "Could not find any servers for the following set of read preferences: ");
414 final Set<ReadPreference> seen = new HashSet<ReadPreference>();
415 for (final Message message : Arrays.asList(message1, message2)) {
416 if (message != null) {
417 final ReadPreference prefs = message.getReadPreference();
418 if (seen.add(prefs)) {
419 if (seen.size() > 1) {
420 builder.append(", ");
421 }
422 builder.append(prefs);
423 }
424 }
425 }
426 builder.append('.');
427
428 return new MongoDbException(builder.toString());
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442
443 protected void doSend(final Connection conn, final Message message1,
444 final Message message2, final ReplyCallback reply) {
445
446
447 myLastUsedConnection.lazySet(conn);
448
449 if (message2 == null) {
450 conn.send(message1, reply);
451 }
452 else {
453 conn.send(message1, message2, reply);
454 }
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471 protected abstract List<K> findPotentialKeys(final Message message1,
472 final Message message2) throws MongoDbException;
473
474
475
476
477
478
479 protected abstract String getConnectionType();
480
481
482
483
484
485
486
487
488 protected synchronized void handleConnectionClosed(
489 final Connection connection) {
490
491 if (!myOpen.get()) {
492 return;
493 }
494
495 final K server = findKeyForConnection(connection);
496
497 try {
498
499
500
501
502 final K primary = myMainKey;
503 if ((myConnections.size() == 1)
504 && (!server.equals(primary) || connection.isShuttingDown())) {
505
506
507 removeCachedConnection(server, connection);
508 shutdown(true);
509
510 myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME,
511 true, isOpen());
512 }
513
514
515 else if (server.equals(primary) && isOpen()) {
516
517 myMainKey = null;
518
519 LOG.info("Primary MongoDB Connection closed: {}({}). "
520 + "Will try to reconnect.", getConnectionType(),
521 connection);
522
523
524 final ConnectionInfo<K> newConn = reconnectMain();
525 if (newConn != null) {
526 removeCachedConnection(server, connection);
527 updateMain(newConn);
528 }
529
530
531
532 else if (myConnections.size() == 1) {
533
534 removeCachedConnection(server, connection);
535 shutdown(false);
536
537 myEventSupport.firePropertyChange(
538 Connection.OPEN_PROP_NAME, true, isOpen());
539 }
540 }
541
542 else {
543 LOG.debug("MongoDB Connection closed: {}({}).",
544 getConnectionType(), connection);
545 }
546 }
547 finally {
548
549 removeCachedConnection(server, connection);
550 connection.raiseErrors(new ConnectionLostException(
551 "Connection closed."));
552 }
553 }
554
555
556
557
558
559
560 protected abstract ConnectionInfo<K> reconnectMain();
561
562
563
564
565
566
567
568
569
570 protected void removeCachedConnection(final Object key,
571 final Connection connection) {
572 Connection conn = connection;
573 if (connection == null) {
574 conn = myConnections.remove(key);
575 }
576 else if (!myConnections.remove(key, connection)) {
577
578 conn = null;
579 }
580
581 if (conn != null) {
582 conn.removePropertyChangeListener(myListener);
583 conn.shutdown(true);
584 }
585 }
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601 protected boolean trySend(final List<K> servers, final Message message1,
602 final Message message2, final ReplyCallback reply) {
603 for (final K server : servers) {
604
605 Connection conn = myConnections.get(server);
606
607
608 if (conn == null) {
609
610 conn = connect(server);
611 }
612 else if (!conn.isAvailable()) {
613
614 removeCachedConnection(server, conn);
615
616 final ReconnectStrategy strategy = myFactory
617 .getReconnectStrategy();
618 conn = strategy.reconnect(conn);
619 if (conn != null) {
620 conn = cacheConnection(server, conn);
621 }
622 }
623
624 if (conn != null) {
625 doSend(conn, message1, message2, reply);
626 return true;
627 }
628 }
629
630 return false;
631 }
632
633
634
635
636
637
638
639 protected void updateMain(final ConnectionInfo<K> newConn) {
640 myMainKey = newConn.getConnectionKey();
641
642
643
644 cacheConnection(newConn.getConnectionKey(), newConn.getConnection());
645 }
646
647
648
649
650
651
652
653
654 private K findKeyForConnection(final Connection connection) {
655 for (final Map.Entry<K, Connection> entry : myConnections.entrySet()) {
656 if (entry.getValue() == connection) {
657 return entry.getKey();
658 }
659 }
660 return null;
661 }
662
663
664
665
666
667
668
669
670
671 protected final class ClusterAndConnectionListener implements
672 PropertyChangeListener {
673 @Override
674 public void propertyChange(final PropertyChangeEvent event) {
675 final String propName = event.getPropertyName();
676 if (Cluster.SERVER_PROP.equals(propName)
677 && (event.getNewValue() == null)) {
678
679 removeCachedConnection(event.getOldValue(), null);
680 }
681 else if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
682 && Boolean.FALSE.equals(event.getNewValue())) {
683 handleConnectionClosed((Connection) event.getSource());
684 }
685 }
686
687 }
688 }