Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Server |
|
| 1.9722222222222223;1.972 | ||||
Server$State |
|
| 1.9722222222222223;1.972 |
1 | /* | |
2 | * #%L | |
3 | * Server.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client.state; | |
21 | ||
22 | import java.beans.PropertyChangeListener; | |
23 | import java.beans.PropertyChangeSupport; | |
24 | import java.net.InetSocketAddress; | |
25 | import java.util.Arrays; | |
26 | import java.util.Collection; | |
27 | import java.util.Collections; | |
28 | import java.util.List; | |
29 | import java.util.concurrent.TimeUnit; | |
30 | import java.util.concurrent.atomic.AtomicLong; | |
31 | ||
32 | import com.allanbank.mongodb.Version; | |
33 | import com.allanbank.mongodb.bson.Document; | |
34 | import com.allanbank.mongodb.bson.Element; | |
35 | import com.allanbank.mongodb.bson.NumericElement; | |
36 | import com.allanbank.mongodb.bson.builder.BuilderFactory; | |
37 | import com.allanbank.mongodb.bson.element.BooleanElement; | |
38 | import com.allanbank.mongodb.bson.element.DocumentElement; | |
39 | import com.allanbank.mongodb.bson.element.StringElement; | |
40 | import com.allanbank.mongodb.bson.element.TimestampElement; | |
41 | import com.allanbank.mongodb.client.Client; | |
42 | import com.allanbank.mongodb.util.ServerNameUtils; | |
43 | ||
44 | /** | |
45 | * Server provides tracking of the state of a single MongoDB server. | |
46 | * | |
47 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
48 | * mutated in incompatible ways between any two releases of the driver. | |
49 | * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved | |
50 | */ | |
51 | public class Server { | |
52 | ||
53 | /** The name for the Server's canonical name property: '{@value} '. */ | |
54 | public static final String CANONICAL_NAME_PROP = "canonicalName"; | |
55 | ||
56 | /** The decay rate for the exponential average for the latency. */ | |
57 | public static final double DECAY_ALPHA; | |
58 | ||
59 | /** The decay period (number of samples) for the average latency. */ | |
60 | public static final double DECAY_SAMPLES = 1000.0D; | |
61 | ||
62 | /** The default MongoDB port. */ | |
63 | public static final int DEFAULT_PORT = ServerNameUtils.DEFAULT_PORT; | |
64 | ||
65 | /** The document element type. */ | |
66 | 1 | public static final Class<DocumentElement> DOCUMENT_TYPE = DocumentElement.class; |
67 | ||
68 | /** The default number of max batched write operations. */ | |
69 | public static final int MAX_BATCHED_WRITE_OPERATIONS_DEFAULT = 1000; | |
70 | ||
71 | /** The name for the Server's maximum BSON object size property: {@value} . */ | |
72 | public static final String MAX_BATCHED_WRITE_OPERATIONS_PROP = "maxWriteBatchSize"; | |
73 | ||
74 | /** The name for the Server's maximum BSON object size property: {@value} . */ | |
75 | public static final String MAX_BSON_OBJECT_SIZE_PROP = "maxBsonObjectSize"; | |
76 | ||
77 | /** The numeric element type. */ | |
78 | 1 | public static final Class<NumericElement> NUMERIC_TYPE = NumericElement.class; |
79 | ||
80 | /** The value for a primary server's state. */ | |
81 | public static final int PRIMARY_STATE = 1; | |
82 | ||
83 | /** The value for a secondary (actively replicating) server's state. */ | |
84 | public static final int SECONDARY_STATE = 2; | |
85 | ||
86 | /** The name for the Server's state property: {@value} . */ | |
87 | public static final String STATE_PROP = "state"; | |
88 | ||
89 | /** The string element type. */ | |
90 | 1 | public static final Class<StringElement> STRING_TYPE = StringElement.class; |
91 | ||
92 | /** The name for the Server's tags property: {@value} . */ | |
93 | public static final String TAGS_PROP = "tags"; | |
94 | ||
95 | /** The timestamp element type. */ | |
96 | 1 | public static final Class<TimestampElement> TIMESTAMP_TYPE = TimestampElement.class; |
97 | ||
98 | /** The name for the Server's version property: {@value} . */ | |
99 | public static final String VERSION_PROP = "version"; | |
100 | ||
101 | /** The number of nano-seconds per milli-second. */ | |
102 | 1 | private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS |
103 | .toNanos(1); | |
104 | ||
105 | static { | |
106 | 1 | DECAY_ALPHA = (2.0D / (DECAY_SAMPLES + 1)); |
107 | 1 | } |
108 | ||
109 | /** | |
110 | * Tracks the average latency for the server connection. This is set when | |
111 | * the connection to the server is first created and then updated | |
112 | * periodically using an exponential moving average. | |
113 | */ | |
114 | private volatile double myAverageLatency; | |
115 | ||
116 | /** | |
117 | * The socket address provided by the user. This address will not be | |
118 | * updated. | |
119 | */ | |
120 | private final InetSocketAddress myCanonicalAddress; | |
121 | ||
122 | /** | |
123 | * The host name for the {@link #myCanonicalAddress}. This is use to | |
124 | * re-resolve the IP address when a connection failure is experienced. | |
125 | */ | |
126 | private final String myCanonicalHostName; | |
127 | ||
128 | /** The normalized name of the server being tracked. */ | |
129 | private volatile String myCanonicalName; | |
130 | ||
131 | /** Provides support for the sending of property change events. */ | |
132 | private final PropertyChangeSupport myEventSupport; | |
133 | ||
134 | /** The time of the last version update. */ | |
135 | 1365 | private long myLastVersionUpdate = 0; |
136 | ||
137 | /** | |
138 | * The maximum number of write operations allowed in a single write command. | |
139 | * Defaults to {@value #MAX_BATCHED_WRITE_OPERATIONS_DEFAULT}. | |
140 | */ | |
141 | 1365 | private volatile int myMaxBatchedWriteOperations = MAX_BATCHED_WRITE_OPERATIONS_DEFAULT; |
142 | ||
143 | /** | |
144 | * The maximum BSON object size the server will accept. Defaults to | |
145 | * {@link Client#MAX_DOCUMENT_SIZE}. | |
146 | */ | |
147 | 1365 | private volatile int myMaxBsonObjectSize = Client.MAX_DOCUMENT_SIZE; |
148 | ||
149 | /** The number of messages sent to the server. */ | |
150 | private final AtomicLong myMessagesSent; | |
151 | ||
152 | /** The number of messages received from the server. */ | |
153 | private final AtomicLong myRepliesReceived; | |
154 | ||
155 | /** | |
156 | * Tracks the last report of how many seconds the server is behind the | |
157 | * primary. | |
158 | */ | |
159 | private volatile double mySecondsBehind; | |
160 | ||
161 | /** Tracking the state of the server. */ | |
162 | private volatile State myState; | |
163 | ||
164 | /** Tracking the tags for the server. */ | |
165 | private volatile Document myTags; | |
166 | ||
167 | /** The total amount of latency for sending messages to the server. */ | |
168 | private final AtomicLong myTotalLatency; | |
169 | ||
170 | /** The version of the server. */ | |
171 | private Version myVersion; | |
172 | ||
173 | /** | |
174 | * The socket address being actively used. This will be re-created using the | |
175 | * server's hostname if a connection attempt fails. | |
176 | */ | |
177 | private volatile InetSocketAddress myWorkingAddress; | |
178 | ||
179 | /** | |
180 | * Creates a new {@link Server}. Package private to force creation through | |
181 | * the {@link Cluster}. | |
182 | * | |
183 | * @param server | |
184 | * The server being tracked. | |
185 | */ | |
186 | 1365 | /* package */Server(final InetSocketAddress server) { |
187 | 1365 | myCanonicalAddress = server; |
188 | 1365 | myCanonicalHostName = server.getHostName(); |
189 | 1365 | myCanonicalName = ServerNameUtils.normalize(server); |
190 | 1365 | myWorkingAddress = myCanonicalAddress; |
191 | ||
192 | 1365 | myEventSupport = new PropertyChangeSupport(this); |
193 | ||
194 | 1365 | myMessagesSent = new AtomicLong(0); |
195 | 1365 | myRepliesReceived = new AtomicLong(0); |
196 | 1365 | myTotalLatency = new AtomicLong(0); |
197 | ||
198 | 1365 | myState = State.UNKNOWN; |
199 | 1365 | myAverageLatency = Double.MAX_VALUE; |
200 | 1365 | mySecondsBehind = Double.MAX_VALUE; |
201 | 1365 | myTags = null; |
202 | ||
203 | 1365 | myVersion = Version.UNKNOWN; |
204 | 1365 | } |
205 | ||
206 | /** | |
207 | * Add a PropertyChangeListener to receive all future property changes for | |
208 | * the {@link Server}. | |
209 | * | |
210 | * @param listener | |
211 | * The PropertyChangeListener to be added | |
212 | * | |
213 | * @see PropertyChangeSupport#addPropertyChangeListener(PropertyChangeListener) | |
214 | */ | |
215 | public void addListener(final PropertyChangeListener listener) { | |
216 | 343 | myEventSupport.addPropertyChangeListener(listener); |
217 | ||
218 | 343 | } |
219 | ||
220 | /** | |
221 | * Notification that an attempt to connect to the server via the all of the | |
222 | * {@link #getAddresses() addresses provided} failed. | |
223 | */ | |
224 | public void connectFailed() { | |
225 | 28 | final State oldValue = myState; |
226 | ||
227 | 28 | myWorkingAddress = null; |
228 | 28 | myState = State.UNAVAILABLE; |
229 | ||
230 | 28 | myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState); |
231 | 28 | } |
232 | ||
233 | /** | |
234 | * Notification that a connection has closed normally. This will leave the | |
235 | * connection in the last known state even if it is the last open | |
236 | * connection. | |
237 | */ | |
238 | public void connectionClosed() { | |
239 | // Nothing for now.... | |
240 | 0 | } |
241 | ||
242 | /** | |
243 | * Notification that a connection was successfully opened to the server. The | |
244 | * {@link InetSocketAddress} provided becomes the preferred address to use | |
245 | * when connecting to the server. | |
246 | * | |
247 | * @param addressUsed | |
248 | * The address that was used to connect to the server. | |
249 | */ | |
250 | public void connectionOpened(final InetSocketAddress addressUsed) { | |
251 | 176 | myWorkingAddress = addressUsed; |
252 | 176 | } |
253 | ||
254 | /** | |
255 | * Notification that a connection has closed abruptly. This will normally | |
256 | * transition the connection to an unknown state. | |
257 | */ | |
258 | public void connectionTerminated() { | |
259 | 12 | final State oldValue = myState; |
260 | ||
261 | 12 | myWorkingAddress = null; |
262 | 12 | myState = State.UNAVAILABLE; |
263 | ||
264 | 12 | myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState); |
265 | 12 | } |
266 | ||
267 | /** | |
268 | * {@inheritDoc} | |
269 | * <p> | |
270 | * Overridden to return a stable equality check. This is based only on the | |
271 | * server object's identity. The {@link Cluster} class will de-duplicate | |
272 | * once the canonical host names are determined. | |
273 | * </p> | |
274 | */ | |
275 | @Override | |
276 | public boolean equals(final Object object) { | |
277 | 509 | return (this == object); |
278 | } | |
279 | ||
280 | /** | |
281 | * Returns the address of the server being tracked. | |
282 | * | |
283 | * @return The address of the server being tracked. | |
284 | */ | |
285 | public Collection<InetSocketAddress> getAddresses() { | |
286 | 219 | if (myWorkingAddress == null) { |
287 | 15 | myWorkingAddress = InetSocketAddress.createUnresolved( |
288 | myCanonicalHostName, myCanonicalAddress.getPort()); | |
289 | } | |
290 | ||
291 | 219 | if (myCanonicalAddress == myWorkingAddress) { |
292 | 204 | return Collections.singleton(myCanonicalAddress); |
293 | } | |
294 | 15 | return Arrays.asList(myWorkingAddress, myCanonicalAddress); |
295 | } | |
296 | ||
297 | /** | |
298 | * Returns the current average latency (in milliseconds) seen in issuing | |
299 | * requests to the server. If the latency returns {@link Double#MAX_VALUE} | |
300 | * then we have no basis for determining the latency. | |
301 | * <p> | |
302 | * This average is over the recent replies not over all replies received. | |
303 | * </p> | |
304 | * | |
305 | * @return The current average latency (in milliseconds) seen in issuing | |
306 | * requests to the server. | |
307 | */ | |
308 | public double getAverageLatency() { | |
309 | 2136598 | return myAverageLatency; |
310 | } | |
311 | ||
312 | /** | |
313 | * Returns the name of the server as reported by the server itself. | |
314 | * | |
315 | * @return The name of the server as reported by the server itself. | |
316 | */ | |
317 | public String getCanonicalName() { | |
318 | 432 | return myCanonicalName; |
319 | } | |
320 | ||
321 | /** | |
322 | * Returns the maximum number of write operations allowed in a single write | |
323 | * command. Defaults to {@value #MAX_BATCHED_WRITE_OPERATIONS_DEFAULT}. | |
324 | * | |
325 | * @return The maximum number of write operations allowed in a single write | |
326 | * command. | |
327 | */ | |
328 | public int getMaxBatchedWriteOperations() { | |
329 | 105 | return myMaxBatchedWriteOperations; |
330 | } | |
331 | ||
332 | /** | |
333 | * Returns the maximum BSON object size the server will accept. Defaults to | |
334 | * {@link Client#MAX_DOCUMENT_SIZE}. | |
335 | * | |
336 | * @return The maximum BSON object size the server will accept. | |
337 | */ | |
338 | public int getMaxBsonObjectSize() { | |
339 | 438 | return myMaxBsonObjectSize; |
340 | } | |
341 | ||
342 | /** | |
343 | * Returns the number of messages sent to the server. | |
344 | * | |
345 | * @return The number of messages sent to the server. | |
346 | */ | |
347 | public long getMessagesSent() { | |
348 | 3 | return myMessagesSent.get(); |
349 | } | |
350 | ||
351 | /** | |
352 | * Returns the number of messages received from the server. | |
353 | * | |
354 | * @return The number of messages received from the server. | |
355 | */ | |
356 | public long getRepliesReceived() { | |
357 | 3 | return myRepliesReceived.get(); |
358 | } | |
359 | ||
360 | /** | |
361 | * Sets the last reported seconds behind the primary. | |
362 | * | |
363 | * @return The seconds behind the primary server. | |
364 | */ | |
365 | public double getSecondsBehind() { | |
366 | 85 | return mySecondsBehind; |
367 | } | |
368 | ||
369 | /** | |
370 | * Returns the state value. | |
371 | * | |
372 | * @return The state value. | |
373 | */ | |
374 | public State getState() { | |
375 | 12 | return myState; |
376 | } | |
377 | ||
378 | /** | |
379 | * Returns the tags for the server. | |
380 | * | |
381 | * @return The tags for the server. | |
382 | */ | |
383 | public Document getTags() { | |
384 | 148 | return myTags; |
385 | } | |
386 | ||
387 | /** | |
388 | * Returns the total amount of time messages waited for a reply from the | |
389 | * server in nanoseconds. The average latency is approximately | |
390 | * {@link #getTotalLatencyNanoSeconds()}/{@link #getRepliesReceived()}. | |
391 | * | |
392 | * @return The total amount of time messages waited for a reply from the | |
393 | * server in nanoseconds. | |
394 | */ | |
395 | public long getTotalLatencyNanoSeconds() { | |
396 | 1 | return myTotalLatency.get(); |
397 | } | |
398 | ||
399 | /** | |
400 | * Returns the version of the server. | |
401 | * | |
402 | * @return The version of the server. | |
403 | */ | |
404 | public Version getVersion() { | |
405 | 546 | return myVersion; |
406 | } | |
407 | ||
408 | /** | |
409 | * {@inheritDoc} | |
410 | * <p> | |
411 | * Overridden to return a stable hash for the server. This is based only on | |
412 | * the server object's {@link System#identityHashCode(Object) identity hash | |
413 | * code}. The {@link Cluster} class will de-duplicate once the canonical | |
414 | * host names are determined. | |
415 | * </p> | |
416 | */ | |
417 | @Override | |
418 | public int hashCode() { | |
419 | 520 | return System.identityHashCode(this); |
420 | } | |
421 | ||
422 | /** | |
423 | * Increments the number of messages sent to the server. | |
424 | */ | |
425 | public void incrementMessagesSent() { | |
426 | 326 | myMessagesSent.incrementAndGet(); |
427 | 326 | } |
428 | ||
429 | /** | |
430 | * Increments the number of messages received from the server. | |
431 | */ | |
432 | public void incrementRepliesReceived() { | |
433 | 220 | myRepliesReceived.incrementAndGet(); |
434 | 220 | } |
435 | ||
436 | /** | |
437 | * Returns true if the server can be written to, false otherwise. | |
438 | * <p> | |
439 | * If writable it might be a standalone server, the primary in a replica | |
440 | * set, or a mongos in a sharded configuration. If not writable it is a | |
441 | * secondary server in a replica set. | |
442 | * </p> | |
443 | * | |
444 | * @return True if the server can be written to, false otherwise. | |
445 | */ | |
446 | public boolean isWritable() { | |
447 | 12 | return (myState == State.WRITABLE); |
448 | } | |
449 | ||
450 | /** | |
451 | * Returns true if there has not been a recent update to the server's | |
452 | * version or maximum document size. | |
453 | * | |
454 | * @return True if there has not been a recent update to the server's | |
455 | * version or maximum document size. | |
456 | */ | |
457 | public boolean needBuildInfo() { | |
458 | 172 | final long now = System.currentTimeMillis(); |
459 | 172 | final long tenMinutesAgo = now - TimeUnit.MINUTES.toMillis(10); |
460 | ||
461 | 172 | return Version.UNKNOWN.equals(myVersion) |
462 | || (myLastVersionUpdate < tenMinutesAgo); | |
463 | } | |
464 | ||
465 | /** | |
466 | * Remove a PropertyChangeListener to stop receiving future property changes | |
467 | * for the {@link Server}. | |
468 | * | |
469 | * @param listener | |
470 | * The PropertyChangeListener to be removed | |
471 | * | |
472 | * @see PropertyChangeSupport#removePropertyChangeListener(PropertyChangeListener) | |
473 | */ | |
474 | public void removeListener(final PropertyChangeListener listener) { | |
475 | 4 | myEventSupport.removePropertyChangeListener(listener); |
476 | 4 | } |
477 | ||
478 | /** | |
479 | * Notification that a status request message on the connection failed. | |
480 | * <p> | |
481 | * In the case of an exception the seconds behind is set to | |
482 | * {@link Integer#MAX_VALUE}. The value is configurable as a long so in | |
483 | * theory a user can ignore this case using a large | |
484 | * {@link com.allanbank.mongodb.MongoClientConfiguration#setMaxSecondaryLag(long)} | |
485 | * . | |
486 | * </p> | |
487 | */ | |
488 | public void requestFailed() { | |
489 | 23 | mySecondsBehind = Integer.MAX_VALUE; |
490 | 23 | } |
491 | ||
492 | /** | |
493 | * {@inheritDoc} | |
494 | * <p> | |
495 | * Overridden to to return a human readable version of the server state. | |
496 | * </p> | |
497 | */ | |
498 | @Override | |
499 | public String toString() { | |
500 | 16 | final StringBuilder builder = new StringBuilder(); |
501 | ||
502 | 16 | builder.append(getCanonicalName()); |
503 | 16 | builder.append("("); |
504 | 16 | builder.append(myState); |
505 | 16 | builder.append(","); |
506 | 16 | if (myTags != null) { |
507 | 1 | builder.append("T,"); |
508 | } | |
509 | 16 | builder.append(getAverageLatency()); |
510 | 16 | builder.append(")"); |
511 | ||
512 | 16 | return builder.toString(); |
513 | } | |
514 | ||
515 | /** | |
516 | * Updates the state of the server based on the document provided. The | |
517 | * document should be the reply to either a {@code ismaster} or | |
518 | * {@code replSetGetStatus} command. | |
519 | * | |
520 | * @param document | |
521 | * The document with the state of the server. | |
522 | */ | |
523 | public void update(final Document document) { | |
524 | 432 | updateState(document); |
525 | 431 | updateSecondsBehind(document); |
526 | 431 | updateTags(document); |
527 | 431 | updateName(document); |
528 | 431 | updateVersion(document); |
529 | 431 | updateMaxBsonObjectSize(document); |
530 | 432 | updateMaxWriteOperations(document); |
531 | 432 | } |
532 | ||
533 | /** | |
534 | * Updates the average latency (in nano-seconds) for the server. | |
535 | * | |
536 | * @param latencyNanoSeconds | |
537 | * The latency seen sending a request and receiving a reply from | |
538 | * the server. | |
539 | */ | |
540 | public void updateAverageLatency(final long latencyNanoSeconds) { | |
541 | 2304 | myTotalLatency.addAndGet(latencyNanoSeconds); |
542 | ||
543 | 2304 | final double latency = latencyNanoSeconds / NANOS_PER_MILLI; |
544 | 2304 | final double oldAverage = myAverageLatency; |
545 | 2304 | if (Double.MAX_VALUE == oldAverage) { |
546 | 1139 | myAverageLatency = latency; |
547 | 1139 | if (mySecondsBehind == Double.MAX_VALUE) { |
548 | 1139 | mySecondsBehind = 0.0; |
549 | } | |
550 | } | |
551 | else { | |
552 | 1165 | myAverageLatency = (DECAY_ALPHA * latency) |
553 | + ((1.0D - DECAY_ALPHA) * oldAverage); | |
554 | } | |
555 | 2304 | } |
556 | ||
557 | /** | |
558 | * Extract any {@code maxBsonObjectSize} from the reply. | |
559 | * | |
560 | * @param isMasterReply | |
561 | * The reply to the {@code ismaster} command. | |
562 | */ | |
563 | private void updateMaxBsonObjectSize(final Document isMasterReply) { | |
564 | 431 | final int oldValue = myMaxBsonObjectSize; |
565 | ||
566 | 431 | final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE, |
567 | MAX_BSON_OBJECT_SIZE_PROP); | |
568 | 432 | if (maxSize != null) { |
569 | 155 | myMaxBsonObjectSize = maxSize.getIntValue(); |
570 | } | |
571 | ||
572 | 432 | myEventSupport.firePropertyChange(MAX_BSON_OBJECT_SIZE_PROP, oldValue, |
573 | myMaxBsonObjectSize); | |
574 | 432 | } |
575 | ||
576 | /** | |
577 | * Extract any {@code maxWriteBatchSize} from the reply. | |
578 | * | |
579 | * @param isMasterReply | |
580 | * The reply to the {@code ismaster} command. | |
581 | */ | |
582 | private void updateMaxWriteOperations(final Document isMasterReply) { | |
583 | 432 | final int oldValue = myMaxBatchedWriteOperations; |
584 | ||
585 | 432 | final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE, |
586 | MAX_BATCHED_WRITE_OPERATIONS_PROP); | |
587 | 432 | if (maxSize != null) { |
588 | 2 | myMaxBatchedWriteOperations = maxSize.getIntValue(); |
589 | } | |
590 | ||
591 | 432 | myEventSupport.firePropertyChange(MAX_BATCHED_WRITE_OPERATIONS_PROP, |
592 | oldValue, myMaxBatchedWriteOperations); | |
593 | 432 | } |
594 | ||
595 | /** | |
596 | * Updates the canonical name for the server based on the response to the | |
597 | * {@code ismaster} command. | |
598 | * | |
599 | * @param isMasterReply | |
600 | * The reply to the {@code ismaster} command. | |
601 | */ | |
602 | private void updateName(final Document isMasterReply) { | |
603 | 432 | final String oldValue = myCanonicalName; |
604 | ||
605 | 431 | final Element element = isMasterReply.findFirst("me"); |
606 | 432 | if (element != null) { |
607 | 15 | final String name = element.getValueAsString(); |
608 | 15 | if ((name != null) && !myCanonicalName.equals(name)) { |
609 | 5 | myCanonicalName = name; |
610 | } | |
611 | } | |
612 | ||
613 | 431 | myEventSupport.firePropertyChange(CANONICAL_NAME_PROP, oldValue, |
614 | myCanonicalName); | |
615 | 431 | } |
616 | ||
617 | /** | |
618 | * Extract the number of seconds this Server is behind the primary by | |
619 | * comparing its latest optime with that of the absolute latest optime. | |
620 | * <p> | |
621 | * To account for idle servers we use the optime for each server and assign | |
622 | * a value of zero to the "latest" optime and then subtract the remaining | |
623 | * servers from that optime. | |
624 | * </p> | |
625 | * <p> | |
626 | * Lastly, the state of the server is also checked and the seconds behind is | |
627 | * set to {@link Double#MAX_VALUE} if not in the primary ( | |
628 | * {@value #PRIMARY_STATE}) or secondary ({@value #SECONDARY_STATE}). | |
629 | * </p> | |
630 | * | |
631 | * @param replicaStateDoc | |
632 | * The document to extract the seconds behind from. | |
633 | */ | |
634 | private void updateSecondsBehind(final Document replicaStateDoc) { | |
635 | 431 | final State oldValue = myState; |
636 | ||
637 | 431 | final NumericElement state = replicaStateDoc.get(NUMERIC_TYPE, |
638 | "myState"); | |
639 | 431 | if (state != null) { |
640 | 1 | final int value = state.getIntValue(); |
641 | 1 | if (value == PRIMARY_STATE) { |
642 | 0 | myState = State.WRITABLE; |
643 | 0 | mySecondsBehind = 0; |
644 | } | |
645 | 1 | else if (value == SECONDARY_STATE) { |
646 | 1 | myState = State.READ_ONLY; |
647 | ||
648 | 1 | TimestampElement serverTimestamp = null; |
649 | 1 | final StringElement expectedName = new StringElement("name", |
650 | myCanonicalName); | |
651 | 1 | for (final DocumentElement member : replicaStateDoc.find( |
652 | DOCUMENT_TYPE, "members", ".*")) { | |
653 | 3 | if (expectedName.equals(member.get("name")) |
654 | && (member.get(TIMESTAMP_TYPE, "optimeDate") != null)) { | |
655 | ||
656 | 1 | serverTimestamp = member.get(TIMESTAMP_TYPE, |
657 | "optimeDate"); | |
658 | } | |
659 | 3 | } |
660 | ||
661 | 1 | if (serverTimestamp != null) { |
662 | 1 | TimestampElement latestTimestamp = serverTimestamp; |
663 | 1 | for (final TimestampElement time : replicaStateDoc.find( |
664 | TIMESTAMP_TYPE, "members", ".*", "optimeDate")) { | |
665 | 3 | if (latestTimestamp.getTime() < time.getTime()) { |
666 | 1 | latestTimestamp = time; |
667 | } | |
668 | 3 | } |
669 | ||
670 | 1 | final double msBehind = latestTimestamp.getTime() |
671 | - serverTimestamp.getTime(); | |
672 | 1 | mySecondsBehind = (msBehind / TimeUnit.SECONDS.toMillis(1)); |
673 | } | |
674 | 1 | } |
675 | else { | |
676 | // "myState" != 1 and "myState" != 2 | |
677 | 0 | mySecondsBehind = Double.MAX_VALUE; |
678 | 0 | myState = State.UNAVAILABLE; |
679 | } | |
680 | } | |
681 | ||
682 | 431 | myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState); |
683 | 431 | } |
684 | ||
685 | /** | |
686 | * Extract the if the result implies that the server is writable. | |
687 | * | |
688 | * @param isMasterReply | |
689 | * The document to extract the seconds behind from. | |
690 | */ | |
691 | private void updateState(final Document isMasterReply) { | |
692 | 432 | final State oldValue = myState; |
693 | ||
694 | 432 | BooleanElement element = isMasterReply.findFirst(BooleanElement.class, |
695 | "ismaster"); | |
696 | 432 | if (element != null) { |
697 | 186 | if (element.getValue()) { |
698 | 108 | myState = State.WRITABLE; |
699 | 108 | mySecondsBehind = 0.0; |
700 | } | |
701 | else { | |
702 | 78 | element = isMasterReply.findFirst(BooleanElement.class, |
703 | "secondary"); | |
704 | 78 | if ((element != null) && element.getValue()) { |
705 | 77 | myState = State.READ_ONLY; |
706 | // Check the seconds behind for default values. | |
707 | // This protects from not being able to get the replica set | |
708 | // status due to permissions. | |
709 | 77 | if ((mySecondsBehind == Double.MAX_VALUE) |
710 | || (mySecondsBehind == Integer.MAX_VALUE)) { | |
711 | 11 | mySecondsBehind = 0.0; |
712 | } | |
713 | } | |
714 | else { | |
715 | 1 | myState = State.UNAVAILABLE; |
716 | } | |
717 | } | |
718 | } | |
719 | ||
720 | 431 | myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState); |
721 | 431 | } |
722 | ||
723 | /** | |
724 | * Extract any tags from the reply. | |
725 | * | |
726 | * @param isMasterReply | |
727 | * The reply to the {@code ismaster} command. | |
728 | */ | |
729 | private void updateTags(final Document isMasterReply) { | |
730 | 431 | final Document oldValue = myTags; |
731 | ||
732 | 432 | Document tags = isMasterReply.findFirst(DOCUMENT_TYPE, TAGS_PROP); |
733 | 431 | if (tags != null) { |
734 | // Strip to a pure Document from a DocumentElement. | |
735 | 35 | tags = BuilderFactory.start(tags.asDocument()).build(); |
736 | 35 | if (tags.getElements().isEmpty()) { |
737 | 2 | myTags = null; |
738 | } | |
739 | 33 | else if (!tags.equals(myTags)) { |
740 | 21 | myTags = tags; |
741 | } | |
742 | } | |
743 | ||
744 | 431 | myEventSupport.firePropertyChange(TAGS_PROP, oldValue, myTags); |
745 | 431 | } |
746 | ||
747 | /** | |
748 | * Extract any {@code versionArray} from the reply. | |
749 | * | |
750 | * @param buildInfoReply | |
751 | * The reply to the {@code buildinfo} command. | |
752 | */ | |
753 | private void updateVersion(final Document buildInfoReply) { | |
754 | 431 | final Version oldValue = myVersion; |
755 | ||
756 | 431 | final List<NumericElement> versionElements = buildInfoReply.find( |
757 | NUMERIC_TYPE, "versionArray", ".*"); | |
758 | 431 | if (!versionElements.isEmpty()) { |
759 | 102 | myVersion = Version.parse(versionElements); |
760 | 102 | myLastVersionUpdate = System.currentTimeMillis(); |
761 | } | |
762 | else { | |
763 | // Use the String version if present. | |
764 | 329 | final StringElement stringVersion = buildInfoReply.findFirst( |
765 | STRING_TYPE, "version"); | |
766 | 329 | if (stringVersion != null) { |
767 | 1 | myVersion = Version.parse(stringVersion.getValue()); |
768 | 1 | myLastVersionUpdate = System.currentTimeMillis(); |
769 | } | |
770 | else { | |
771 | // Use the wire version if present. | |
772 | 328 | final NumericElement wireVersion = buildInfoReply.findFirst( |
773 | NUMERIC_TYPE, "maxWireVersion"); | |
774 | 328 | if (wireVersion != null) { |
775 | 3 | final Version version = Version.forWireVersion(wireVersion |
776 | .getIntValue()); | |
777 | ||
778 | // Don't want to update the version if we are getting the | |
779 | // value | |
780 | // some other way since the wire protocol version requires | |
781 | // interpretation and really just provides a "floor" | |
782 | // version. | |
783 | // Check for an unknown or lower version. | |
784 | 3 | if (oldValue.equals(Version.UNKNOWN) |
785 | || (oldValue.compareTo(version) < 0)) { | |
786 | 2 | myVersion = version; |
787 | // Don't update the myLastVersionUpdate time so we still | |
788 | // try and get the precise version. | |
789 | } | |
790 | } | |
791 | } | |
792 | } | |
793 | ||
794 | 431 | myEventSupport.firePropertyChange(VERSION_PROP, oldValue, myVersion); |
795 | 431 | } |
796 | ||
797 | /** | |
798 | * State provides the possible sttes for a server within the MongoDB | |
799 | * cluster. | |
800 | * | |
801 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may | |
802 | * be mutated in incompatible ways between any two releases of the | |
803 | * driver. | |
804 | * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved | |
805 | */ | |
806 | 6 | public enum State { |
807 | /** | |
808 | * We can send reads to the server. It is running, we can connect to it | |
809 | * and is a secondary in the replica set. | |
810 | */ | |
811 | 1 | READ_ONLY, |
812 | ||
813 | /** We cannot connect to the server. */ | |
814 | 1 | UNAVAILABLE, |
815 | ||
816 | /** | |
817 | * A transient state for the server. We have either never connected to | |
818 | * the server or have lost all of the connections to the server. | |
819 | */ | |
820 | 1 | UNKNOWN, |
821 | ||
822 | /** | |
823 | * We can send writes to the server. It is running, we can connect to it | |
824 | * and is either a stand-alone instance, the primary in the replica set | |
825 | * or a mongos. | |
826 | */ | |
827 | 1 | WRITABLE; |
828 | } | |
829 | } |