1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.state;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import com.allanbank.mongodb.MongoClientConfiguration;
37 import com.allanbank.mongodb.MongoDbException;
38 import com.allanbank.mongodb.client.ClusterType;
39 import com.allanbank.mongodb.client.connection.Connection;
40 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
41 import com.allanbank.mongodb.client.message.IsMaster;
42 import com.allanbank.mongodb.client.message.ReplicaSetStatus;
43 import com.allanbank.mongodb.client.message.Reply;
44 import com.allanbank.mongodb.util.IOUtils;
45 import com.allanbank.mongodb.util.log.Log;
46 import com.allanbank.mongodb.util.log.LogFactory;
47
48
49
50
51
52
53
54
55
56 public class ClusterPinger implements Runnable, Closeable {
57
58
/**
 * The default interval for one complete ping sweep, in seconds. The
 * constructor pairs this value with {@link TimeUnit#SECONDS}.
 */
public static final int DEFAULT_PING_INTERVAL_SECONDS = 600;

/** The logger for the {@link ClusterPinger}. */
protected static final Log LOG = LogFactory.getLog(ClusterPinger.class);

/** The shared {@link Pinger} used to send all of the ping requests. */
private static final Pinger PINGER = new Pinger();
66
67
68
69
70
71
72
73
74
75
/**
 * Pings the server over the provided connection and blocks until the reply
 * arrives or the wait is abandoned. Delegates to the shared {@link Pinger}.
 *
 * @param server
 *            The server being pinged.
 * @param conn
 *            The connection used to send the ping.
 * @return True if a reply to the ping was received, false otherwise.
 */
public static boolean ping(final Server server, final Connection conn) {
    return PINGER.ping(server, conn);
}
79
80
/**
 * The clusters whose servers are pinged. Created as a
 * {@link CopyOnWriteArrayList} by the constructor so that
 * {@link #addCluster(Cluster)} may be called while the ping thread iterates.
 */
private final List<Cluster> myClusters;

/** The configuration used when opening connections and creating the thread. */
private final MongoClientConfiguration myConfig;

/** The factory used to open a connection to each server being pinged. */
private final ProxiedConnectionFactory myConnectionFactory;

/** The units for {@link #myPingSweepInterval}. */
private volatile TimeUnit myIntervalUnits;

/**
 * The time allotted to one sweep across all of the servers, expressed in
 * {@link #myIntervalUnits}.
 */
private volatile int myPingSweepInterval;

/** The thread that executes {@link #run()}. */
private final Thread myPingThread;

/** True until {@link #close()} is called; checked by the {@link #run()} loop. */
private volatile boolean myRunning;
100
101
102
103
104
105
106
107
108
109
110
111 public ClusterPinger(final Cluster cluster,
112 final ProxiedConnectionFactory factory,
113 final MongoClientConfiguration config) {
114 super();
115
116 myConnectionFactory = factory;
117 myConfig = config;
118 myRunning = true;
119
120 myClusters = new CopyOnWriteArrayList<Cluster>();
121 myClusters.add(cluster);
122
123 myIntervalUnits = TimeUnit.SECONDS;
124 myPingSweepInterval = DEFAULT_PING_INTERVAL_SECONDS;
125
126 myPingThread = myConfig.getThreadFactory().newThread(this);
127 myPingThread.setDaemon(true);
128 myPingThread.setName("MongoDB Pinger");
129 myPingThread.setPriority(Thread.MIN_PRIORITY);
130 }
131
132
133
134
135
136
137
/**
 * Adds a cluster to the set of clusters whose servers are pinged. The
 * cluster's servers are picked up the next time {@link #run()} collects the
 * servers at the top of its loop.
 *
 * @param cluster
 *            The cluster to add.
 */
public void addCluster(final Cluster cluster) {
    myClusters.add(cluster);
}
141
142
143
144
145
146
147
/**
 * Stops the pinger. The running flag is cleared before the ping thread is
 * interrupted so that a {@link #run()} loop woken from its sleep observes
 * the flag and exits.
 */
@Override
public void close() {
    myRunning = false;
    myPingThread.interrupt();
}
153
154
155
156
157
158
/**
 * Returns the units for the ping sweep interval.
 *
 * @return The units for the value returned by
 *         {@link #getPingSweepInterval()}.
 */
public TimeUnit getIntervalUnits() {
    return myIntervalUnits;
}
162
163
164
165
166
167
/**
 * Returns the interval for one sweep across all of the servers.
 *
 * @return The sweep interval, in the units returned by
 *         {@link #getIntervalUnits()}.
 */
public int getPingSweepInterval() {
    return myPingSweepInterval;
}
171
172
173
174
175
176
177
178
179
180
181
182
183 public void initialSweep(final Cluster cluster) {
184 final List<Server> servers = cluster.getServers();
185 final List<Future<Reply>> replies = new ArrayList<Future<Reply>>(
186 servers.size());
187 final List<Connection> connections = new ArrayList<Connection>(
188 servers.size());
189 try {
190 for (final Server server : servers) {
191
192 final String name = server.getCanonicalName();
193 Connection conn = null;
194 try {
195 conn = myConnectionFactory.connect(server, myConfig);
196
197
198
199 final Future<Reply> reply = PINGER.pingAsync(
200 cluster.getType(), server, conn);
201 replies.add(reply);
202 }
203 catch (final IOException e) {
204 LOG.info("Could not ping '{}': {}", name, e.getMessage());
205 }
206 finally {
207 if (conn != null) {
208 connections.add(conn);
209 conn.shutdown(false);
210 }
211 }
212 }
213
214 long now = System.currentTimeMillis();
215 final long deadline = now
216 + Math.max(5000, myConfig.getConnectTimeout());
217 while ((now < deadline) && !replies.isEmpty()) {
218 final Iterator<Future<Reply>> iter = replies.iterator();
219 while (iter.hasNext() && (now < deadline)) {
220 Future<Reply> future = iter.next();
221 try {
222 if (future != null) {
223
224 future.get(deadline - now, TimeUnit.MILLISECONDS);
225 }
226
227
228 iter.remove();
229 }
230 catch (final ExecutionException e) {
231
232 iter.remove();
233 }
234 catch (final TimeoutException e) {
235
236 future = null;
237 }
238 catch (final InterruptedException e) {
239
240 future = null;
241 }
242
243 now = System.currentTimeMillis();
244 }
245 }
246 }
247 finally {
248 for (final Connection conn : connections) {
249 IOUtils.close(conn);
250 }
251 }
252 }
253
254
255
256
257
258
259
260
261 @Override
262 public void run() {
263 while (myRunning) {
264 try {
265 final Map<Server, ClusterType> servers = extractAllServers();
266
267 final long interval = getIntervalUnits().toMillis(
268 getPingSweepInterval());
269 final long perServerSleep = servers.isEmpty() ? interval
270 : interval / servers.size();
271
272
273
274
275 Thread.sleep(TimeUnit.MILLISECONDS.toMillis(perServerSleep));
276
277 startSweep();
278
279 for (final Map.Entry<Server, ClusterType> entry : servers
280 .entrySet()) {
281
282 final Server server = entry.getKey();
283 final String name = server.getCanonicalName();
284 Connection conn = null;
285 try {
286 myPingThread.setName("MongoDB Pinger - " + name);
287
288 conn = myConnectionFactory.connect(server, myConfig);
289
290 PINGER.pingAsync(entry.getValue(), server, conn);
291
292
293 Thread.sleep(TimeUnit.MILLISECONDS
294 .toMillis(perServerSleep));
295 }
296 catch (final IOException e) {
297 LOG.info("Could not ping '{}': {}", name,
298 e.getMessage());
299 }
300 finally {
301 myPingThread.setName("MongoDB Pinger - Idle");
302 if (conn != null) {
303 conn.shutdown(true);
304 }
305 }
306
307 }
308 }
309 catch (final InterruptedException ok) {
310 LOG.debug("Pinger interrupted.");
311 }
312 }
313 }
314
315
316
317
318
319
320
/**
 * Sets the units for the ping sweep interval. The new value is read the
 * next time {@link #run()} computes the sweep timing at the top of its
 * loop.
 *
 * @param intervalUnits
 *            The units for the sweep interval. Must not be null:
 *            {@link #run()} dereferences the value without a check.
 */
public void setIntervalUnits(final TimeUnit intervalUnits) {
    myIntervalUnits = intervalUnits;
}
324
325
326
327
328
329
330
331
/**
 * Sets the interval for one sweep across all of the servers. The new value
 * is read the next time {@link #run()} computes the sweep timing at the top
 * of its loop.
 *
 * @param pingSweepInterval
 *            The sweep interval, in the units returned by
 *            {@link #getIntervalUnits()}. Should not be negative: the
 *            value is passed (after scaling) to {@link Thread#sleep(long)}.
 */
public void setPingSweepInterval(final int pingSweepInterval) {
    myPingSweepInterval = pingSweepInterval;
}
335
336
337
338
/**
 * Starts the background ping thread created by the constructor.
 */
public void start() {
    myPingThread.start();
}
342
343
344
345
/**
 * Stops the pinger. Equivalent to calling {@link #close()}.
 */
public void stop() {
    close();
}
349
350
351
352
/**
 * Interrupts the ping thread. {@link #run()} catches the interrupt,
 * abandons whatever sleep or sweep is in progress and restarts its loop,
 * re-reading the servers and the sweep interval.
 */
public void wakeUp() {
    myPingThread.interrupt();
}
356
357
358
359
/**
 * Hook called by {@link #run()} at the start of each sweep, after the
 * initial pause and before the first server is pinged. This implementation
 * does nothing.
 */
protected void startSweep() {
    // Intentionally empty; subclasses may override.
}
363
364
365
366
367
368
369 private Map<Server, ClusterType> extractAllServers() {
370 final Map<Server, ClusterType> servers = new HashMap<Server, ClusterType>();
371
372 for (final Cluster cluster : myClusters) {
373 for (final Server server : cluster.getServers()) {
374 servers.put(server, cluster.getType());
375 }
376 }
377
378 return Collections.unmodifiableMap(servers);
379 }
380
381
382
383
384
385
386 protected static final class Pinger {
387
388
389
390
391
392
393
394
395
396
397
398
399
400 public boolean ping(final Server server, final Connection conn) {
401 try {
402 final Future<Reply> future = pingAsync(ClusterType.STAND_ALONE,
403 server, conn);
404
405
406 if (future != null) {
407 future.get(1, TimeUnit.MINUTES);
408
409 return true;
410 }
411 }
412 catch (final ExecutionException e) {
413 LOG.info(e, "Could not ping '{}': {}",
414 server.getCanonicalName(), e.getMessage());
415 }
416 catch (final TimeoutException e) {
417 LOG.info(e, "'{}' might be a zombie - not receiving "
418 + "a response to ping: {}", server.getCanonicalName(),
419 e.getMessage());
420 }
421 catch (final InterruptedException e) {
422 LOG.info(e, "Interrupted pinging '{}': {}",
423 server.getCanonicalName(), e.getMessage());
424 }
425
426 return false;
427 }
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446 public Future<Reply> pingAsync(final ClusterType type,
447 final Server server, final Connection conn) {
448 try {
449 final ServerUpdateCallback future = new ServerUpdateCallback(
450 server);
451
452 conn.send(new IsMaster(), future);
453 if (type == ClusterType.REPLICA_SET) {
454 conn.send(new ReplicaSetStatus(), new ServerUpdateCallback(
455 server));
456 }
457
458 return future;
459 }
460 catch (final MongoDbException e) {
461 LOG.info("Could not ping '{}': {}", server, e.getMessage());
462 }
463 return null;
464 }
465 }
466 }