1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
package com.allanbank.mongodb.client.connection.proxy; |
22 | |
|
23 | |
import java.beans.PropertyChangeEvent; |
24 | |
import java.beans.PropertyChangeListener; |
25 | |
import java.beans.PropertyChangeSupport; |
26 | |
import java.io.IOException; |
27 | |
import java.util.Arrays; |
28 | |
import java.util.HashSet; |
29 | |
import java.util.List; |
30 | |
import java.util.Map; |
31 | |
import java.util.Set; |
32 | |
import java.util.concurrent.ConcurrentHashMap; |
33 | |
import java.util.concurrent.ConcurrentMap; |
34 | |
import java.util.concurrent.TimeUnit; |
35 | |
import java.util.concurrent.atomic.AtomicBoolean; |
36 | |
import java.util.concurrent.atomic.AtomicReference; |
37 | |
|
38 | |
import com.allanbank.mongodb.Callback; |
39 | |
import com.allanbank.mongodb.MongoClientConfiguration; |
40 | |
import com.allanbank.mongodb.MongoDbException; |
41 | |
import com.allanbank.mongodb.ReadPreference; |
42 | |
import com.allanbank.mongodb.client.Message; |
43 | |
import com.allanbank.mongodb.client.callback.ReplyCallback; |
44 | |
import com.allanbank.mongodb.client.connection.Connection; |
45 | |
import com.allanbank.mongodb.client.connection.ReconnectStrategy; |
46 | |
import com.allanbank.mongodb.client.state.Cluster; |
47 | |
import com.allanbank.mongodb.error.ConnectionLostException; |
48 | |
import com.allanbank.mongodb.util.log.Log; |
49 | |
import com.allanbank.mongodb.util.log.LogFactory; |
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
public abstract class AbstractProxyMultipleConnection<K> implements Connection { |
62 | |
|
63 | |
|
64 | 1 | private static final Log LOG = LogFactory |
65 | |
.getLog(AbstractProxyMultipleConnection.class); |
66 | |
|
67 | |
|
68 | |
protected final Cluster myCluster; |
69 | |
|
70 | |
|
71 | |
protected final MongoClientConfiguration myConfig; |
72 | |
|
73 | |
|
74 | |
protected final PropertyChangeSupport myEventSupport; |
75 | |
|
76 | |
|
77 | |
protected final ProxiedConnectionFactory myFactory; |
78 | |
|
79 | |
|
80 | |
protected final AtomicReference<Connection> myLastUsedConnection; |
81 | |
|
82 | |
|
83 | |
protected final PropertyChangeListener myListener; |
84 | |
|
85 | |
|
86 | |
protected volatile K myMainKey; |
87 | |
|
88 | |
|
89 | |
protected final AtomicBoolean myOpen; |
90 | |
|
91 | |
|
92 | |
protected final AtomicBoolean myShutdown; |
93 | |
|
94 | |
|
95 | |
final ConcurrentMap<K, Connection> myConnections; |
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
public AbstractProxyMultipleConnection(final Connection proxiedConnection, |
112 | |
final K server, final Cluster cluster, |
113 | |
final ProxiedConnectionFactory factory, |
114 | 52 | final MongoClientConfiguration config) { |
115 | 52 | myMainKey = server; |
116 | 52 | myCluster = cluster; |
117 | 52 | myFactory = factory; |
118 | 52 | myConfig = config; |
119 | |
|
120 | 52 | myOpen = new AtomicBoolean(true); |
121 | 52 | myShutdown = new AtomicBoolean(false); |
122 | 52 | myEventSupport = new PropertyChangeSupport(this); |
123 | 52 | myConnections = new ConcurrentHashMap<K, Connection>(); |
124 | 52 | myLastUsedConnection = new AtomicReference<Connection>( |
125 | |
proxiedConnection); |
126 | |
|
127 | 52 | myListener = new ClusterAndConnectionListener(); |
128 | 52 | myCluster.addListener(myListener); |
129 | |
|
130 | 52 | if (proxiedConnection != null) { |
131 | 49 | cacheConnection(server, proxiedConnection); |
132 | |
} |
133 | 52 | } |
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
@Override |
142 | |
public void addPropertyChangeListener(final PropertyChangeListener listener) { |
143 | 1 | myEventSupport.addPropertyChangeListener(listener); |
144 | 1 | } |
145 | |
|
146 | |
|
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | |
@Override |
152 | |
public void close() { |
153 | |
|
154 | 51 | myOpen.set(false); |
155 | 51 | myCluster.removeListener(myListener); |
156 | |
|
157 | 51 | for (final Connection conn : myConnections.values()) { |
158 | |
try { |
159 | 56 | conn.removePropertyChangeListener(myListener); |
160 | 56 | conn.close(); |
161 | |
} |
162 | 1 | catch (final IOException ioe) { |
163 | 1 | LOG.warn(ioe, "Could not close the connection: {}", conn); |
164 | 55 | } |
165 | 56 | } |
166 | 51 | } |
167 | |
|
168 | |
|
169 | |
|
170 | |
|
171 | |
|
172 | |
|
173 | |
|
174 | |
|
175 | |
|
176 | |
@Override |
177 | |
public void flush() throws IOException { |
178 | 2 | for (final Connection conn : myConnections.values()) { |
179 | |
try { |
180 | 4 | conn.flush(); |
181 | |
} |
182 | 1 | catch (final IOException ioe) { |
183 | 1 | LOG.warn(ioe, "Could not flush the connection: {}", conn); |
184 | 3 | } |
185 | 4 | } |
186 | 2 | } |
187 | |
|
188 | |
|
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
|
194 | |
|
195 | |
@Override |
196 | |
public int getPendingCount() { |
197 | 1 | return myLastUsedConnection.get().getPendingCount(); |
198 | |
} |
199 | |
|
200 | |
|
201 | |
|
202 | |
|
203 | |
|
204 | |
|
205 | |
|
206 | |
@Override |
207 | |
public boolean isAvailable() { |
208 | 19 | return isOpen() && !isShuttingDown(); |
209 | |
} |
210 | |
|
211 | |
|
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | |
|
217 | |
@Override |
218 | |
public boolean isIdle() { |
219 | 1 | return myLastUsedConnection.get().isIdle(); |
220 | |
} |
221 | |
|
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
|
228 | |
@Override |
229 | |
public boolean isOpen() { |
230 | 23 | return myOpen.get(); |
231 | |
} |
232 | |
|
233 | |
|
234 | |
|
235 | |
|
236 | |
|
237 | |
|
238 | |
|
239 | |
@Override |
240 | |
public boolean isShuttingDown() { |
241 | 19 | return myShutdown.get(); |
242 | |
} |
243 | |
|
244 | |
|
245 | |
|
246 | |
|
247 | |
|
248 | |
|
249 | |
|
250 | |
@Override |
251 | |
public void raiseErrors(final MongoDbException exception) { |
252 | 1 | for (final Connection conn : myConnections.values()) { |
253 | 1 | conn.raiseErrors(exception); |
254 | 1 | } |
255 | 1 | } |
256 | |
|
257 | |
|
258 | |
|
259 | |
|
260 | |
|
261 | |
|
262 | |
|
263 | |
@Override |
264 | |
public void removePropertyChangeListener( |
265 | |
final PropertyChangeListener listener) { |
266 | 1 | myEventSupport.removePropertyChangeListener(listener); |
267 | 1 | } |
268 | |
|
269 | |
|
270 | |
|
271 | |
|
272 | |
|
273 | |
|
274 | |
|
275 | |
|
276 | |
|
277 | |
|
278 | |
@Override |
279 | |
public void send(final Message message1, final Message message2, |
280 | |
final ReplyCallback replyCallback) throws MongoDbException { |
281 | |
|
282 | 19 | if (!isAvailable()) { |
283 | 0 | throw new ConnectionLostException("Connection shutting down."); |
284 | |
} |
285 | |
|
286 | 19 | final List<K> servers = findPotentialKeys(message1, message2); |
287 | 14 | if (!trySend(servers, message1, message2, replyCallback)) { |
288 | 1 | throw new MongoDbException( |
289 | |
"Could not send the messages to any of the potential servers."); |
290 | |
} |
291 | 13 | } |
292 | |
|
293 | |
|
294 | |
|
295 | |
|
296 | |
|
297 | |
|
298 | |
|
299 | |
|
300 | |
|
301 | |
|
302 | |
@Override |
303 | |
public void send(final Message message, final ReplyCallback replyCallback) |
304 | |
throws MongoDbException { |
305 | 14 | send(message, null, replyCallback); |
306 | 11 | } |
307 | |
|
308 | |
|
309 | |
|
310 | |
|
311 | |
|
312 | |
|
313 | |
|
314 | |
@Override |
315 | |
public void shutdown(final boolean force) { |
316 | 2 | myShutdown.set(true); |
317 | 2 | for (final Connection conn : myConnections.values()) { |
318 | 1 | conn.shutdown(force); |
319 | 1 | } |
320 | 2 | } |
321 | |
|
322 | |
|
323 | |
|
324 | |
|
325 | |
|
326 | |
|
327 | |
|
328 | |
@Override |
329 | |
public String toString() { |
330 | 3 | return getConnectionType() + "(" + myLastUsedConnection.get() + ")"; |
331 | |
} |
332 | |
|
333 | |
|
334 | |
|
335 | |
|
336 | |
|
337 | |
|
338 | |
|
339 | |
@Override |
340 | |
public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) { |
341 | 2 | final long millis = timeoutUnits.toMillis(timeout); |
342 | 2 | long now = System.currentTimeMillis(); |
343 | 2 | final long deadline = now + millis; |
344 | |
|
345 | 2 | for (final Connection conn : myConnections.values()) { |
346 | 2 | if (now < deadline) { |
347 | 1 | conn.waitForClosed((int) (deadline - now), |
348 | |
TimeUnit.MILLISECONDS); |
349 | 1 | now = System.currentTimeMillis(); |
350 | |
} |
351 | 2 | } |
352 | 2 | } |
353 | |
|
354 | |
|
355 | |
|
356 | |
|
357 | |
|
358 | |
|
359 | |
|
360 | |
|
361 | |
|
362 | |
|
363 | |
|
364 | |
|
365 | |
protected Connection cacheConnection(final K server, final Connection conn) { |
366 | 62 | final Connection existing = myConnections.putIfAbsent(server, conn); |
367 | 62 | if (existing != null) { |
368 | 1 | conn.shutdown(true); |
369 | 1 | return existing; |
370 | |
} |
371 | |
|
372 | |
|
373 | 61 | conn.addPropertyChangeListener(myListener); |
374 | |
|
375 | 61 | return conn; |
376 | |
} |
377 | |
|
378 | |
|
379 | |
|
380 | |
|
381 | |
|
382 | |
|
383 | |
|
384 | |
|
385 | |
|
386 | |
|
387 | |
protected abstract Connection connect(final K server); |
388 | |
|
389 | |
|
390 | |
|
391 | |
|
392 | |
|
393 | |
|
394 | |
|
395 | |
|
396 | |
|
397 | |
protected Connection connection(final K server) { |
398 | 1 | return myConnections.get(server); |
399 | |
} |
400 | |
|
401 | |
|
402 | |
|
403 | |
|
404 | |
|
405 | |
|
406 | |
|
407 | |
|
408 | |
|
409 | |
|
410 | |
protected MongoDbException createReconnectFailure(final Message message1, |
411 | |
final Message message2) { |
412 | 5 | final StringBuilder builder = new StringBuilder( |
413 | |
"Could not find any servers for the following set of read preferences: "); |
414 | 5 | final Set<ReadPreference> seen = new HashSet<ReadPreference>(); |
415 | 5 | for (final Message message : Arrays.asList(message1, message2)) { |
416 | 10 | if (message != null) { |
417 | 7 | final ReadPreference prefs = message.getReadPreference(); |
418 | 7 | if (seen.add(prefs)) { |
419 | 6 | if (seen.size() > 1) { |
420 | 2 | builder.append(", "); |
421 | |
} |
422 | 6 | builder.append(prefs); |
423 | |
} |
424 | |
} |
425 | 10 | } |
426 | 5 | builder.append('.'); |
427 | |
|
428 | 5 | return new MongoDbException(builder.toString()); |
429 | |
} |
430 | |
|
431 | |
|
432 | |
|
433 | |
|
434 | |
|
435 | |
|
436 | |
|
437 | |
|
438 | |
|
439 | |
|
440 | |
|
441 | |
|
442 | |
|
443 | |
protected void doSend(final Connection conn, final Message message1, |
444 | |
final Message message2, final ReplyCallback reply) { |
445 | |
|
446 | |
|
447 | 13 | myLastUsedConnection.lazySet(conn); |
448 | |
|
449 | 13 | if (message2 == null) { |
450 | 11 | conn.send(message1, reply); |
451 | |
} |
452 | |
else { |
453 | 2 | conn.send(message1, message2, reply); |
454 | |
} |
455 | 13 | } |
456 | |
|
457 | |
|
458 | |
|
459 | |
|
460 | |
|
461 | |
|
462 | |
|
463 | |
|
464 | |
|
465 | |
|
466 | |
|
467 | |
|
468 | |
|
469 | |
|
470 | |
|
471 | |
protected abstract List<K> findPotentialKeys(final Message message1, |
472 | |
final Message message2) throws MongoDbException; |
473 | |
|
474 | |
|
475 | |
|
476 | |
|
477 | |
|
478 | |
|
479 | |
protected abstract String getConnectionType(); |
480 | |
|
481 | |
|
482 | |
|
483 | |
|
484 | |
|
485 | |
|
486 | |
|
487 | |
|
488 | |
protected synchronized void handleConnectionClosed( |
489 | |
final Connection connection) { |
490 | |
|
491 | 3 | if (!myOpen.get()) { |
492 | 0 | return; |
493 | |
} |
494 | |
|
495 | 3 | final K server = findKeyForConnection(connection); |
496 | |
|
497 | |
try { |
498 | |
|
499 | |
|
500 | |
|
501 | |
|
502 | 3 | final K primary = myMainKey; |
503 | 3 | if ((myConnections.size() == 1) |
504 | |
&& (!server.equals(primary) || connection.isShuttingDown())) { |
505 | |
|
506 | |
|
507 | 0 | removeCachedConnection(server, connection); |
508 | 0 | shutdown(true); |
509 | |
|
510 | 0 | myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME, |
511 | |
true, isOpen()); |
512 | |
} |
513 | |
|
514 | |
|
515 | 3 | else if (server.equals(primary) && isOpen()) { |
516 | |
|
517 | 3 | myMainKey = null; |
518 | |
|
519 | 3 | LOG.info("Primary MongoDB Connection closed: {}({}). " |
520 | |
+ "Will try to reconnect.", getConnectionType(), |
521 | |
connection); |
522 | |
|
523 | |
|
524 | 3 | final ConnectionInfo<K> newConn = reconnectMain(); |
525 | 3 | if (newConn != null) { |
526 | 1 | removeCachedConnection(server, connection); |
527 | 1 | updateMain(newConn); |
528 | |
} |
529 | |
|
530 | |
|
531 | |
|
532 | 2 | else if (myConnections.size() == 1) { |
533 | |
|
534 | 1 | removeCachedConnection(server, connection); |
535 | 1 | shutdown(false); |
536 | |
|
537 | 1 | myEventSupport.firePropertyChange( |
538 | |
Connection.OPEN_PROP_NAME, true, isOpen()); |
539 | |
} |
540 | 3 | } |
541 | |
|
542 | |
else { |
543 | 0 | LOG.debug("MongoDB Connection closed: {}({}).", |
544 | |
getConnectionType(), connection); |
545 | |
} |
546 | |
} |
547 | |
finally { |
548 | |
|
549 | 3 | removeCachedConnection(server, connection); |
550 | 3 | connection.raiseErrors(new ConnectionLostException( |
551 | |
"Connection closed.")); |
552 | 3 | } |
553 | 3 | } |
554 | |
|
555 | |
|
556 | |
|
557 | |
|
558 | |
|
559 | |
|
560 | |
protected abstract ConnectionInfo<K> reconnectMain(); |
561 | |
|
562 | |
|
563 | |
|
564 | |
|
565 | |
|
566 | |
|
567 | |
|
568 | |
|
569 | |
|
570 | |
protected void removeCachedConnection(final Object key, |
571 | |
final Connection connection) { |
572 | 7 | Connection conn = connection; |
573 | 7 | if (connection == null) { |
574 | 1 | conn = myConnections.remove(key); |
575 | |
} |
576 | 6 | else if (!myConnections.remove(key, connection)) { |
577 | |
|
578 | 2 | conn = null; |
579 | |
} |
580 | |
|
581 | 7 | if (conn != null) { |
582 | 5 | conn.removePropertyChangeListener(myListener); |
583 | 5 | conn.shutdown(true); |
584 | |
} |
585 | 7 | } |
586 | |
|
587 | |
|
588 | |
|
589 | |
|
590 | |
|
591 | |
|
592 | |
|
593 | |
|
594 | |
|
595 | |
|
596 | |
|
597 | |
|
598 | |
|
599 | |
|
600 | |
|
601 | |
protected boolean trySend(final List<K> servers, final Message message1, |
602 | |
final Message message2, final ReplyCallback reply) { |
603 | 14 | for (final K server : servers) { |
604 | |
|
605 | 56 | Connection conn = myConnections.get(server); |
606 | |
|
607 | |
|
608 | 56 | if (conn == null) { |
609 | |
|
610 | 52 | conn = connect(server); |
611 | |
} |
612 | 4 | else if (!conn.isAvailable()) { |
613 | |
|
614 | 1 | removeCachedConnection(server, conn); |
615 | |
|
616 | 1 | final ReconnectStrategy strategy = myFactory |
617 | |
.getReconnectStrategy(); |
618 | 1 | conn = strategy.reconnect(conn); |
619 | 1 | if (conn != null) { |
620 | 1 | conn = cacheConnection(server, conn); |
621 | |
} |
622 | |
} |
623 | |
|
624 | 56 | if (conn != null) { |
625 | 13 | doSend(conn, message1, message2, reply); |
626 | 13 | return true; |
627 | |
} |
628 | 43 | } |
629 | |
|
630 | 1 | return false; |
631 | |
} |
632 | |
|
633 | |
|
634 | |
|
635 | |
|
636 | |
|
637 | |
|
638 | |
|
639 | |
protected void updateMain(final ConnectionInfo<K> newConn) { |
640 | 1 | myMainKey = newConn.getConnectionKey(); |
641 | |
|
642 | |
|
643 | |
|
644 | 1 | cacheConnection(newConn.getConnectionKey(), newConn.getConnection()); |
645 | 1 | } |
646 | |
|
647 | |
|
648 | |
|
649 | |
|
650 | |
|
651 | |
|
652 | |
|
653 | |
|
654 | |
private K findKeyForConnection(final Connection connection) { |
655 | 3 | for (final Map.Entry<K, Connection> entry : myConnections.entrySet()) { |
656 | 3 | if (entry.getValue() == connection) { |
657 | 3 | return entry.getKey(); |
658 | |
} |
659 | 0 | } |
660 | 0 | return null; |
661 | |
} |
662 | |
|
663 | |
|
664 | |
|
665 | |
|
666 | |
|
667 | |
|
668 | |
|
669 | |
|
670 | |
|
671 | 52 | protected final class ClusterAndConnectionListener implements |
672 | |
PropertyChangeListener { |
673 | |
@Override |
674 | |
public void propertyChange(final PropertyChangeEvent event) { |
675 | 11 | final String propName = event.getPropertyName(); |
676 | 11 | if (Cluster.SERVER_PROP.equals(propName) |
677 | |
&& (event.getNewValue() == null)) { |
678 | |
|
679 | 1 | removeCachedConnection(event.getOldValue(), null); |
680 | |
} |
681 | 10 | else if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName()) |
682 | |
&& Boolean.FALSE.equals(event.getNewValue())) { |
683 | 3 | handleConnectionClosed((Connection) event.getSource()); |
684 | |
} |
685 | 11 | } |
686 | |
|
687 | |
} |
688 | |
} |