Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ReplicaSetReconnectStrategy |
|
| 5.0;5 | ||||
ReplicaSetReconnectStrategy$1 |
|
| 5.0;5 |
1 | /* | |
2 | * #%L | |
3 | * ReplicaSetReconnectStrategy.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | ||
21 | package com.allanbank.mongodb.client.connection.rs; | |
22 | ||
23 | import static java.util.concurrent.TimeUnit.MILLISECONDS; | |
24 | ||
25 | import java.io.IOException; | |
26 | import java.util.Collections; | |
27 | import java.util.HashMap; | |
28 | import java.util.List; | |
29 | import java.util.Map; | |
30 | import java.util.Set; | |
31 | import java.util.concurrent.ConcurrentHashMap; | |
32 | import java.util.concurrent.ExecutionException; | |
33 | import java.util.concurrent.Future; | |
34 | import java.util.concurrent.TimeUnit; | |
35 | import java.util.concurrent.TimeoutException; | |
36 | import java.util.logging.Level; | |
37 | ||
38 | import com.allanbank.mongodb.MongoClientConfiguration; | |
39 | import com.allanbank.mongodb.bson.Document; | |
40 | import com.allanbank.mongodb.bson.Element; | |
41 | import com.allanbank.mongodb.bson.element.StringElement; | |
42 | import com.allanbank.mongodb.client.connection.Connection; | |
43 | import com.allanbank.mongodb.client.connection.ReconnectStrategy; | |
44 | import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo; | |
45 | import com.allanbank.mongodb.client.message.IsMaster; | |
46 | import com.allanbank.mongodb.client.message.Reply; | |
47 | import com.allanbank.mongodb.client.state.AbstractReconnectStrategy; | |
48 | import com.allanbank.mongodb.client.state.Cluster; | |
49 | import com.allanbank.mongodb.client.state.Server; | |
50 | import com.allanbank.mongodb.client.state.ServerUpdateCallback; | |
51 | import com.allanbank.mongodb.util.IOUtils; | |
52 | import com.allanbank.mongodb.util.log.Log; | |
53 | import com.allanbank.mongodb.util.log.LogFactory; | |
54 | ||
55 | /** | |
56 | * ReplicaSetReconnectStrategy provides a {@link ReconnectStrategy} designed for | |
57 | * replica sets. The reconnect strategy attempts to locate the primary member of | |
58 | * the replica set by: | |
59 | * <ol> | |
60 | * <li>Querying each member of the replica set for the primary server.</li> | |
61 | * <li>Once a primary server has been identified by a member of the replica set | |
62 | * (the putative primary) the putative primary server is queried for the primary | |
63 | * server.</li> | |
64 | * <ol> | |
65 | * <li>If the putative primary concurs that it is the primary then the search | |
66 | * completes and the primary server's connection is used.</li> | |
67 | * <li>If the putative primary does not concur then the search continues | |
68 | * scanning each server in turn for the primary server.</li> | |
69 | * </ol> | |
70 | * </ol> | |
71 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
72 | * mutated in incompatible ways between any two releases of the driver. | |
73 | * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved | |
74 | */ | |
75 | 0 | public class ReplicaSetReconnectStrategy extends AbstractReconnectStrategy { |
76 | ||
77 | /** | |
78 | * The initial amount of time to pause waiting for a server to take over as | |
79 | * the primary. | |
80 | */ | |
81 | public static final int INITIAL_RECONNECT_PAUSE_TIME_MS = 10; | |
82 | ||
83 | /** | |
84 | * The maximum amount of time to pause waiting for a server to take over as | |
85 | * the primary. | |
86 | */ | |
87 | public static final int MAX_RECONNECT_PAUSE_TIME_MS = 1000; | |
88 | ||
89 | /** The logger for the {@link ReplicaSetReconnectStrategy}. */ | |
90 | 1 | protected static final Log LOG = LogFactory |
91 | .getLog(ReplicaSetReconnectStrategy.class); | |
92 | ||
93 | /** The set of servers we cannot connect to. */ | |
94 | 23 | private final Set<Server> myDeadServers = Collections |
95 | .newSetFromMap(new ConcurrentHashMap<Server, Boolean>()); | |
96 | ||
97 | /** | |
98 | * Creates a new ReplicaSetReconnectStrategy. | |
99 | */ | |
100 | public ReplicaSetReconnectStrategy() { | |
101 | 23 | super(); |
102 | 23 | } |
103 | ||
104 | /** | |
105 | * {@inheritDoc} | |
106 | * <p> | |
107 | * Overridden to search for the primary server in the replica set. This will | |
108 | * only continue until the | |
109 | * {@link MongoClientConfiguration#getReconnectTimeout()} has expired. | |
110 | * </p> | |
111 | */ | |
112 | @Override | |
113 | public ReplicaSetConnection reconnect(final Connection oldConnection) { | |
114 | 4 | final ConnectionInfo<Server> info = reconnectPrimary(); |
115 | 4 | if (info != null) { |
116 | 2 | return new ReplicaSetConnection(info.getConnection(), |
117 | info.getConnectionKey(), getState(), | |
118 | getConnectionFactory(), getConfig(), this); | |
119 | } | |
120 | 2 | return null; |
121 | } | |
122 | ||
123 | /** | |
124 | * Overridden to search for the primary server in the replica set. This will | |
125 | * only continue until the | |
126 | * {@link MongoClientConfiguration#getReconnectTimeout()} has expired. | |
127 | * | |
128 | * @return The information for the primary connection or null if the | |
129 | * reconnect fails. | |
130 | */ | |
131 | public synchronized ConnectionInfo<Server> reconnectPrimary() { | |
132 | 4 | LOG.debug("Trying replica set reconnect."); |
133 | 4 | final Cluster state = getState(); |
134 | ||
135 | // Figure out a deadline for the reconnect. | |
136 | 4 | final int wait = getConfig().getReconnectTimeout(); |
137 | 4 | long now = System.currentTimeMillis(); |
138 | 4 | final long deadline = (wait <= 0) ? Long.MAX_VALUE : (now + wait); |
139 | ||
140 | 4 | final Map<Server, Future<Reply>> answers = new HashMap<Server, Future<Reply>>(); |
141 | 4 | final Map<Server, Connection> connections = new HashMap<Server, Connection>(); |
142 | ||
143 | // Clear any interrupts | |
144 | 4 | final boolean interrupted = Thread.interrupted(); |
145 | try { | |
146 | // First try a simple reconnect. | |
147 | 4 | for (final Server writable : state.getWritableServers()) { |
148 | 0 | if (verifyPutative(answers, connections, writable, deadline)) { |
149 | 0 | LOG.debug("New primary for replica set: {}.", |
150 | writable.getCanonicalName()); | |
151 | 0 | return createReplicaSetConnection(connections, writable); |
152 | } | |
153 | 0 | } |
154 | ||
155 | // How much time to pause for replies and waiting for a server | |
156 | // to become primary. | |
157 | 4 | int pauseTime = INITIAL_RECONNECT_PAUSE_TIME_MS; |
158 | 14 | while (now < deadline) { |
159 | // Ask all of the servers who they think the primary is. | |
160 | 12 | for (final Server server : state.getServers()) { |
161 | ||
162 | 36 | sendIsPrimary(answers, connections, server, false); |
163 | ||
164 | // Anyone replied yet? | |
165 | 36 | final ConnectionInfo<Server> newConn = checkForReply(state, |
166 | answers, connections, deadline); | |
167 | 36 | if (newConn != null) { |
168 | 0 | return newConn; |
169 | } | |
170 | ||
171 | // Loop to the next server. | |
172 | 36 | } |
173 | ||
174 | // Wait for a beat for a reply or a server to decide to be | |
175 | // master. | |
176 | 12 | sleep(pauseTime, MILLISECONDS); |
177 | 12 | pauseTime = Math.min(MAX_RECONNECT_PAUSE_TIME_MS, pauseTime |
178 | + pauseTime); | |
179 | ||
180 | // Check again for replies before trying to reconnect. | |
181 | 12 | final ConnectionInfo<Server> newConn = checkForReply(state, |
182 | answers, connections, deadline); | |
183 | 12 | if (newConn != null) { |
184 | 2 | return newConn; |
185 | } | |
186 | ||
187 | 10 | now = System.currentTimeMillis(); |
188 | 10 | } |
189 | } | |
190 | finally { | |
191 | // Shut down the connections we created. | |
192 | 4 | for (final Connection conn : connections.values()) { |
193 | 9 | conn.shutdown(true); |
194 | 9 | } |
195 | 4 | if (interrupted) { |
196 | 0 | Thread.currentThread().interrupt(); |
197 | } | |
198 | } | |
199 | 2 | return null; |
200 | } | |
201 | ||
202 | /** | |
203 | * Checks for a reply from a server. If one has been received then it tries | |
204 | * to confirm the primary server by asking it if it thinks it is the primary | |
205 | * server. | |
206 | * | |
207 | * @param state | |
208 | * The state of the cluster. | |
209 | * @param answers | |
210 | * The pending ({@link Future}) answers from each server. | |
211 | * @param connections | |
212 | * The connection to each server. | |
213 | * @param deadline | |
214 | * The deadline for the reconnect attempt. | |
215 | * @return The new connection if there was a reply and that server confirmed | |
216 | * it was the primary. | |
217 | */ | |
218 | protected ConnectionInfo<Server> checkForReply(final Cluster state, | |
219 | final Map<Server, Future<Reply>> answers, | |
220 | final Map<Server, Connection> connections, final long deadline) { | |
221 | 48 | final Map<Server, Future<Reply>> copy = new HashMap<Server, Future<Reply>>( |
222 | answers); | |
223 | 48 | for (final Map.Entry<Server, Future<Reply>> entry : copy.entrySet()) { |
224 | ||
225 | 98 | final Server server = entry.getKey(); |
226 | 98 | final Future<Reply> reply = entry.getValue(); |
227 | ||
228 | 98 | if (reply.isDone()) { |
229 | // Remove this reply. | |
230 | 34 | answers.remove(server); |
231 | ||
232 | // Check the result. | |
233 | 34 | final String putative = checkReply(reply, connections, server, |
234 | deadline); | |
235 | ||
236 | // Phase2 - Verify the putative server. | |
237 | 34 | if (putative != null) { |
238 | 7 | final Server putativeServer = getState().get(putative); |
239 | 7 | if (verifyPutative(answers, connections, putativeServer, |
240 | deadline)) { | |
241 | ||
242 | // Phase 3 - Setup a new replica set connection to the | |
243 | // primary and seed it with a secondary if there is a | |
244 | // suitable server. | |
245 | 2 | LOG.info("New primary for replica set: {}", putative); |
246 | 2 | updateUnknown(state, answers, connections); |
247 | 2 | return createReplicaSetConnection(connections, |
248 | putativeServer); | |
249 | } | |
250 | } | |
251 | 32 | } |
252 | else { | |
253 | 64 | LOG.debug("No reply yet from {}.", server); |
254 | } | |
255 | 96 | } |
256 | ||
257 | 46 | return null; |
258 | } | |
259 | ||
260 | /** | |
261 | * Extracts who the server thinks is the primary from the reply. | |
262 | * | |
263 | * @param replyFuture | |
264 | * The future to get the reply from. | |
265 | * @param connections | |
266 | * The map of connections. The connection will be closed on an | |
267 | * error. | |
268 | * @param server | |
269 | * The server. | |
270 | * @param deadline | |
271 | * The deadline for the reconnect attempt. | |
272 | * @return The name of the server the reply indicates is the primary, null | |
273 | * if there is no primary or any error. | |
274 | */ | |
275 | protected String checkReply(final Future<Reply> replyFuture, | |
276 | final Map<Server, Connection> connections, final Server server, | |
277 | final long deadline) { | |
278 | 41 | if (replyFuture != null) { |
279 | try { | |
280 | 41 | final Reply reply = replyFuture.get( |
281 | Math.max(0, deadline - System.currentTimeMillis()), | |
282 | TimeUnit.MILLISECONDS); | |
283 | ||
284 | 39 | final List<Document> results = reply.getResults(); |
285 | 39 | if (!results.isEmpty()) { |
286 | 39 | final Document doc = results.get(0); |
287 | ||
288 | // Get the name of the primary server. | |
289 | 39 | final Element primary = doc.get("primary"); |
290 | 39 | if (primary instanceof StringElement) { |
291 | 9 | return ((StringElement) primary).getValue(); |
292 | } | |
293 | } | |
294 | } | |
295 | 0 | catch (final InterruptedException e) { |
296 | // Just ignore the reply. | |
297 | } | |
298 | 2 | catch (final TimeoutException e) { |
299 | // Kill the associated connection. | |
300 | 2 | final Connection conn = connections.remove(server); |
301 | 2 | IOUtils.close(conn); |
302 | } | |
303 | 0 | catch (final ExecutionException e) { |
304 | // Kill the associated connection. | |
305 | 0 | final Connection conn = connections.remove(server); |
306 | 0 | IOUtils.close(conn); |
307 | 32 | } |
308 | } | |
309 | 32 | return null; |
310 | } | |
311 | ||
312 | /** | |
313 | * Sends a command to the server to return what it thinks the state of the | |
314 | * cluster is. This method will not re-request the information from the | |
315 | * server if there is already an outstanding request. | |
316 | * | |
317 | * @param answers | |
318 | * The pending ({@link Future}) answers from each server. | |
319 | * @param connections | |
320 | * The connection to each server. | |
321 | * @param server | |
322 | * The server to send the request to. | |
323 | * @param isPrimary | |
324 | * If true logs connection errors as warnings. Debug otherwise. | |
325 | * @return The future reply for the request sent to the server. | |
326 | */ | |
327 | protected Future<Reply> sendIsPrimary( | |
328 | final Map<Server, Future<Reply>> answers, | |
329 | final Map<Server, Connection> connections, final Server server, | |
330 | final boolean isPrimary) { | |
331 | 43 | Future<Reply> reply = null; |
332 | try { | |
333 | // Locate a connection to the server. | |
334 | 43 | Connection conn = connections.get(server); |
335 | 43 | if ((conn == null) || !conn.isAvailable()) { |
336 | 13 | conn = getConnectionFactory().connect(server, getConfig()); |
337 | 13 | connections.put(server, conn); |
338 | } | |
339 | ||
340 | // Only send to the server if there is not an outstanding | |
341 | // request. | |
342 | 43 | reply = answers.get(server); |
343 | 43 | if (reply == null) { |
344 | 43 | LOG.debug("Sending reconnect(rs) query to {}.", |
345 | server.getCanonicalName()); | |
346 | ||
347 | 43 | final ServerUpdateCallback replyCallback = new ServerUpdateCallback( |
348 | server); | |
349 | 43 | conn.send(new IsMaster(), replyCallback); |
350 | ||
351 | 43 | reply = replyCallback; |
352 | 43 | answers.put(server, reply); |
353 | ||
354 | 43 | myDeadServers.remove(server); |
355 | } | |
356 | } | |
357 | 0 | catch (final IOException e) { |
358 | // Nothing to do for now. Log at a debug level if this is not the | |
359 | // primary. Warn if we think it is the primary (and have not warned | |
360 | // before) | |
361 | 0 | final Level level = (isPrimary && myDeadServers.add(server)) ? Level.WARNING |
362 | : Level.FINE; | |
363 | 0 | LOG.log(level, e, "Cannot create a connection to '{}'.", server); |
364 | 43 | } |
365 | ||
366 | 43 | return reply; |
367 | } | |
368 | ||
369 | /** | |
370 | * Sleeps without throwing an exception. | |
371 | * | |
372 | * @param sleepTime | |
373 | * The amount of time to sleep. | |
374 | * @param units | |
375 | * The untis for the amount of time to sleep. | |
376 | */ | |
377 | protected void sleep(final int sleepTime, final TimeUnit units) { | |
378 | try { | |
379 | 12 | units.sleep(sleepTime); |
380 | } | |
381 | 0 | catch (final InterruptedException e) { |
382 | // Ignore. | |
383 | 12 | } |
384 | 12 | } |
385 | ||
386 | /** | |
387 | * Tries to verify that the suspected primary server is in fact the primary | |
388 | * server by asking it directly and synchronously. | |
389 | * | |
390 | * @param answers | |
391 | * The pending ({@link Future}) answers from each server. | |
392 | * @param connections | |
393 | * The connection to each server. | |
394 | * @param putativePrimary | |
395 | * The server we think is the primary. | |
396 | * @param deadline | |
397 | * The deadline for the reconnect attempt. | |
398 | * @return True if the server concurs that it is the primary. | |
399 | */ | |
400 | protected boolean verifyPutative(final Map<Server, Future<Reply>> answers, | |
401 | final Map<Server, Connection> connections, | |
402 | final Server putativePrimary, final long deadline) { | |
403 | ||
404 | 7 | LOG.debug("Verify putative server ({}) on reconnect(rs).", |
405 | putativePrimary); | |
406 | ||
407 | // Make sure we send a new request. The old reply might have been | |
408 | // before becoming the primary. | |
409 | 7 | answers.remove(putativePrimary); |
410 | ||
411 | // If the primary agrees that they are the primary then it is | |
412 | // probably true. | |
413 | 7 | final Future<Reply> reply = sendIsPrimary(answers, connections, |
414 | putativePrimary, true); | |
415 | 7 | final String primary = checkReply(reply, connections, putativePrimary, |
416 | deadline); | |
417 | 7 | if (putativePrimary.getCanonicalName().equals(primary)) { |
418 | 2 | return true; |
419 | } | |
420 | ||
421 | 5 | return false; |
422 | } | |
423 | ||
424 | /** | |
425 | * Creates the {@link ReplicaSetConnection} for the primary server. | |
426 | * | |
427 | * @param connections | |
428 | * The connection that are being managed. | |
429 | * @param primaryServer | |
430 | * The primary server. | |
431 | * @return The {@link ReplicaSetConnection}. | |
432 | */ | |
433 | private ConnectionInfo<Server> createReplicaSetConnection( | |
434 | final Map<Server, Connection> connections, | |
435 | final Server primaryServer) { | |
436 | 2 | final Connection primaryConn = connections.remove(primaryServer); |
437 | ||
438 | 2 | return new ConnectionInfo<Server>(primaryConn, primaryServer); |
439 | } | |
440 | ||
441 | /** | |
442 | * Tries to send messages to all of the members of the cluster in an | |
443 | * indeterminate state. | |
444 | * | |
445 | * @param state | |
446 | * The state of the cluster. | |
447 | * @param answers | |
448 | * The pending responses. | |
449 | * @param connections | |
450 | * The connection already created. | |
451 | */ | |
452 | private void updateUnknown(final Cluster state, | |
453 | final Map<Server, Future<Reply>> answers, | |
454 | final Map<Server, Connection> connections) { | |
455 | 2 | for (final Server server : state.getServers()) { |
456 | 6 | switch (server.getState()) { |
457 | case UNKNOWN: // Fall through. | |
458 | case UNAVAILABLE: { | |
459 | 0 | answers.remove(server); |
460 | 0 | sendIsPrimary(answers, connections, server, false); |
461 | 0 | break; |
462 | } | |
463 | case READ_ONLY: | |
464 | case WRITABLE: | |
465 | default: { | |
466 | // Known good. | |
467 | break; | |
468 | } | |
469 | } | |
470 | 6 | } |
471 | 2 | } |
472 | } |