1 /*
2 * #%L
3 * ClusterPinger.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.state;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import com.allanbank.mongodb.MongoClientConfiguration;
37 import com.allanbank.mongodb.MongoDbException;
38 import com.allanbank.mongodb.client.ClusterType;
39 import com.allanbank.mongodb.client.connection.Connection;
40 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
41 import com.allanbank.mongodb.client.message.IsMaster;
42 import com.allanbank.mongodb.client.message.ReplicaSetStatus;
43 import com.allanbank.mongodb.client.message.Reply;
44 import com.allanbank.mongodb.util.IOUtils;
45 import com.allanbank.mongodb.util.log.Log;
46 import com.allanbank.mongodb.util.log.LogFactory;
47
48 /**
49 * ClusterPinger pings each of the connections in the cluster and updates the
50 * latency of the server from this client.
51 *
52 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
53 * mutated in incompatible ways between any two releases of the driver.
54 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
55 */
56 public class ClusterPinger implements Runnable, Closeable {
57
58 /** The default interval between ping sweeps in seconds. */
59 public static final int DEFAULT_PING_INTERVAL_SECONDS = 600;
60
61 /** The logger for the {@link ClusterPinger}. */
62 protected static final Log LOG = LogFactory.getLog(ClusterPinger.class);
63
64 /** Instance of the inner class containing the ping logic. */
65 private static final Pinger PINGER = new Pinger();
66
67 /**
68 * Pings the server and suppresses all exceptions.
69 *
70 * @param server
71 * The address of the server. Used for logging.
72 * @param conn
73 * The connection to ping.
74 * @return True if the ping worked, false otherwise.
75 */
76 public static boolean ping(final Server server, final Connection conn) {
77 return PINGER.ping(server, conn);
78 }
79
80 /** The state of the clusters. */
81 private final List<Cluster> myClusters;
82
83 /** The configuration for the connections. */
84 private final MongoClientConfiguration myConfig;
85
86 /** The factory for creating connections to the servers. */
87 private final ProxiedConnectionFactory myConnectionFactory;
88
89 /** The units for the ping sweep intervals. */
90 private volatile TimeUnit myIntervalUnits;
91
92 /** The interval for a ping sweep across all of the servers. */
93 private volatile int myPingSweepInterval;
94
95 /** The thread that is pinging the servers for latency. */
96 private final Thread myPingThread;
97
98 /** The flag to stop the ping thread. */
99 private volatile boolean myRunning;
100
101 /**
102 * Creates a new ClusterPinger.
103 *
104 * @param cluster
105 * The state of the cluster.
106 * @param factory
107 * The factory for creating connections to the servers.
108 * @param config
109 * The configuration for the connections.
110 */
111 public ClusterPinger(final Cluster cluster,
112 final ProxiedConnectionFactory factory,
113 final MongoClientConfiguration config) {
114 super();
115
116 myConnectionFactory = factory;
117 myConfig = config;
118 myRunning = true;
119
120 myClusters = new CopyOnWriteArrayList<Cluster>();
121 myClusters.add(cluster);
122
123 myIntervalUnits = TimeUnit.SECONDS;
124 myPingSweepInterval = DEFAULT_PING_INTERVAL_SECONDS;
125
126 myPingThread = myConfig.getThreadFactory().newThread(this);
127 myPingThread.setDaemon(true);
128 myPingThread.setName("MongoDB Pinger");
129 myPingThread.setPriority(Thread.MIN_PRIORITY);
130 }
131
132 /**
133 * Adds a new cluster to the set of tracked clusters.
134 *
135 * @param cluster
136 * A new cluster to the set of tracked clusters.
137 */
138 public void addCluster(final Cluster cluster) {
139 myClusters.add(cluster);
140 }
141
142 /**
143 * {@inheritDoc}
144 * <p>
145 * Overridden to close the pinger.
146 * </p>
147 */
148 @Override
149 public void close() {
150 myRunning = false;
151 myPingThread.interrupt();
152 }
153
154 /**
155 * Returns the units for the ping sweep intervals.
156 *
157 * @return The units for the ping sweep intervals.
158 */
159 public TimeUnit getIntervalUnits() {
160 return myIntervalUnits;
161 }
162
163 /**
164 * Returns the interval for a ping sweep across all of the servers..
165 *
166 * @return The interval for a ping sweep across all of the servers..
167 */
168 public int getPingSweepInterval() {
169 return myPingSweepInterval;
170 }
171
172 /**
173 * Performs a single sweep through the servers sending a ping with a
174 * callback to set the latency and tags for each server.
175 * <p>
176 * This method will not return until at least 50% of the servers have
177 * replied (which may be a failure) to the initial ping.
178 * </p>
179 *
180 * @param cluster
181 * The cluster of servers to ping.
182 */
183 public void initialSweep(final Cluster cluster) {
184 final List<Server> servers = cluster.getServers();
185 final List<Future<Reply>> replies = new ArrayList<Future<Reply>>(
186 servers.size());
187 final List<Connection> connections = new ArrayList<Connection>(
188 servers.size());
189 try {
190 for (final Server server : servers) {
191 // Ping the current server.
192 final String name = server.getCanonicalName();
193 Connection conn = null;
194 try {
195 conn = myConnectionFactory.connect(server, myConfig);
196
197 // Use a isMaster request to measure latency. It is
198 // a best case since it does not require any locks.
199 final Future<Reply> reply = PINGER.pingAsync(
200 cluster.getType(), server, conn);
201 replies.add(reply);
202 }
203 catch (final IOException e) {
204 LOG.info("Could not ping '{}': {}", name, e.getMessage());
205 }
206 finally {
207 if (conn != null) {
208 connections.add(conn);
209 conn.shutdown(false);
210 }
211 }
212 }
213
214 long now = System.currentTimeMillis();
215 final long deadline = now
216 + Math.max(5000, myConfig.getConnectTimeout());
217 while ((now < deadline) && !replies.isEmpty()) {
218 final Iterator<Future<Reply>> iter = replies.iterator();
219 while (iter.hasNext() && (now < deadline)) {
220 Future<Reply> future = iter.next();
221 try {
222 if (future != null) {
223 // Pause...
224 future.get(deadline - now, TimeUnit.MILLISECONDS);
225 }
226
227 // A good reply or we could not connect to the server.
228 iter.remove();
229 }
230 catch (final ExecutionException e) {
231 // We got a reply. Its a failure but its a reply.
232 iter.remove();
233 }
234 catch (final TimeoutException e) {
235 // No reply yet.
236 future = null;
237 }
238 catch (final InterruptedException e) {
239 // No reply yet.
240 future = null;
241 }
242
243 now = System.currentTimeMillis();
244 }
245 }
246 }
247 finally {
248 for (final Connection conn : connections) {
249 IOUtils.close(conn);
250 }
251 }
252 }
253
254 /**
255 * {@inheritDoc}
256 * <p>
257 * Overridden to periodically wake-up and ping the servers. At first this
258 * will occur fairly often but eventually degrade to once every 5 minutes.
259 * </p>
260 */
261 @Override
262 public void run() {
263 while (myRunning) {
264 try {
265 final Map<Server, ClusterType> servers = extractAllServers();
266
267 final long interval = getIntervalUnits().toMillis(
268 getPingSweepInterval());
269 final long perServerSleep = servers.isEmpty() ? interval
270 : interval / servers.size();
271
272 // Sleep a little before starting. We do it first to give
273 // tests time to finish without a sweep in the middle
274 // causing confusion and delay.
275 Thread.sleep(TimeUnit.MILLISECONDS.toMillis(perServerSleep));
276
277 startSweep();
278
279 for (final Map.Entry<Server, ClusterType> entry : servers
280 .entrySet()) {
281 // Ping the current server.
282 final Server server = entry.getKey();
283 final String name = server.getCanonicalName();
284 Connection conn = null;
285 try {
286 myPingThread.setName("MongoDB Pinger - " + name);
287
288 conn = myConnectionFactory.connect(server, myConfig);
289
290 PINGER.pingAsync(entry.getValue(), server, conn);
291
292 // Sleep a little between the servers.
293 Thread.sleep(TimeUnit.MILLISECONDS
294 .toMillis(perServerSleep));
295 }
296 catch (final IOException e) {
297 LOG.info("Could not ping '{}': {}", name,
298 e.getMessage());
299 }
300 finally {
301 myPingThread.setName("MongoDB Pinger - Idle");
302 if (conn != null) {
303 conn.shutdown(true);
304 }
305 }
306
307 }
308 }
309 catch (final InterruptedException ok) {
310 LOG.debug("Pinger interrupted.");
311 }
312 }
313 }
314
315 /**
316 * Sets the value of units for the ping sweep intervals.
317 *
318 * @param intervalUnits
319 * The new value for the units for the ping sweep intervals.
320 */
321 public void setIntervalUnits(final TimeUnit intervalUnits) {
322 myIntervalUnits = intervalUnits;
323 }
324
325 /**
326 * Sets the interval for a ping sweep across all of the servers..
327 *
328 * @param pingSweepInterval
329 * The new value for the interval for a ping sweep across all of
330 * the servers..
331 */
332 public void setPingSweepInterval(final int pingSweepInterval) {
333 myPingSweepInterval = pingSweepInterval;
334 }
335
336 /**
337 * Starts the background pinger.
338 */
339 public void start() {
340 myPingThread.start();
341 }
342
343 /**
344 * Stops the background pinger. Equivalent to {@link #close()}.
345 */
346 public void stop() {
347 close();
348 }
349
350 /**
351 * Starts the background pinger.
352 */
353 public void wakeUp() {
354 myPingThread.interrupt();
355 }
356
357 /**
358 * Extension point to notify derived classes that a new sweep is starting.
359 */
360 protected void startSweep() {
361 // Nothing.
362 }
363
364 /**
365 * Extracts the complete list of servers in all clusters.
366 *
367 * @return The complete list of servers across all clusters.
368 */
369 private Map<Server, ClusterType> extractAllServers() {
370 final Map<Server, ClusterType> servers = new HashMap<Server, ClusterType>();
371
372 for (final Cluster cluster : myClusters) {
373 for (final Server server : cluster.getServers()) {
374 servers.put(server, cluster.getType());
375 }
376 }
377
378 return Collections.unmodifiableMap(servers);
379 }
380
381 /**
382 * Pinger provides logic to ping servers.
383 *
384 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
385 */
386 protected static final class Pinger {
387 /**
388 * Pings the server and suppresses all exceptions. Updates the server
389 * state with a latency and the tags found in the response, if any.
390 *
391 * @param server
392 * The server to update with the results of the ping. If
393 * <code>false</code> is returned then the state will not
394 * have been updated. Passing <code>null</code> for the state
395 * is allowed.
396 * @param conn
397 * The connection to ping.
398 * @return True if the ping worked, false otherwise.
399 */
400 public boolean ping(final Server server, final Connection conn) {
401 try {
402 final Future<Reply> future = pingAsync(ClusterType.STAND_ALONE,
403 server, conn);
404
405 // Wait for the reply.
406 if (future != null) {
407 future.get(1, TimeUnit.MINUTES);
408
409 return true;
410 }
411 }
412 catch (final ExecutionException e) {
413 LOG.info(e, "Could not ping '{}': {}",
414 server.getCanonicalName(), e.getMessage());
415 }
416 catch (final TimeoutException e) {
417 LOG.info(e, "'{}' might be a zombie - not receiving "
418 + "a response to ping: {}", server.getCanonicalName(),
419 e.getMessage());
420 }
421 catch (final InterruptedException e) {
422 LOG.info(e, "Interrupted pinging '{}': {}",
423 server.getCanonicalName(), e.getMessage());
424 }
425
426 return false;
427 }
428
429 /**
430 * Pings the server and suppresses all exceptions. Returns a future that
431 * can be used to determine if a response has been received. The future
432 * will update the {@link Server} latency and tags if found.
433 *
434 * @param type
435 * The type of cluster to ping.
436 * @param server
437 * The server to update with the results of the ping. If
438 * <code>false</code> is returned then the state will not
439 * have been updated. Passing <code>null</code> for the state
440 * is allowed.
441 * @param conn
442 * The connection to ping.
443 * @return A {@link Future} that will be updated once the reply is
444 * received.
445 */
446 public Future<Reply> pingAsync(final ClusterType type,
447 final Server server, final Connection conn) {
448 try {
449 final ServerUpdateCallback future = new ServerUpdateCallback(
450 server);
451
452 conn.send(new IsMaster(), future);
453 if (type == ClusterType.REPLICA_SET) {
454 conn.send(new ReplicaSetStatus(), new ServerUpdateCallback(
455 server));
456 }
457
458 return future;
459 }
460 catch (final MongoDbException e) {
461 LOG.info("Could not ping '{}': {}", server, e.getMessage());
462 }
463 return null;
464 }
465 }
466 }