1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.state;
21
22 import java.beans.PropertyChangeListener;
23 import java.beans.PropertyChangeSupport;
24 import java.net.InetSocketAddress;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import com.allanbank.mongodb.Version;
33 import com.allanbank.mongodb.bson.Document;
34 import com.allanbank.mongodb.bson.Element;
35 import com.allanbank.mongodb.bson.NumericElement;
36 import com.allanbank.mongodb.bson.builder.BuilderFactory;
37 import com.allanbank.mongodb.bson.element.BooleanElement;
38 import com.allanbank.mongodb.bson.element.DocumentElement;
39 import com.allanbank.mongodb.bson.element.StringElement;
40 import com.allanbank.mongodb.bson.element.TimestampElement;
41 import com.allanbank.mongodb.client.Client;
42 import com.allanbank.mongodb.util.ServerNameUtils;
43
44
45
46
47
48
49
50
51 public class Server {
52
53
54 public static final String CANONICAL_NAME_PROP = "canonicalName";
55
56
57 public static final double DECAY_ALPHA;
58
59
60 public static final double DECAY_SAMPLES = 1000.0D;
61
62
63 public static final int DEFAULT_PORT = ServerNameUtils.DEFAULT_PORT;
64
65
66 public static final Class<DocumentElement> DOCUMENT_TYPE = DocumentElement.class;
67
68
69 public static final int MAX_BATCHED_WRITE_OPERATIONS_DEFAULT = 1000;
70
71
72 public static final String MAX_BATCHED_WRITE_OPERATIONS_PROP = "maxWriteBatchSize";
73
74
75 public static final String MAX_BSON_OBJECT_SIZE_PROP = "maxBsonObjectSize";
76
77
78 public static final Class<NumericElement> NUMERIC_TYPE = NumericElement.class;
79
80
81 public static final int PRIMARY_STATE = 1;
82
83
84 public static final int SECONDARY_STATE = 2;
85
86
87 public static final String STATE_PROP = "state";
88
89
90 public static final Class<StringElement> STRING_TYPE = StringElement.class;
91
92
93 public static final String TAGS_PROP = "tags";
94
95
96 public static final Class<TimestampElement> TIMESTAMP_TYPE = TimestampElement.class;
97
98
99 public static final String VERSION_PROP = "version";
100
101
102 private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS
103 .toNanos(1);
104
105 static {
106 DECAY_ALPHA = (2.0D / (DECAY_SAMPLES + 1));
107 }
108
109
110
111
112
113
114 private volatile double myAverageLatency;
115
116
117
118
119
120 private final InetSocketAddress myCanonicalAddress;
121
122
123
124
125
126 private final String myCanonicalHostName;
127
128
129 private volatile String myCanonicalName;
130
131
132 private final PropertyChangeSupport myEventSupport;
133
134
135 private long myLastVersionUpdate = 0;
136
137
138
139
140
141 private volatile int myMaxBatchedWriteOperations = MAX_BATCHED_WRITE_OPERATIONS_DEFAULT;
142
143
144
145
146
147 private volatile int myMaxBsonObjectSize = Client.MAX_DOCUMENT_SIZE;
148
149
150 private final AtomicLong myMessagesSent;
151
152
153 private final AtomicLong myRepliesReceived;
154
155
156
157
158
159 private volatile double mySecondsBehind;
160
161
162 private volatile State myState;
163
164
165 private volatile Document myTags;
166
167
168 private final AtomicLong myTotalLatency;
169
170
171 private Version myVersion;
172
173
174
175
176
177 private volatile InetSocketAddress myWorkingAddress;
178
179
180
181
182
183
184
185
186 Server(final InetSocketAddress server) {
187 myCanonicalAddress = server;
188 myCanonicalHostName = server.getHostName();
189 myCanonicalName = ServerNameUtils.normalize(server);
190 myWorkingAddress = myCanonicalAddress;
191
192 myEventSupport = new PropertyChangeSupport(this);
193
194 myMessagesSent = new AtomicLong(0);
195 myRepliesReceived = new AtomicLong(0);
196 myTotalLatency = new AtomicLong(0);
197
198 myState = State.UNKNOWN;
199 myAverageLatency = Double.MAX_VALUE;
200 mySecondsBehind = Double.MAX_VALUE;
201 myTags = null;
202
203 myVersion = Version.UNKNOWN;
204 }
205
206
207
208
209
210
211
212
213
214
215 public void addListener(final PropertyChangeListener listener) {
216 myEventSupport.addPropertyChangeListener(listener);
217
218 }
219
220
221
222
223
224 public void connectFailed() {
225 final State oldValue = myState;
226
227 myWorkingAddress = null;
228 myState = State.UNAVAILABLE;
229
230 myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
231 }
232
233
234
235
236
237
238 public void connectionClosed() {
239
240 }
241
242
243
244
245
246
247
248
249
250 public void connectionOpened(final InetSocketAddress addressUsed) {
251 myWorkingAddress = addressUsed;
252 }
253
254
255
256
257
258 public void connectionTerminated() {
259 final State oldValue = myState;
260
261 myWorkingAddress = null;
262 myState = State.UNAVAILABLE;
263
264 myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
265 }
266
267
268
269
270
271
272
273
274
275 @Override
276 public boolean equals(final Object object) {
277 return (this == object);
278 }
279
280
281
282
283
284
285 public Collection<InetSocketAddress> getAddresses() {
286 if (myWorkingAddress == null) {
287 myWorkingAddress = InetSocketAddress.createUnresolved(
288 myCanonicalHostName, myCanonicalAddress.getPort());
289 }
290
291 if (myCanonicalAddress == myWorkingAddress) {
292 return Collections.singleton(myCanonicalAddress);
293 }
294 return Arrays.asList(myWorkingAddress, myCanonicalAddress);
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308 public double getAverageLatency() {
309 return myAverageLatency;
310 }
311
312
313
314
315
316
317 public String getCanonicalName() {
318 return myCanonicalName;
319 }
320
321
322
323
324
325
326
327
328 public int getMaxBatchedWriteOperations() {
329 return myMaxBatchedWriteOperations;
330 }
331
332
333
334
335
336
337
338 public int getMaxBsonObjectSize() {
339 return myMaxBsonObjectSize;
340 }
341
342
343
344
345
346
347 public long getMessagesSent() {
348 return myMessagesSent.get();
349 }
350
351
352
353
354
355
356 public long getRepliesReceived() {
357 return myRepliesReceived.get();
358 }
359
360
361
362
363
364
365 public double getSecondsBehind() {
366 return mySecondsBehind;
367 }
368
369
370
371
372
373
374 public State getState() {
375 return myState;
376 }
377
378
379
380
381
382
383 public Document getTags() {
384 return myTags;
385 }
386
387
388
389
390
391
392
393
394
395 public long getTotalLatencyNanoSeconds() {
396 return myTotalLatency.get();
397 }
398
399
400
401
402
403
404 public Version getVersion() {
405 return myVersion;
406 }
407
408
409
410
411
412
413
414
415
416
417 @Override
418 public int hashCode() {
419 return System.identityHashCode(this);
420 }
421
422
423
424
425 public void incrementMessagesSent() {
426 myMessagesSent.incrementAndGet();
427 }
428
429
430
431
432 public void incrementRepliesReceived() {
433 myRepliesReceived.incrementAndGet();
434 }
435
436
437
438
439
440
441
442
443
444
445
446 public boolean isWritable() {
447 return (myState == State.WRITABLE);
448 }
449
450
451
452
453
454
455
456
457 public boolean needBuildInfo() {
458 final long now = System.currentTimeMillis();
459 final long tenMinutesAgo = now - TimeUnit.MINUTES.toMillis(10);
460
461 return Version.UNKNOWN.equals(myVersion)
462 || (myLastVersionUpdate < tenMinutesAgo);
463 }
464
465
466
467
468
469
470
471
472
473
474 public void removeListener(final PropertyChangeListener listener) {
475 myEventSupport.removePropertyChangeListener(listener);
476 }
477
478
479
480
481
482
483
484
485
486
487
488 public void requestFailed() {
489 mySecondsBehind = Integer.MAX_VALUE;
490 }
491
492
493
494
495
496
497
498 @Override
499 public String toString() {
500 final StringBuilder builder = new StringBuilder();
501
502 builder.append(getCanonicalName());
503 builder.append("(");
504 builder.append(myState);
505 builder.append(",");
506 if (myTags != null) {
507 builder.append("T,");
508 }
509 builder.append(getAverageLatency());
510 builder.append(")");
511
512 return builder.toString();
513 }
514
515
516
517
518
519
520
521
522
523 public void update(final Document document) {
524 updateState(document);
525 updateSecondsBehind(document);
526 updateTags(document);
527 updateName(document);
528 updateVersion(document);
529 updateMaxBsonObjectSize(document);
530 updateMaxWriteOperations(document);
531 }
532
533
534
535
536
537
538
539
540 public void updateAverageLatency(final long latencyNanoSeconds) {
541 myTotalLatency.addAndGet(latencyNanoSeconds);
542
543 final double latency = latencyNanoSeconds / NANOS_PER_MILLI;
544 final double oldAverage = myAverageLatency;
545 if (Double.MAX_VALUE == oldAverage) {
546 myAverageLatency = latency;
547 if (mySecondsBehind == Double.MAX_VALUE) {
548 mySecondsBehind = 0.0;
549 }
550 }
551 else {
552 myAverageLatency = (DECAY_ALPHA * latency)
553 + ((1.0D - DECAY_ALPHA) * oldAverage);
554 }
555 }
556
557
558
559
560
561
562
563 private void updateMaxBsonObjectSize(final Document isMasterReply) {
564 final int oldValue = myMaxBsonObjectSize;
565
566 final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE,
567 MAX_BSON_OBJECT_SIZE_PROP);
568 if (maxSize != null) {
569 myMaxBsonObjectSize = maxSize.getIntValue();
570 }
571
572 myEventSupport.firePropertyChange(MAX_BSON_OBJECT_SIZE_PROP, oldValue,
573 myMaxBsonObjectSize);
574 }
575
576
577
578
579
580
581
582 private void updateMaxWriteOperations(final Document isMasterReply) {
583 final int oldValue = myMaxBatchedWriteOperations;
584
585 final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE,
586 MAX_BATCHED_WRITE_OPERATIONS_PROP);
587 if (maxSize != null) {
588 myMaxBatchedWriteOperations = maxSize.getIntValue();
589 }
590
591 myEventSupport.firePropertyChange(MAX_BATCHED_WRITE_OPERATIONS_PROP,
592 oldValue, myMaxBatchedWriteOperations);
593 }
594
595
596
597
598
599
600
601
602 private void updateName(final Document isMasterReply) {
603 final String oldValue = myCanonicalName;
604
605 final Element element = isMasterReply.findFirst("me");
606 if (element != null) {
607 final String name = element.getValueAsString();
608 if ((name != null) && !myCanonicalName.equals(name)) {
609 myCanonicalName = name;
610 }
611 }
612
613 myEventSupport.firePropertyChange(CANONICAL_NAME_PROP, oldValue,
614 myCanonicalName);
615 }
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634 private void updateSecondsBehind(final Document replicaStateDoc) {
635 final State oldValue = myState;
636
637 final NumericElement state = replicaStateDoc.get(NUMERIC_TYPE,
638 "myState");
639 if (state != null) {
640 final int value = state.getIntValue();
641 if (value == PRIMARY_STATE) {
642 myState = State.WRITABLE;
643 mySecondsBehind = 0;
644 }
645 else if (value == SECONDARY_STATE) {
646 myState = State.READ_ONLY;
647
648 TimestampElement serverTimestamp = null;
649 final StringElement expectedName = new StringElement("name",
650 myCanonicalName);
651 for (final DocumentElement member : replicaStateDoc.find(
652 DOCUMENT_TYPE, "members", ".*")) {
653 if (expectedName.equals(member.get("name"))
654 && (member.get(TIMESTAMP_TYPE, "optimeDate") != null)) {
655
656 serverTimestamp = member.get(TIMESTAMP_TYPE,
657 "optimeDate");
658 }
659 }
660
661 if (serverTimestamp != null) {
662 TimestampElement latestTimestamp = serverTimestamp;
663 for (final TimestampElement time : replicaStateDoc.find(
664 TIMESTAMP_TYPE, "members", ".*", "optimeDate")) {
665 if (latestTimestamp.getTime() < time.getTime()) {
666 latestTimestamp = time;
667 }
668 }
669
670 final double msBehind = latestTimestamp.getTime()
671 - serverTimestamp.getTime();
672 mySecondsBehind = (msBehind / TimeUnit.SECONDS.toMillis(1));
673 }
674 }
675 else {
676
677 mySecondsBehind = Double.MAX_VALUE;
678 myState = State.UNAVAILABLE;
679 }
680 }
681
682 myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
683 }
684
685
686
687
688
689
690
691 private void updateState(final Document isMasterReply) {
692 final State oldValue = myState;
693
694 BooleanElement element = isMasterReply.findFirst(BooleanElement.class,
695 "ismaster");
696 if (element != null) {
697 if (element.getValue()) {
698 myState = State.WRITABLE;
699 mySecondsBehind = 0.0;
700 }
701 else {
702 element = isMasterReply.findFirst(BooleanElement.class,
703 "secondary");
704 if ((element != null) && element.getValue()) {
705 myState = State.READ_ONLY;
706
707
708
709 if ((mySecondsBehind == Double.MAX_VALUE)
710 || (mySecondsBehind == Integer.MAX_VALUE)) {
711 mySecondsBehind = 0.0;
712 }
713 }
714 else {
715 myState = State.UNAVAILABLE;
716 }
717 }
718 }
719
720 myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
721 }
722
723
724
725
726
727
728
729 private void updateTags(final Document isMasterReply) {
730 final Document oldValue = myTags;
731
732 Document tags = isMasterReply.findFirst(DOCUMENT_TYPE, TAGS_PROP);
733 if (tags != null) {
734
735 tags = BuilderFactory.start(tags.asDocument()).build();
736 if (tags.getElements().isEmpty()) {
737 myTags = null;
738 }
739 else if (!tags.equals(myTags)) {
740 myTags = tags;
741 }
742 }
743
744 myEventSupport.firePropertyChange(TAGS_PROP, oldValue, myTags);
745 }
746
747
748
749
750
751
752
753 private void updateVersion(final Document buildInfoReply) {
754 final Version oldValue = myVersion;
755
756 final List<NumericElement> versionElements = buildInfoReply.find(
757 NUMERIC_TYPE, "versionArray", ".*");
758 if (!versionElements.isEmpty()) {
759 myVersion = Version.parse(versionElements);
760 myLastVersionUpdate = System.currentTimeMillis();
761 }
762 else {
763
764 final StringElement stringVersion = buildInfoReply.findFirst(
765 STRING_TYPE, "version");
766 if (stringVersion != null) {
767 myVersion = Version.parse(stringVersion.getValue());
768 myLastVersionUpdate = System.currentTimeMillis();
769 }
770 else {
771
772 final NumericElement wireVersion = buildInfoReply.findFirst(
773 NUMERIC_TYPE, "maxWireVersion");
774 if (wireVersion != null) {
775 final Version version = Version.forWireVersion(wireVersion
776 .getIntValue());
777
778
779
780
781
782
783
784 if (oldValue.equals(Version.UNKNOWN)
785 || (oldValue.compareTo(version) < 0)) {
786 myVersion = version;
787
788
789 }
790 }
791 }
792 }
793
794 myEventSupport.firePropertyChange(VERSION_PROP, oldValue, myVersion);
795 }
796
797
798
799
800
801
802
803
804
805
806 public enum State {
807
808
809
810
811 READ_ONLY,
812
813
814 UNAVAILABLE,
815
816
817
818
819
820 UNKNOWN,
821
822
823
824
825
826
827 WRITABLE;
828 }
829 }