1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.connection.rs;
22
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
24
25 import java.io.IOException;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.logging.Level;
37
38 import com.allanbank.mongodb.MongoClientConfiguration;
39 import com.allanbank.mongodb.bson.Document;
40 import com.allanbank.mongodb.bson.Element;
41 import com.allanbank.mongodb.bson.element.StringElement;
42 import com.allanbank.mongodb.client.connection.Connection;
43 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
44 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
45 import com.allanbank.mongodb.client.message.IsMaster;
46 import com.allanbank.mongodb.client.message.Reply;
47 import com.allanbank.mongodb.client.state.AbstractReconnectStrategy;
48 import com.allanbank.mongodb.client.state.Cluster;
49 import com.allanbank.mongodb.client.state.Server;
50 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
51 import com.allanbank.mongodb.util.IOUtils;
52 import com.allanbank.mongodb.util.log.Log;
53 import com.allanbank.mongodb.util.log.LogFactory;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public class ReplicaSetReconnectStrategy extends AbstractReconnectStrategy {
76
77
78
79
80
81 public static final int INITIAL_RECONNECT_PAUSE_TIME_MS = 10;
82
83
84
85
86
87 public static final int MAX_RECONNECT_PAUSE_TIME_MS = 1000;
88
89
90 protected static final Log LOG = LogFactory
91 .getLog(ReplicaSetReconnectStrategy.class);
92
93
94 private final Set<Server> myDeadServers = Collections
95 .newSetFromMap(new ConcurrentHashMap<Server, Boolean>());
96
97
98
99
100 public ReplicaSetReconnectStrategy() {
101 super();
102 }
103
104
105
106
107
108
109
110
111
112 @Override
113 public ReplicaSetConnection reconnect(final Connection oldConnection) {
114 final ConnectionInfo<Server> info = reconnectPrimary();
115 if (info != null) {
116 return new ReplicaSetConnection(info.getConnection(),
117 info.getConnectionKey(), getState(),
118 getConnectionFactory(), getConfig(), this);
119 }
120 return null;
121 }
122
123
124
125
126
127
128
129
130
131 public synchronized ConnectionInfo<Server> reconnectPrimary() {
132 LOG.debug("Trying replica set reconnect.");
133 final Cluster state = getState();
134
135
136 final int wait = getConfig().getReconnectTimeout();
137 long now = System.currentTimeMillis();
138 final long deadline = (wait <= 0) ? Long.MAX_VALUE : (now + wait);
139
140 final Map<Server, Future<Reply>> answers = new HashMap<Server, Future<Reply>>();
141 final Map<Server, Connection> connections = new HashMap<Server, Connection>();
142
143
144 final boolean interrupted = Thread.interrupted();
145 try {
146
147 for (final Server writable : state.getWritableServers()) {
148 if (verifyPutative(answers, connections, writable, deadline)) {
149 LOG.debug("New primary for replica set: {}.",
150 writable.getCanonicalName());
151 return createReplicaSetConnection(connections, writable);
152 }
153 }
154
155
156
157 int pauseTime = INITIAL_RECONNECT_PAUSE_TIME_MS;
158 while (now < deadline) {
159
160 for (final Server server : state.getServers()) {
161
162 sendIsPrimary(answers, connections, server, false);
163
164
165 final ConnectionInfo<Server> newConn = checkForReply(state,
166 answers, connections, deadline);
167 if (newConn != null) {
168 return newConn;
169 }
170
171
172 }
173
174
175
176 sleep(pauseTime, MILLISECONDS);
177 pauseTime = Math.min(MAX_RECONNECT_PAUSE_TIME_MS, pauseTime
178 + pauseTime);
179
180
181 final ConnectionInfo<Server> newConn = checkForReply(state,
182 answers, connections, deadline);
183 if (newConn != null) {
184 return newConn;
185 }
186
187 now = System.currentTimeMillis();
188 }
189 }
190 finally {
191
192 for (final Connection conn : connections.values()) {
193 conn.shutdown(true);
194 }
195 if (interrupted) {
196 Thread.currentThread().interrupt();
197 }
198 }
199 return null;
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 protected ConnectionInfo<Server> checkForReply(final Cluster state,
219 final Map<Server, Future<Reply>> answers,
220 final Map<Server, Connection> connections, final long deadline) {
221 final Map<Server, Future<Reply>> copy = new HashMap<Server, Future<Reply>>(
222 answers);
223 for (final Map.Entry<Server, Future<Reply>> entry : copy.entrySet()) {
224
225 final Server server = entry.getKey();
226 final Future<Reply> reply = entry.getValue();
227
228 if (reply.isDone()) {
229
230 answers.remove(server);
231
232
233 final String putative = checkReply(reply, connections, server,
234 deadline);
235
236
237 if (putative != null) {
238 final Server putativeServer = getState().get(putative);
239 if (verifyPutative(answers, connections, putativeServer,
240 deadline)) {
241
242
243
244
245 LOG.info("New primary for replica set: {}", putative);
246 updateUnknown(state, answers, connections);
247 return createReplicaSetConnection(connections,
248 putativeServer);
249 }
250 }
251 }
252 else {
253 LOG.debug("No reply yet from {}.", server);
254 }
255 }
256
257 return null;
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 protected String checkReply(final Future<Reply> replyFuture,
276 final Map<Server, Connection> connections, final Server server,
277 final long deadline) {
278 if (replyFuture != null) {
279 try {
280 final Reply reply = replyFuture.get(
281 Math.max(0, deadline - System.currentTimeMillis()),
282 TimeUnit.MILLISECONDS);
283
284 final List<Document> results = reply.getResults();
285 if (!results.isEmpty()) {
286 final Document doc = results.get(0);
287
288
289 final Element primary = doc.get("primary");
290 if (primary instanceof StringElement) {
291 return ((StringElement) primary).getValue();
292 }
293 }
294 }
295 catch (final InterruptedException e) {
296
297 }
298 catch (final TimeoutException e) {
299
300 final Connection conn = connections.remove(server);
301 IOUtils.close(conn);
302 }
303 catch (final ExecutionException e) {
304
305 final Connection conn = connections.remove(server);
306 IOUtils.close(conn);
307 }
308 }
309 return null;
310 }
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327 protected Future<Reply> sendIsPrimary(
328 final Map<Server, Future<Reply>> answers,
329 final Map<Server, Connection> connections, final Server server,
330 final boolean isPrimary) {
331 Future<Reply> reply = null;
332 try {
333
334 Connection conn = connections.get(server);
335 if ((conn == null) || !conn.isAvailable()) {
336 conn = getConnectionFactory().connect(server, getConfig());
337 connections.put(server, conn);
338 }
339
340
341
342 reply = answers.get(server);
343 if (reply == null) {
344 LOG.debug("Sending reconnect(rs) query to {}.",
345 server.getCanonicalName());
346
347 final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
348 server);
349 conn.send(new IsMaster(), replyCallback);
350
351 reply = replyCallback;
352 answers.put(server, reply);
353
354 myDeadServers.remove(server);
355 }
356 }
357 catch (final IOException e) {
358
359
360
361 final Level level = (isPrimary && myDeadServers.add(server)) ? Level.WARNING
362 : Level.FINE;
363 LOG.log(level, e, "Cannot create a connection to '{}'.", server);
364 }
365
366 return reply;
367 }
368
369
370
371
372
373
374
375
376
377 protected void sleep(final int sleepTime, final TimeUnit units) {
378 try {
379 units.sleep(sleepTime);
380 }
381 catch (final InterruptedException e) {
382
383 }
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400 protected boolean verifyPutative(final Map<Server, Future<Reply>> answers,
401 final Map<Server, Connection> connections,
402 final Server putativePrimary, final long deadline) {
403
404 LOG.debug("Verify putative server ({}) on reconnect(rs).",
405 putativePrimary);
406
407
408
409 answers.remove(putativePrimary);
410
411
412
413 final Future<Reply> reply = sendIsPrimary(answers, connections,
414 putativePrimary, true);
415 final String primary = checkReply(reply, connections, putativePrimary,
416 deadline);
417 if (putativePrimary.getCanonicalName().equals(primary)) {
418 return true;
419 }
420
421 return false;
422 }
423
424
425
426
427
428
429
430
431
432
433 private ConnectionInfo<Server> createReplicaSetConnection(
434 final Map<Server, Connection> connections,
435 final Server primaryServer) {
436 final Connection primaryConn = connections.remove(primaryServer);
437
438 return new ConnectionInfo<Server>(primaryConn, primaryServer);
439 }
440
441
442
443
444
445
446
447
448
449
450
451
452 private void updateUnknown(final Cluster state,
453 final Map<Server, Future<Reply>> answers,
454 final Map<Server, Connection> connections) {
455 for (final Server server : state.getServers()) {
456 switch (server.getState()) {
457 case UNKNOWN:
458 case UNAVAILABLE: {
459 answers.remove(server);
460 sendIsPrimary(answers, connections, server, false);
461 break;
462 }
463 case READ_ONLY:
464 case WRITABLE:
465 default: {
466
467 break;
468 }
469 }
470 }
471 }
472 }