Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Cluster |
|
| 2.5555555555555554;2.556 | ||||
Cluster$1 |
|
| 2.5555555555555554;2.556 | ||||
Cluster$ServerListener |
|
| 2.5555555555555554;2.556 |
1 | /* | |
2 | * #%L | |
3 | * Cluster.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client.state; | |
21 | ||
22 | import java.beans.PropertyChangeEvent; | |
23 | import java.beans.PropertyChangeListener; | |
24 | import java.beans.PropertyChangeSupport; | |
25 | import java.net.InetSocketAddress; | |
26 | import java.util.ArrayList; | |
27 | import java.util.Arrays; | |
28 | import java.util.Collections; | |
29 | import java.util.List; | |
30 | import java.util.concurrent.ConcurrentHashMap; | |
31 | import java.util.concurrent.ConcurrentMap; | |
32 | import java.util.concurrent.CopyOnWriteArrayList; | |
33 | ||
34 | import com.allanbank.mongodb.MongoClientConfiguration; | |
35 | import com.allanbank.mongodb.ReadPreference; | |
36 | import com.allanbank.mongodb.Version; | |
37 | import com.allanbank.mongodb.client.ClusterStats; | |
38 | import com.allanbank.mongodb.client.ClusterType; | |
39 | import com.allanbank.mongodb.client.Message; | |
40 | import com.allanbank.mongodb.client.VersionRange; | |
41 | import com.allanbank.mongodb.util.ServerNameUtils; | |
42 | ||
43 | /** | |
44 | * {@link Cluster} tracks the state of the cluster of MongoDB servers. | |
45 | * PropertyChangeEvents are fired when a server is added or marked writable/not | |
46 | * writable. | |
47 | * <p> | |
48 | * This class uses brute force synchronization to protect its internal state. It | |
49 | * is assumed that multiple connections will be concurrently updating the | |
50 | * {@link Cluster} at once and that at any given time this class may not contain | |
51 | * the absolute truth about the state of the cluster. Instead connections should | |
52 | * keep querying for the state of the cluster via their connection until the | |
53 | * view the server returned and the {@link Cluster} are consistent. Since this | |
54 | * class will not fire a {@link PropertyChangeEvent} when the state is not truly | |
55 | * modified the simplest mechanism is to keep querying for the cluster state on | |
56 | * the connection until no additional change events are seen. | |
57 | * </p> | |
58 | * | |
59 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
60 | * mutated in incompatible ways between any two releases of the driver. | |
61 | * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved | |
62 | */ | |
63 | public class Cluster implements ClusterStats { | |
64 | ||
65 | /** The property sued for adding a new server. */ | |
66 | public static final String SERVER_PROP = "server"; | |
67 | ||
68 | /** The property name for if there is a writable server. */ | |
69 | public static final String WRITABLE_PROP = "writable"; | |
70 | ||
71 | /** The configuration for connecting to the servers. */ | |
72 | protected final MongoClientConfiguration myConfig; | |
73 | ||
74 | /** The complete list of servers. */ | |
75 | protected final ConcurrentMap<String, Server> myServers; | |
76 | ||
77 | /** The range of versions within the cluster. */ | |
78 | protected VersionRange myServerVersionRange; | |
79 | ||
80 | /** The smallest maximum number of operations in a batch in the cluster. */ | |
81 | protected int mySmallestMaxBatchedWriteOperations; | |
82 | ||
83 | /** The smallest maximum document size in the cluster. */ | |
84 | protected long mySmallestMaxBsonObjectSize; | |
85 | ||
86 | /** Support for firing property change events. */ | |
87 | /* package */final PropertyChangeSupport myChangeSupport; | |
88 | ||
89 | /** The listener for changes to the server. */ | |
90 | /* package */final ServerListener myListener; | |
91 | ||
92 | /** The complete list of non-writable servers. */ | |
93 | /* package */final CopyOnWriteArrayList<Server> myNonWritableServers; | |
94 | ||
95 | /** The complete list of writable servers. */ | |
96 | /* package */final CopyOnWriteArrayList<Server> myWritableServers; | |
97 | ||
98 | /** The type of the cluster. */ | |
99 | private final ClusterType myType; | |
100 | ||
101 | /** | |
102 | * Creates a new CLusterState. | |
103 | * | |
104 | * @param config | |
105 | * The configuration for the cluster. | |
106 | * @param type | |
107 | * The type of the cluster. | |
108 | */ | |
109 | 268 | public Cluster(final MongoClientConfiguration config, final ClusterType type) { |
110 | 268 | myConfig = config; |
111 | 268 | myType = type; |
112 | 268 | myChangeSupport = new PropertyChangeSupport(this); |
113 | 268 | myServers = new ConcurrentHashMap<String, Server>(); |
114 | 268 | myWritableServers = new CopyOnWriteArrayList<Server>(); |
115 | 268 | myNonWritableServers = new CopyOnWriteArrayList<Server>(); |
116 | 268 | myListener = new ServerListener(); |
117 | 268 | myServerVersionRange = VersionRange.range(Version.parse("0"), |
118 | Version.parse("0")); | |
119 | 268 | } |
120 | ||
121 | /** | |
122 | * Adds a {@link Server} to the {@link Cluster} for the address provided if | |
123 | * one does not already exist. | |
124 | * | |
125 | * @param address | |
126 | * The address of the {@link Server} to return. | |
127 | * @return The {@link Server} for the address. | |
128 | */ | |
129 | public Server add(final InetSocketAddress address) { | |
130 | 373 | final String normalized = ServerNameUtils.normalize(address); |
131 | 373 | Server server = myServers.get(normalized); |
132 | 373 | if (server == null) { |
133 | ||
134 | 343 | server = new Server(address); |
135 | ||
136 | 343 | synchronized (this) { |
137 | 343 | final Server existing = myServers.putIfAbsent(normalized, |
138 | server); | |
139 | 343 | if (existing != null) { |
140 | 0 | server = existing; |
141 | } | |
142 | else { | |
143 | 343 | myNonWritableServers.add(server); |
144 | 343 | myChangeSupport.firePropertyChange(SERVER_PROP, null, |
145 | server); | |
146 | ||
147 | 343 | server.addListener(myListener); |
148 | } | |
149 | 343 | } |
150 | } | |
151 | 373 | return server; |
152 | } | |
153 | ||
154 | /** | |
155 | * Adds a {@link Server} to the {@link Cluster} for the address provided if | |
156 | * one does not already exist. | |
157 | * <p> | |
158 | * This method is equivalent to calling {@link #add(InetSocketAddress) | |
159 | * add(ServerNameUtils.parse(address))}. | |
160 | * </p> | |
161 | * | |
162 | * @param address | |
163 | * The address of the {@link Server} to return. | |
164 | * @return The {@link Server} for the address. | |
165 | */ | |
166 | public Server add(final String address) { | |
167 | 267 | Server server = myServers.get(address); |
168 | 267 | if (server == null) { |
169 | 184 | server = add(ServerNameUtils.parse(address)); |
170 | } | |
171 | ||
172 | 267 | return server; |
173 | } | |
174 | ||
175 | /** | |
176 | * Adds a listener to the state. | |
177 | * | |
178 | * @param listener | |
179 | * The listener for the state changes. | |
180 | */ | |
181 | public void addListener(final PropertyChangeListener listener) { | |
182 | 59 | synchronized (this) { |
183 | 59 | myChangeSupport.addPropertyChangeListener(listener); |
184 | 59 | } |
185 | 59 | } |
186 | ||
187 | /** | |
188 | * Removes all of the servers from the cluster. | |
189 | */ | |
190 | public void clear() { | |
191 | 2 | for (final Server server : myServers.values()) { |
192 | 2 | remove(server); |
193 | 2 | } |
194 | 2 | } |
195 | ||
196 | /** | |
197 | * Returns the set of servers that can be used based on the provided | |
198 | * {@link ReadPreference}. | |
199 | * | |
200 | * @param readPreference | |
201 | * The {@link ReadPreference} to filter the servers. | |
202 | * @return The {@link List} of servers that can be used. Servers will be | |
203 | * ordered by preference to be used, most preferred to least | |
204 | * preferred. | |
205 | */ | |
206 | public List<Server> findCandidateServers(final ReadPreference readPreference) { | |
207 | 50 | List<Server> results = Collections.emptyList(); |
208 | ||
209 | 1 | switch (readPreference.getMode()) { |
210 | case NEAREST: | |
211 | 2 | results = findNearestCandidates(readPreference); |
212 | 2 | break; |
213 | case PRIMARY_ONLY: | |
214 | 10 | results = findWritableCandidates(readPreference); |
215 | 10 | break; |
216 | case PRIMARY_PREFERRED: | |
217 | 5 | results = merge(findWritableCandidates(readPreference), |
218 | findNonWritableCandidates(readPreference)); | |
219 | 5 | break; |
220 | case SECONDARY_ONLY: | |
221 | 23 | results = findNonWritableCandidates(readPreference); |
222 | 23 | break; |
223 | case SECONDARY_PREFERRED: | |
224 | 4 | results = merge(findNonWritableCandidates(readPreference), |
225 | findWritableCandidates(readPreference)); | |
226 | 4 | break; |
227 | case SERVER: | |
228 | 6 | results = findCandidateServer(readPreference); |
229 | break; | |
230 | } | |
231 | ||
232 | 50 | return results; |
233 | } | |
234 | ||
235 | /** | |
236 | * Locates the set of servers that can be used to send the specified | |
237 | * messages. | |
238 | * | |
239 | * @param message1 | |
240 | * The first message to send. | |
241 | * @param message2 | |
242 | * The second message to send. May be <code>null</code>. | |
243 | * @return The servers that can be used. | |
244 | */ | |
245 | public List<Server> findServers(final Message message1, | |
246 | final Message message2) { | |
247 | 19 | List<Server> servers = Collections.emptyList(); |
248 | ||
249 | 19 | if (message1 != null) { |
250 | 18 | List<Server> potentialServers = findCandidateServers(message1 |
251 | .getReadPreference()); | |
252 | 18 | servers = potentialServers; |
253 | ||
254 | 18 | if (message2 != null) { |
255 | 5 | servers = new ArrayList<Server>(potentialServers); |
256 | 5 | potentialServers = findCandidateServers(message2 |
257 | .getReadPreference()); | |
258 | 5 | servers.retainAll(potentialServers); |
259 | } | |
260 | } | |
261 | 19 | return servers; |
262 | } | |
263 | ||
264 | /** | |
265 | * Returns the server state for the address provided. If the {@link Server} | |
266 | * does not already exist a non-writable state is created and returned. | |
267 | * <p> | |
268 | * This method is equivalent to calling {@link #add(String) add(address)}. | |
269 | * </p> | |
270 | * | |
271 | * @param address | |
272 | * The address of the {@link Server} to return. | |
273 | * @return The {@link Server} for the address. | |
274 | */ | |
275 | public Server get(final String address) { | |
276 | 64 | return add(address); |
277 | } | |
278 | ||
279 | /** | |
280 | * Returns a copy of the list of non-writable servers. The list returned is | |
281 | * a copy of the internal list and can be modified by the caller. | |
282 | * | |
283 | * @return The complete list of non-writable servers. | |
284 | */ | |
285 | public List<Server> getNonWritableServers() { | |
286 | 2 | return new ArrayList<Server>(myNonWritableServers); |
287 | } | |
288 | ||
289 | /** | |
290 | * Returns a copy of the list of servers. The list returned is a copy of the | |
291 | * internal list and can be modified by the caller. | |
292 | * | |
293 | * @return The complete list of servers. | |
294 | */ | |
295 | public List<Server> getServers() { | |
296 | 140 | return new ArrayList<Server>(myServers.values()); |
297 | } | |
298 | ||
299 | /** | |
300 | * {@inheritDoc} | |
301 | */ | |
302 | @Override | |
303 | public VersionRange getServerVersionRange() { | |
304 | 0 | return myServerVersionRange; |
305 | } | |
306 | ||
307 | /** | |
308 | * Returns smallest value for the maximum number of write operations allowed | |
309 | * in a single write command. | |
310 | * | |
311 | * @return The smallest value for maximum number of write operations allowed | |
312 | * in a single write command. | |
313 | */ | |
314 | @Override | |
315 | public int getSmallestMaxBatchedWriteOperations() { | |
316 | 0 | return mySmallestMaxBatchedWriteOperations; |
317 | } | |
318 | ||
319 | /** | |
320 | * Returns the smallest value for the maximum BSON object size within the | |
321 | * cluster. | |
322 | * | |
323 | * @return The smallest value for the maximum BSON object size within the | |
324 | * cluster. | |
325 | */ | |
326 | @Override | |
327 | public long getSmallestMaxBsonObjectSize() { | |
328 | 0 | return mySmallestMaxBsonObjectSize; |
329 | } | |
330 | ||
331 | /** | |
332 | * Returns the type of cluster. | |
333 | * | |
334 | * @return The type of cluster. | |
335 | */ | |
336 | public ClusterType getType() { | |
337 | 104 | return myType; |
338 | } | |
339 | ||
340 | /** | |
341 | * Returns a copy of the list of writable servers. The list returned is a | |
342 | * copy of the internal list and can be modified by the caller. | |
343 | * | |
344 | * @return The complete list of writable servers. | |
345 | */ | |
346 | public List<Server> getWritableServers() { | |
347 | 34 | return new ArrayList<Server>(myWritableServers); |
348 | } | |
349 | ||
350 | /** | |
351 | * Removes the specified server from the cluster. | |
352 | * | |
353 | * @param server | |
354 | * The server to remove from the cluster. | |
355 | */ | |
356 | public void remove(final Server server) { | |
357 | ||
358 | 2 | final Server removed = myServers.remove(server.getCanonicalName()); |
359 | 2 | if (removed != null) { |
360 | 2 | removed.removeListener(myListener); |
361 | 2 | myNonWritableServers.remove(removed); |
362 | 2 | myWritableServers.remove(removed); |
363 | ||
364 | 2 | updateVersions(); |
365 | } | |
366 | 2 | } |
367 | ||
368 | /** | |
369 | * Removes a listener to the state. | |
370 | * | |
371 | * @param listener | |
372 | * The listener for the state changes. | |
373 | */ | |
374 | public void removeListener(final PropertyChangeListener listener) { | |
375 | 52 | synchronized (this) { |
376 | 52 | myChangeSupport.removePropertyChangeListener(listener); |
377 | 52 | } |
378 | 52 | } |
379 | ||
380 | /** | |
381 | * Computes a relative CDF (cumulative distribution function) for the | |
382 | * servers based on the latency from the client. | |
383 | * <p> | |
384 | * The latency of each server is used to create a strict ordering of servers | |
385 | * from lowest latency to highest. The relative latency of the i'th server | |
386 | * is then calculated based on the function: | |
387 | * </p> | |
388 | * <blockquote> | |
389 | * | |
390 | * <pre> | |
391 | * latency[0] | |
392 | * relative_latency[i] = ---------- | |
393 | * latency[i] | |
394 | * </pre> | |
395 | * | |
396 | * </blockquote> | |
397 | * <p> | |
398 | * The relative latencies are then then summed and the probability of | |
399 | * selecting each server is then calculated by: | |
400 | * </p> | |
401 | * <blockquote> | |
402 | * | |
403 | * <pre> | |
404 | * relative_latency[i] | |
405 | * probability[i] = ------------------------------------------------- | |
406 | * sum(relative_latency[0], ... relative_latency[n]) | |
407 | * </pre> | |
408 | * | |
409 | * </blockquote> | |
410 | * | |
411 | * <p> | |
412 | * The CDF over these probabilities is returned. | |
413 | * </p> | |
414 | * | |
415 | * @param servers | |
416 | * The servers to compute the CDF for. | |
417 | * @return The CDF for the server latencies. | |
418 | */ | |
419 | protected final double[] cdf(final List<Server> servers) { | |
420 | 121 | Collections.sort(servers, ServerLatencyComparator.COMPARATOR); |
421 | ||
422 | // Pick a server to move to the front. | |
423 | 121 | final double[] relativeLatency = new double[servers.size()]; |
424 | 121 | double sum = 0; |
425 | 121 | double first = Double.NEGATIVE_INFINITY; |
426 | 100212 | for (int i = 0; i < relativeLatency.length; ++i) { |
427 | 100091 | final Server server = servers.get(i); |
428 | 100091 | double latency = server.getAverageLatency(); |
429 | ||
430 | // Turn the latency into a ratio of the lowest latency. | |
431 | 100091 | if (first == Double.NEGATIVE_INFINITY) { |
432 | 121 | first = latency; |
433 | 121 | latency = 1.0D; // By definition N/N = 1.0. |
434 | } | |
435 | else { | |
436 | 99970 | latency /= first; |
437 | } | |
438 | ||
439 | 100091 | latency = (1.0D / latency); // 4 times as long is 1/4 as likely. |
440 | 100091 | relativeLatency[i] = latency; |
441 | 100091 | sum += latency; |
442 | } | |
443 | ||
444 | // Turn the latencies into a range of 0 <= relativeLatency < 1. | |
445 | // Also known as the CDF (cumulative distribution function) | |
446 | 121 | double accum = 0.0D; |
447 | 100212 | for (int i = 0; i < relativeLatency.length; ++i) { |
448 | 100091 | accum += relativeLatency[i]; |
449 | ||
450 | 100091 | relativeLatency[i] = accum / sum; |
451 | } | |
452 | ||
453 | 121 | return relativeLatency; |
454 | } | |
455 | ||
456 | /** | |
457 | * Finds the candidate server, if known. | |
458 | * | |
459 | * @param readPreference | |
460 | * The read preference to match the server against. | |
461 | * @return The Server found in a singleton list or an empty list if the | |
462 | * server is not known. | |
463 | */ | |
464 | protected List<Server> findCandidateServer( | |
465 | final ReadPreference readPreference) { | |
466 | 6 | final Server server = myServers.get(readPreference.getServer()); |
467 | 6 | if ((server != null) && readPreference.matches(server.getTags())) { |
468 | 3 | return Collections.singletonList(server); |
469 | } | |
470 | 3 | return Collections.emptyList(); |
471 | } | |
472 | ||
473 | /** | |
474 | * Returns the list of servers that match the read preference's tags. | |
475 | * | |
476 | * @param readPreference | |
477 | * The read preference to match the server against. | |
478 | * @return The servers found in order of preference. Generally this is in | |
479 | * latency order but we randomly move one of the servers to the | |
480 | * front of the list to distribute the load across more servers. | |
481 | * | |
482 | * @see #sort | |
483 | */ | |
484 | protected List<Server> findNearestCandidates( | |
485 | final ReadPreference readPreference) { | |
486 | 2 | final List<Server> results = new ArrayList<Server>(myServers.size()); |
487 | 2 | for (final Server server : myServers.values()) { |
488 | 6 | if (readPreference.matches(server.getTags())) { |
489 | 5 | results.add(server); |
490 | } | |
491 | 6 | } |
492 | ||
493 | // Sort the server by preference. | |
494 | 2 | sort(results); |
495 | ||
496 | 2 | return results; |
497 | } | |
498 | ||
499 | /** | |
500 | * Returns the list of non-writable servers that match the read preference's | |
501 | * tags. | |
502 | * | |
503 | * @param readPreference | |
504 | * The read preference to match the server against. | |
505 | * @return The servers found in order of preference. Generally this is in | |
506 | * latency order but we randomly move one of the servers to the | |
507 | * front of the list to distribute the load across more servers. | |
508 | * | |
509 | * @see #sort | |
510 | */ | |
511 | protected List<Server> findNonWritableCandidates( | |
512 | final ReadPreference readPreference) { | |
513 | 32 | final List<Server> results = new ArrayList<Server>( |
514 | myNonWritableServers.size()); | |
515 | 32 | for (final Server server : myNonWritableServers) { |
516 | 90 | if (readPreference.matches(server.getTags()) |
517 | && isRecentEnough(server.getSecondsBehind())) { | |
518 | 84 | results.add(server); |
519 | } | |
520 | 90 | } |
521 | ||
522 | // Sort the server by preference. | |
523 | 32 | sort(results); |
524 | ||
525 | 32 | return results; |
526 | } | |
527 | ||
528 | /** | |
529 | * Returns the list of writable servers that match the read preference's | |
530 | * tags. | |
531 | * | |
532 | * @param readPreference | |
533 | * The read preference to match the server against. | |
534 | * @return The servers found in order of preference. Generally this is in | |
535 | * latency order but we randomly move one of the servers to the | |
536 | * front of the list to distribute the load across more servers. | |
537 | * | |
538 | * @see #sort | |
539 | */ | |
540 | protected List<Server> findWritableCandidates( | |
541 | final ReadPreference readPreference) { | |
542 | 19 | final List<Server> results = new ArrayList<Server>( |
543 | myWritableServers.size()); | |
544 | 19 | for (final Server server : myWritableServers) { |
545 | 25 | if (readPreference.matches(server.getTags())) { |
546 | 18 | results.add(server); |
547 | } | |
548 | 25 | } |
549 | ||
550 | // Sort the server by preference. | |
551 | 19 | sort(results); |
552 | ||
553 | 19 | return results; |
554 | } | |
555 | ||
556 | /** | |
557 | * Sorts the servers based on the latency from the client. | |
558 | * <p> | |
559 | * To distribute the requests across servers more evenly the first server is | |
560 | * replaced with a random server based on a single sided simplified Gaussian | |
561 | * distribution. | |
562 | * </p> | |
563 | * | |
564 | * @param servers | |
565 | * The servers to be sorted. | |
566 | * | |
567 | * @see #cdf(List) | |
568 | */ | |
569 | protected final void sort(final List<Server> servers) { | |
570 | 153 | if (servers.isEmpty() || (servers.size() == 1)) { |
571 | 33 | return; |
572 | } | |
573 | ||
574 | // Pick a server to move to the front. | |
575 | 120 | final double[] cdf = cdf(servers); |
576 | 120 | final double random = Math.random(); |
577 | 120 | int index = Arrays.binarySearch(cdf, random); |
578 | ||
579 | // Probably a negative index since not expecting an exact match. | |
580 | 120 | if (index < 0) { |
581 | // Undo (-(insertion point) - 1) | |
582 | 120 | index = Math.abs(index + 1); |
583 | } | |
584 | ||
585 | // Should not be needed. random should be < 1.0 and | |
586 | // relativeLatency[relativeLatency.length] == 1.0 | |
587 | // | |
588 | // assert (random < 1.0D) : | |
589 | // "The random value should be strictly less than 1.0."; | |
590 | // assert (cdf[cdf.length - 1] <= 1.0001) : | |
591 | // "The cdf of the last server should be 1.0."; | |
592 | // assert (0.9999 <= cdf[cdf.length - 1]) : | |
593 | // "The cdf of the last server should be 1.0."; | |
594 | 120 | index = Math.min(cdf.length - 1, index); |
595 | ||
596 | // Swap the lucky winner into the first position. | |
597 | 120 | Collections.swap(servers, 0, index); |
598 | 120 | } |
599 | ||
600 | /** | |
601 | * Updates the min/max versions across all servers. Since the max BSON | |
602 | * object size is tied to the version we also update that value. | |
603 | */ | |
604 | protected void updateVersions() { | |
605 | 102 | Version min = null; |
606 | 102 | Version max = null; |
607 | ||
608 | 102 | long smallestMaxBsonObjectSize = Long.MAX_VALUE; |
609 | 102 | int smallestMaxBatchedWriteOperations = Integer.MAX_VALUE; |
610 | ||
611 | 102 | for (final Server server : myServers.values()) { |
612 | 100 | min = Version.earlier(min, server.getVersion()); |
613 | 100 | max = Version.later(max, server.getVersion()); |
614 | ||
615 | 100 | smallestMaxBsonObjectSize = Math.min(smallestMaxBsonObjectSize, |
616 | server.getMaxBsonObjectSize()); | |
617 | 100 | smallestMaxBatchedWriteOperations = Math.min( |
618 | smallestMaxBatchedWriteOperations, | |
619 | server.getMaxBatchedWriteOperations()); | |
620 | 100 | } |
621 | ||
622 | 102 | myServerVersionRange = VersionRange.range(min, max); |
623 | 102 | mySmallestMaxBsonObjectSize = smallestMaxBsonObjectSize; |
624 | 102 | mySmallestMaxBatchedWriteOperations = smallestMaxBatchedWriteOperations; |
625 | 102 | } |
626 | ||
627 | /** | |
628 | * Returns true if the server is recent enough to be queried. | |
629 | * | |
630 | * @param secondsBehind | |
631 | * The number of seconds the server is behind. | |
632 | * @return True if the server is recent enough to be queried, false | |
633 | * otherwise. | |
634 | */ | |
635 | private boolean isRecentEnough(final double secondsBehind) { | |
636 | 85 | return ((secondsBehind * 1000) < myConfig.getMaxSecondaryLag()); |
637 | } | |
638 | ||
639 | /** | |
640 | * Merges the two lists into a single list. | |
641 | * | |
642 | * @param list1 | |
643 | * The first list of servers. | |
644 | * @param list2 | |
645 | * The second list of servers. | |
646 | * @return The 2 lists of servers merged into a single list. | |
647 | */ | |
648 | private final List<Server> merge(final List<Server> list1, | |
649 | final List<Server> list2) { | |
650 | List<Server> results; | |
651 | 9 | if (list1.isEmpty()) { |
652 | 3 | results = list2; |
653 | } | |
654 | 6 | else if (list2.isEmpty()) { |
655 | 1 | results = list1; |
656 | } | |
657 | else { | |
658 | 5 | results = new ArrayList<Server>(list1.size() + list2.size()); |
659 | 5 | results.addAll(list1); |
660 | 5 | results.addAll(list2); |
661 | } | |
662 | 9 | return results; |
663 | } | |
664 | ||
665 | /** | |
666 | * ServerListener provides a listener for the state updates of the | |
667 | * {@link Server}. | |
668 | * | |
669 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may | |
670 | * be mutated in incompatible ways between any two releases of the | |
671 | * driver. | |
672 | * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved | |
673 | */ | |
674 | 268 | protected final class ServerListener implements PropertyChangeListener { |
675 | @Override | |
676 | public void propertyChange(final PropertyChangeEvent evt) { | |
677 | 645 | final String propertyName = evt.getPropertyName(); |
678 | 645 | final Server server = (Server) evt.getSource(); |
679 | ||
680 | 645 | if (Server.STATE_PROP.equals(propertyName)) { |
681 | ||
682 | 153 | final boolean old = !myWritableServers.isEmpty(); |
683 | ||
684 | 153 | if (Server.State.WRITABLE == evt.getNewValue()) { |
685 | 77 | myWritableServers.addIfAbsent(server); |
686 | 77 | myNonWritableServers.remove(server); |
687 | } | |
688 | 76 | else if (Server.State.READ_ONLY == evt.getNewValue()) { |
689 | 47 | myWritableServers.remove(server); |
690 | 47 | myNonWritableServers.addIfAbsent(server); |
691 | } | |
692 | else { | |
693 | 29 | myWritableServers.remove(server); |
694 | 29 | myNonWritableServers.remove(server); |
695 | } | |
696 | ||
697 | 153 | myChangeSupport.firePropertyChange(WRITABLE_PROP, old, |
698 | !myWritableServers.isEmpty()); | |
699 | ||
700 | 153 | } |
701 | 492 | else if (Server.CANONICAL_NAME_PROP.equals(propertyName)) { |
702 | // Resolved a new canonical name. e.g., What the server | |
703 | // calls itself in the cluster. | |
704 | ||
705 | // Remove the entry with the old name. | |
706 | 4 | myServers.remove(evt.getOldValue(), server); |
707 | ||
708 | // And add with the new name. Checking for duplicate entries. | |
709 | 4 | final Server existing = myServers.putIfAbsent( |
710 | server.getCanonicalName(), server); | |
711 | 4 | if (existing != null) { |
712 | // Already have a Server with that name. Remove the listener | |
713 | // and let this server get garbage collected. | |
714 | 2 | myNonWritableServers.remove(server); |
715 | 2 | myWritableServers.remove(server); |
716 | 2 | server.removeListener(myListener); |
717 | ||
718 | 2 | myChangeSupport.firePropertyChange(SERVER_PROP, server, |
719 | null); | |
720 | } | |
721 | 4 | } |
722 | 488 | else if (Server.VERSION_PROP.equals(propertyName)) { |
723 | // If the old version is either the high or low for the cluster | |
724 | // (or the version is UNKNOWN) then recompute the high/low | |
725 | // versions. | |
726 | 100 | final Version old = (Version) evt.getOldValue(); |
727 | ||
728 | 100 | if (Version.UNKNOWN.equals(old) |
729 | || (myServerVersionRange.getUpperBounds() | |
730 | .compareTo(old) <= 0) | |
731 | || (myServerVersionRange.getLowerBounds() | |
732 | .compareTo(old) >= 0)) { | |
733 | 100 | updateVersions(); |
734 | } | |
735 | } | |
736 | 645 | } |
737 | } | |
738 | } |