1 /*
2 * #%L
3 * Cluster.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.state;
21
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.beans.PropertyChangeSupport;
25 import java.net.InetSocketAddress;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33
34 import com.allanbank.mongodb.MongoClientConfiguration;
35 import com.allanbank.mongodb.ReadPreference;
36 import com.allanbank.mongodb.Version;
37 import com.allanbank.mongodb.client.ClusterStats;
38 import com.allanbank.mongodb.client.ClusterType;
39 import com.allanbank.mongodb.client.Message;
40 import com.allanbank.mongodb.client.VersionRange;
41 import com.allanbank.mongodb.util.ServerNameUtils;
42
43 /**
44 * {@link Cluster} tracks the state of the cluster of MongoDB servers.
45 * PropertyChangeEvents are fired when a server is added or marked writable/not
46 * writable.
47 * <p>
48 * This class uses brute force synchronization to protect its internal state. It
49 * is assumed that multiple connections will be concurrently updating the
50 * {@link Cluster} at once and that at any given time this class may not contain
51 * the absolute truth about the state of the cluster. Instead connections should
52 * keep querying for the state of the cluster via their connection until the
53 * view the server returned and the {@link Cluster} are consistent. Since this
54 * class will not fire a {@link PropertyChangeEvent} when the state is not truly
55 * modified the simplest mechanism is to keep querying for the cluster state on
56 * the connection until no addition change events are seen.
57 * </p>
58 *
59 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
60 * mutated in incompatible ways between any two releases of the driver.
61 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
62 */
63 public class Cluster implements ClusterStats {
64
65 /** The property sued for adding a new server. */
66 public static final String SERVER_PROP = "server";
67
68 /** The property name for if there is a writable server. */
69 public static final String WRITABLE_PROP = "writable";
70
71 /** The configuration for connecting to the servers. */
72 protected final MongoClientConfiguration myConfig;
73
74 /** The complete list of servers. */
75 protected final ConcurrentMap<String, Server> myServers;
76
77 /** The range of versions within the cluster. */
78 protected VersionRange myServerVersionRange;
79
80 /** The smallest maximum number of operations in a batch in the cluster. */
81 protected int mySmallestMaxBatchedWriteOperations;
82
83 /** The smallest maximum document size in the cluster. */
84 protected long mySmallestMaxBsonObjectSize;
85
86 /** Support for firing property change events. */
87 /* package */final PropertyChangeSupport myChangeSupport;
88
89 /** The listener for changes to the server. */
90 /* package */final ServerListener myListener;
91
92 /** The complete list of non-writable servers. */
93 /* package */final CopyOnWriteArrayList<Server> myNonWritableServers;
94
95 /** The complete list of writable servers. */
96 /* package */final CopyOnWriteArrayList<Server> myWritableServers;
97
98 /** The type of the cluster. */
99 private final ClusterType myType;
100
101 /**
102 * Creates a new CLusterState.
103 *
104 * @param config
105 * The configuration for the cluster.
106 * @param type
107 * The type of the cluster.
108 */
109 public Cluster(final MongoClientConfiguration config, final ClusterType type) {
110 myConfig = config;
111 myType = type;
112 myChangeSupport = new PropertyChangeSupport(this);
113 myServers = new ConcurrentHashMap<String, Server>();
114 myWritableServers = new CopyOnWriteArrayList<Server>();
115 myNonWritableServers = new CopyOnWriteArrayList<Server>();
116 myListener = new ServerListener();
117 myServerVersionRange = VersionRange.range(Version.parse("0"),
118 Version.parse("0"));
119 }
120
121 /**
122 * Adds a {@link Server} to the {@link Cluster} for the address provided if
123 * one does not already exist.
124 *
125 * @param address
126 * The address of the {@link Server} to return.
127 * @return The {@link Server} for the address.
128 */
129 public Server add(final InetSocketAddress address) {
130 final String normalized = ServerNameUtils.normalize(address);
131 Server server = myServers.get(normalized);
132 if (server == null) {
133
134 server = new Server(address);
135
136 synchronized (this) {
137 final Server existing = myServers.putIfAbsent(normalized,
138 server);
139 if (existing != null) {
140 server = existing;
141 }
142 else {
143 myNonWritableServers.add(server);
144 myChangeSupport.firePropertyChange(SERVER_PROP, null,
145 server);
146
147 server.addListener(myListener);
148 }
149 }
150 }
151 return server;
152 }
153
154 /**
155 * Adds a {@link Server} to the {@link Cluster} for the address provided if
156 * one does not already exist.
157 * <p>
158 * This method is equivalent to calling {@link #add(InetSocketAddress)
159 * add(ServerNameUtils.parse(address))}.
160 * </p>
161 *
162 * @param address
163 * The address of the {@link Server} to return.
164 * @return The {@link Server} for the address.
165 */
166 public Server add(final String address) {
167 Server server = myServers.get(address);
168 if (server == null) {
169 server = add(ServerNameUtils.parse(address));
170 }
171
172 return server;
173 }
174
175 /**
176 * Adds a listener to the state.
177 *
178 * @param listener
179 * The listener for the state changes.
180 */
181 public void addListener(final PropertyChangeListener listener) {
182 synchronized (this) {
183 myChangeSupport.addPropertyChangeListener(listener);
184 }
185 }
186
187 /**
188 * Removes all of the servers from the cluster.
189 */
190 public void clear() {
191 for (final Server server : myServers.values()) {
192 remove(server);
193 }
194 }
195
196 /**
197 * Returns the set of servers that can be used based on the provided
198 * {@link ReadPreference}.
199 *
200 * @param readPreference
201 * The {@link ReadPreference} to filter the servers.
202 * @return The {@link List} of servers that can be used. Servers will be
203 * ordered by preference to be used, most preferred to least
204 * preferred.
205 */
206 public List<Server> findCandidateServers(final ReadPreference readPreference) {
207 List<Server> results = Collections.emptyList();
208
209 switch (readPreference.getMode()) {
210 case NEAREST:
211 results = findNearestCandidates(readPreference);
212 break;
213 case PRIMARY_ONLY:
214 results = findWritableCandidates(readPreference);
215 break;
216 case PRIMARY_PREFERRED:
217 results = merge(findWritableCandidates(readPreference),
218 findNonWritableCandidates(readPreference));
219 break;
220 case SECONDARY_ONLY:
221 results = findNonWritableCandidates(readPreference);
222 break;
223 case SECONDARY_PREFERRED:
224 results = merge(findNonWritableCandidates(readPreference),
225 findWritableCandidates(readPreference));
226 break;
227 case SERVER:
228 results = findCandidateServer(readPreference);
229 break;
230 }
231
232 return results;
233 }
234
235 /**
236 * Locates the set of servers that can be used to send the specified
237 * messages.
238 *
239 * @param message1
240 * The first message to send.
241 * @param message2
242 * The second message to send. May be <code>null</code>.
243 * @return The servers that can be used.
244 */
245 public List<Server> findServers(final Message message1,
246 final Message message2) {
247 List<Server> servers = Collections.emptyList();
248
249 if (message1 != null) {
250 List<Server> potentialServers = findCandidateServers(message1
251 .getReadPreference());
252 servers = potentialServers;
253
254 if (message2 != null) {
255 servers = new ArrayList<Server>(potentialServers);
256 potentialServers = findCandidateServers(message2
257 .getReadPreference());
258 servers.retainAll(potentialServers);
259 }
260 }
261 return servers;
262 }
263
264 /**
265 * Returns the server state for the address provided. If the {@link Server}
266 * does not already exist a non-writable state is created and returned.
267 * <p>
268 * This method is equivalent to calling {@link #add(String) add(address)}.
269 * </p>
270 *
271 * @param address
272 * The address of the {@link Server} to return.
273 * @return The {@link Server} for the address.
274 */
275 public Server get(final String address) {
276 return add(address);
277 }
278
279 /**
280 * Returns a copy of the list of non-writable servers. The list returned is
281 * a copy of the internal list and can be modified by the caller.
282 *
283 * @return The complete list of non-writable servers.
284 */
285 public List<Server> getNonWritableServers() {
286 return new ArrayList<Server>(myNonWritableServers);
287 }
288
289 /**
290 * Returns a copy of the list of servers. The list returned is a copy of the
291 * internal list and can be modified by the caller.
292 *
293 * @return The complete list of servers.
294 */
295 public List<Server> getServers() {
296 return new ArrayList<Server>(myServers.values());
297 }
298
299 /**
300 * {@inheritDoc}
301 */
302 @Override
303 public VersionRange getServerVersionRange() {
304 return myServerVersionRange;
305 }
306
307 /**
308 * Returns smallest value for the maximum number of write operations allowed
309 * in a single write command.
310 *
311 * @return The smallest value for maximum number of write operations allowed
312 * in a single write command.
313 */
314 @Override
315 public int getSmallestMaxBatchedWriteOperations() {
316 return mySmallestMaxBatchedWriteOperations;
317 }
318
319 /**
320 * Returns the smallest value for the maximum BSON object size within the
321 * cluster.
322 *
323 * @return The smallest value for the maximum BSON object size within the
324 * cluster.
325 */
326 @Override
327 public long getSmallestMaxBsonObjectSize() {
328 return mySmallestMaxBsonObjectSize;
329 }
330
331 /**
332 * Returns the type of cluster.
333 *
334 * @return The type of cluster.
335 */
336 public ClusterType getType() {
337 return myType;
338 }
339
340 /**
341 * Returns a copy of the list of writable servers. The list returned is a
342 * copy of the internal list and can be modified by the caller.
343 *
344 * @return The complete list of writable servers.
345 */
346 public List<Server> getWritableServers() {
347 return new ArrayList<Server>(myWritableServers);
348 }
349
350 /**
351 * Removes the specified server from the cluster.
352 *
353 * @param server
354 * The server to remove from the cluster.
355 */
356 public void remove(final Server server) {
357
358 final Server removed = myServers.remove(server.getCanonicalName());
359 if (removed != null) {
360 removed.removeListener(myListener);
361 myNonWritableServers.remove(removed);
362 myWritableServers.remove(removed);
363
364 updateVersions();
365 }
366 }
367
368 /**
369 * Removes a listener to the state.
370 *
371 * @param listener
372 * The listener for the state changes.
373 */
374 public void removeListener(final PropertyChangeListener listener) {
375 synchronized (this) {
376 myChangeSupport.removePropertyChangeListener(listener);
377 }
378 }
379
380 /**
381 * Computes a relative CDF (cumulative distribution function) for the
382 * servers based on the latency from the client.
383 * <p>
384 * The latency of each server is used to create a strict ordering of servers
385 * from lowest latency to highest. The relative latency of the i'th server
386 * is then calculated based on the function:
387 * </p>
388 * <blockquote>
389 *
390 * <pre>
391 * latency[0]
392 * relative_latency[i] = ----------
393 * latency[i]
394 * </pre>
395 *
396 * </blockquote>
397 * <p>
398 * The relative latencies are then then summed and the probability of
399 * selecting each server is then calculated by:
400 * </p>
401 * <blockquote>
402 *
403 * <pre>
404 * relative_latency[i]
405 * probability[i] = -------------------------------------------------
406 * sum(relative_latency[0], ... relative_latency[n])
407 * </pre>
408 *
409 * </blockquote>
410 *
411 * <p>
412 * The CDF over these probabilities is returned.
413 * </p>
414 *
415 * @param servers
416 * The servers to compute the CDF for.
417 * @return The CDF for the server latencies.
418 */
419 protected final double[] cdf(final List<Server> servers) {
420 Collections.sort(servers, ServerLatencyComparator.COMPARATOR);
421
422 // Pick a server to move to the front.
423 final double[] relativeLatency = new double[servers.size()];
424 double sum = 0;
425 double first = Double.NEGATIVE_INFINITY;
426 for (int i = 0; i < relativeLatency.length; ++i) {
427 final Server server = servers.get(i);
428 double latency = server.getAverageLatency();
429
430 // Turn the latency into a ratio of the lowest latency.
431 if (first == Double.NEGATIVE_INFINITY) {
432 first = latency;
433 latency = 1.0D; // By definition N/N = 1.0.
434 }
435 else {
436 latency /= first;
437 }
438
439 latency = (1.0D / latency); // 4 times as long is 1/4 as likely.
440 relativeLatency[i] = latency;
441 sum += latency;
442 }
443
444 // Turn the latencies into a range of 0 <= relativeLatency < 1.
445 // Also known as the CDF (cumulative distribution function)
446 double accum = 0.0D;
447 for (int i = 0; i < relativeLatency.length; ++i) {
448 accum += relativeLatency[i];
449
450 relativeLatency[i] = accum / sum;
451 }
452
453 return relativeLatency;
454 }
455
456 /**
457 * Finds the candidate server, if known.
458 *
459 * @param readPreference
460 * The read preference to match the server against.
461 * @return The Server found in a singleton list or an empty list if the
462 * server is not known.
463 */
464 protected List<Server> findCandidateServer(
465 final ReadPreference readPreference) {
466 final Server server = myServers.get(readPreference.getServer());
467 if ((server != null) && readPreference.matches(server.getTags())) {
468 return Collections.singletonList(server);
469 }
470 return Collections.emptyList();
471 }
472
473 /**
474 * Returns the list of servers that match the read preference's tags.
475 *
476 * @param readPreference
477 * The read preference to match the server against.
478 * @return The servers found in order of preference. Generally this is in
479 * latency order but we randomly move one of the servers to the
480 * front of the list to distribute the load across more servers.
481 *
482 * @see #sort
483 */
484 protected List<Server> findNearestCandidates(
485 final ReadPreference readPreference) {
486 final List<Server> results = new ArrayList<Server>(myServers.size());
487 for (final Server server : myServers.values()) {
488 if (readPreference.matches(server.getTags())) {
489 results.add(server);
490 }
491 }
492
493 // Sort the server by preference.
494 sort(results);
495
496 return results;
497 }
498
499 /**
500 * Returns the list of non-writable servers that match the read preference's
501 * tags.
502 *
503 * @param readPreference
504 * The read preference to match the server against.
505 * @return The servers found in order of preference. Generally this is in
506 * latency order but we randomly move one of the servers to the
507 * front of the list to distribute the load across more servers.
508 *
509 * @see #sort
510 */
511 protected List<Server> findNonWritableCandidates(
512 final ReadPreference readPreference) {
513 final List<Server> results = new ArrayList<Server>(
514 myNonWritableServers.size());
515 for (final Server server : myNonWritableServers) {
516 if (readPreference.matches(server.getTags())
517 && isRecentEnough(server.getSecondsBehind())) {
518 results.add(server);
519 }
520 }
521
522 // Sort the server by preference.
523 sort(results);
524
525 return results;
526 }
527
528 /**
529 * Returns the list of writable servers that match the read preference's
530 * tags.
531 *
532 * @param readPreference
533 * The read preference to match the server against.
534 * @return The servers found in order of preference. Generally this is in
535 * latency order but we randomly move one of the servers to the
536 * front of the list to distribute the load across more servers.
537 *
538 * @see #sort
539 */
540 protected List<Server> findWritableCandidates(
541 final ReadPreference readPreference) {
542 final List<Server> results = new ArrayList<Server>(
543 myWritableServers.size());
544 for (final Server server : myWritableServers) {
545 if (readPreference.matches(server.getTags())) {
546 results.add(server);
547 }
548 }
549
550 // Sort the server by preference.
551 sort(results);
552
553 return results;
554 }
555
556 /**
557 * Sorts the servers based on the latency from the client.
558 * <p>
559 * To distribute the requests across servers more evenly the first server is
560 * replaced with a random server based on a single sided simplified Gaussian
561 * distribution.
562 * </p>
563 *
564 * @param servers
565 * The servers to be sorted.
566 *
567 * @see #cdf(List)
568 */
569 protected final void sort(final List<Server> servers) {
570 if (servers.isEmpty() || (servers.size() == 1)) {
571 return;
572 }
573
574 // Pick a server to move to the front.
575 final double[] cdf = cdf(servers);
576 final double random = Math.random();
577 int index = Arrays.binarySearch(cdf, random);
578
579 // Probably a negative index since not expecting an exact match.
580 if (index < 0) {
581 // Undo (-(insertion point) - 1)
582 index = Math.abs(index + 1);
583 }
584
585 // Should not be needed. random should be < 1.0 and
586 // relativeLatency[relativeLatency.length] == 1.0
587 //
588 // assert (random < 1.0D) :
589 // "The random value should be strictly less than 1.0.";
590 // assert (cdf[cdf.length - 1] <= 1.0001) :
591 // "The cdf of the last server should be 1.0.";
592 // assert (0.9999 <= cdf[cdf.length - 1]) :
593 // "The cdf of the last server should be 1.0.";
594 index = Math.min(cdf.length - 1, index);
595
596 // Swap the lucky winner into the first position.
597 Collections.swap(servers, 0, index);
598 }
599
600 /**
601 * Updates the min/max versions across all servers. Since the max BSON
602 * object size is tied to the version we also update that value.
603 */
604 protected void updateVersions() {
605 Version min = null;
606 Version max = null;
607
608 long smallestMaxBsonObjectSize = Long.MAX_VALUE;
609 int smallestMaxBatchedWriteOperations = Integer.MAX_VALUE;
610
611 for (final Server server : myServers.values()) {
612 min = Version.earlier(min, server.getVersion());
613 max = Version.later(max, server.getVersion());
614
615 smallestMaxBsonObjectSize = Math.min(smallestMaxBsonObjectSize,
616 server.getMaxBsonObjectSize());
617 smallestMaxBatchedWriteOperations = Math.min(
618 smallestMaxBatchedWriteOperations,
619 server.getMaxBatchedWriteOperations());
620 }
621
622 myServerVersionRange = VersionRange.range(min, max);
623 mySmallestMaxBsonObjectSize = smallestMaxBsonObjectSize;
624 mySmallestMaxBatchedWriteOperations = smallestMaxBatchedWriteOperations;
625 }
626
627 /**
628 * Returns true if the server is recent enough to be queried.
629 *
630 * @param secondsBehind
631 * The number of seconds the server is behind.
632 * @return True if the server is recent enough to be queried, false
633 * otherwise.
634 */
635 private boolean isRecentEnough(final double secondsBehind) {
636 return ((secondsBehind * 1000) < myConfig.getMaxSecondaryLag());
637 }
638
639 /**
640 * Merges the two lists into a single list.
641 *
642 * @param list1
643 * The first list of servers.
644 * @param list2
645 * The second list of servers.
646 * @return The 2 lists of servers merged into a single list.
647 */
648 private final List<Server> merge(final List<Server> list1,
649 final List<Server> list2) {
650 List<Server> results;
651 if (list1.isEmpty()) {
652 results = list2;
653 }
654 else if (list2.isEmpty()) {
655 results = list1;
656 }
657 else {
658 results = new ArrayList<Server>(list1.size() + list2.size());
659 results.addAll(list1);
660 results.addAll(list2);
661 }
662 return results;
663 }
664
665 /**
666 * ServerListener provides a listener for the state updates of the
667 * {@link Server}.
668 *
669 * @api.no This class is <b>NOT</b> part of the drivers API. This class may
670 * be mutated in incompatible ways between any two releases of the
671 * driver.
672 * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
673 */
674 protected final class ServerListener implements PropertyChangeListener {
675 @Override
676 public void propertyChange(final PropertyChangeEvent evt) {
677 final String propertyName = evt.getPropertyName();
678 final Server server = (Server) evt.getSource();
679
680 if (Server.STATE_PROP.equals(propertyName)) {
681
682 final boolean old = !myWritableServers.isEmpty();
683
684 if (Server.State.WRITABLE == evt.getNewValue()) {
685 myWritableServers.addIfAbsent(server);
686 myNonWritableServers.remove(server);
687 }
688 else if (Server.State.READ_ONLY == evt.getNewValue()) {
689 myWritableServers.remove(server);
690 myNonWritableServers.addIfAbsent(server);
691 }
692 else {
693 myWritableServers.remove(server);
694 myNonWritableServers.remove(server);
695 }
696
697 myChangeSupport.firePropertyChange(WRITABLE_PROP, old,
698 !myWritableServers.isEmpty());
699
700 }
701 else if (Server.CANONICAL_NAME_PROP.equals(propertyName)) {
702 // Resolved a new canonical name. e.g., What the server
703 // calls itself in the cluster.
704
705 // Remove the entry with the old name.
706 myServers.remove(evt.getOldValue(), server);
707
708 // And add with the new name. Checking for duplicate entries.
709 final Server existing = myServers.putIfAbsent(
710 server.getCanonicalName(), server);
711 if (existing != null) {
712 // Already have a Server with that name. Remove the listener
713 // and let this server get garbage collected.
714 myNonWritableServers.remove(server);
715 myWritableServers.remove(server);
716 server.removeListener(myListener);
717
718 myChangeSupport.firePropertyChange(SERVER_PROP, server,
719 null);
720 }
721 }
722 else if (Server.VERSION_PROP.equals(propertyName)) {
723 // If the old version is either the high or low for the cluster
724 // (or the version is UNKNOWN) then recompute the high/low
725 // versions.
726 final Version old = (Version) evt.getOldValue();
727
728 if (Version.UNKNOWN.equals(old)
729 || (myServerVersionRange.getUpperBounds()
730 .compareTo(old) <= 0)
731 || (myServerVersionRange.getLowerBounds()
732 .compareTo(old) >= 0)) {
733 updateVersions();
734 }
735 }
736 }
737 }
738 }