1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
22 import java.util.Collection;
23 import java.util.List;
24
25 import com.allanbank.mongodb.BatchedAsyncMongoCollection;
26 import com.allanbank.mongodb.Durability;
27 import com.allanbank.mongodb.ListenableFuture;
28 import com.allanbank.mongodb.MongoCollection;
29 import com.allanbank.mongodb.MongoDatabase;
30 import com.allanbank.mongodb.MongoDbException;
31 import com.allanbank.mongodb.MongoIterator;
32 import com.allanbank.mongodb.ReadPreference;
33 import com.allanbank.mongodb.Version;
34 import com.allanbank.mongodb.bson.Document;
35 import com.allanbank.mongodb.bson.DocumentAssignable;
36 import com.allanbank.mongodb.bson.Element;
37 import com.allanbank.mongodb.bson.ElementType;
38 import com.allanbank.mongodb.bson.NumericElement;
39 import com.allanbank.mongodb.bson.builder.BuilderFactory;
40 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
41 import com.allanbank.mongodb.bson.element.BooleanElement;
42 import com.allanbank.mongodb.bson.element.IntegerElement;
43 import com.allanbank.mongodb.builder.Aggregate;
44 import com.allanbank.mongodb.builder.BatchedWrite;
45 import com.allanbank.mongodb.builder.ConditionBuilder;
46 import com.allanbank.mongodb.builder.Count;
47 import com.allanbank.mongodb.builder.Distinct;
48 import com.allanbank.mongodb.builder.Find;
49 import com.allanbank.mongodb.builder.FindAndModify;
50 import com.allanbank.mongodb.builder.GroupBy;
51 import com.allanbank.mongodb.builder.Index;
52 import com.allanbank.mongodb.builder.MapReduce;
53 import com.allanbank.mongodb.builder.ParallelScan;
54 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
55 import com.allanbank.mongodb.client.callback.ValidatingReplyCallback;
56 import com.allanbank.mongodb.client.message.CreateIndexCommand;
57 import com.allanbank.mongodb.util.FutureUtils;
58
59
60
61
62
63
64
65
66
67 public class SynchronousMongoCollectionImpl extends
68 AbstractAsyncMongoCollection implements MongoCollection {
69
70
71
72
73
74
75
76
77
78
79
80 public SynchronousMongoCollectionImpl(final Client client,
81 final MongoDatabase database, final String name) {
82 super(client, database, name);
83 }
84
85
86
87
88
89
90
91
92
93 @Override
94 public MongoIterator<Document> aggregate(final Aggregate command)
95 throws MongoDbException {
96 return FutureUtils.unwrap(aggregateAsync(command));
97 }
98
99
100
101
102
103
104
105 @Override
106 public MongoIterator<Document> aggregate(final Aggregate.Builder command)
107 throws MongoDbException {
108 return aggregate(command.build());
109 }
110
111
112
113
114
115
116
117
118
119 @Override
120 public long count() throws MongoDbException {
121 return count(BuilderFactory.start(), getReadPreference());
122 }
123
124
125
126
127
128
129
130 @Override
131 public long count(final Count count) throws MongoDbException {
132 final ListenableFuture<Long> future = countAsync(count);
133
134 return FutureUtils.unwrap(future).longValue();
135 }
136
137
138
139
140
141
142
143 @Override
144 public long count(final Count.Builder count) throws MongoDbException {
145 return count(count.build());
146 }
147
148
149
150
151
152
153
154
155
156 @Override
157 public long count(final DocumentAssignable query) throws MongoDbException {
158 return count(query, getReadPreference());
159 }
160
161
162
163
164
165
166
167
168 @Override
169 public long count(final DocumentAssignable query,
170 final ReadPreference readPreference) throws MongoDbException {
171
172 final ListenableFuture<Long> future = countAsync(query, readPreference);
173
174 return FutureUtils.unwrap(future).longValue();
175 }
176
177
178
179
180
181
182
183
184 @Override
185 public long count(final ReadPreference readPreference)
186 throws MongoDbException {
187 return count(BuilderFactory.start(), readPreference);
188 }
189
190
191
192
193
194
195
196
197
198
199 @Override
200 public void createIndex(final boolean unique, final Element... keys)
201 throws MongoDbException {
202 createIndex(null, unique, keys);
203 }
204
205
206
207
208
209
210
211
212
213
214
215 @Override
216 public void createIndex(final DocumentAssignable options,
217 final Element... keys) throws MongoDbException {
218 createIndex(null, options, keys);
219 }
220
221
222
223
224
225
226
227
228
229
230
231 @Override
232 public void createIndex(final Element... keys) throws MongoDbException {
233 createIndex(EMPTY_INDEX_OPTIONS, keys);
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247 @Override
248 public void createIndex(final String name, final boolean unique,
249 final Element... keys) throws MongoDbException {
250 createIndex(name, unique ? UNIQUE_INDEX_OPTIONS : EMPTY_INDEX_OPTIONS,
251 keys);
252 }
253
254
255
256
257
258
259
260
261
262
263 @Override
264 public void createIndex(final String name,
265 final DocumentAssignable options, final Element... keys)
266 throws MongoDbException {
267
268 if (isCreateIndexesSupported()) {
269 final CreateIndexCommand command = new CreateIndexCommand(
270 getDatabaseName(), getName(), keys, name, options);
271
272 final FutureReplyCallback callback = new FutureReplyCallback();
273 myClient.send(command, new ValidatingReplyCallback(callback));
274
275 FutureUtils.unwrap(callback);
276 }
277 else {
278
279 String indexName = name;
280 if ((name == null) || name.isEmpty()) {
281 indexName = CreateIndexCommand.buildIndexName(keys);
282 }
283
284 final DocumentBuilder indexEntryBuilder = BuilderFactory.start();
285 indexEntryBuilder.addString("name", indexName);
286 indexEntryBuilder.addString("ns", getDatabaseName() + "."
287 + getName());
288
289 final DocumentBuilder keyBuilder = indexEntryBuilder.push("key");
290 for (final Element key : keys) {
291 keyBuilder.add(key);
292 }
293
294 for (final Element option : options.asDocument()) {
295 indexEntryBuilder.add(option);
296 }
297
298 final SynchronousMongoCollectionImpl indexCollection = new SynchronousMongoCollectionImpl(
299 myClient, myDatabase, "system.indexes");
300 final Document indexDocument = indexEntryBuilder.build();
301 if (indexCollection.findOne(indexDocument) == null) {
302
303 final Version requiredServerVersion = determineIndexServerVersion(keys);
304
305 final FutureCallback<Integer> callback = new FutureCallback<Integer>();
306 indexCollection.doInsertAsync(callback, false, Durability.ACK,
307 requiredServerVersion, indexDocument);
308
309 FutureUtils.unwrap(callback);
310 }
311 }
312 }
313
314
315
316
317
318
319
320
321
322
323
324
325 @Override
326 public long delete(final DocumentAssignable query) throws MongoDbException {
327 return delete(query, DELETE_SINGLE_DELETE_DEFAULT, getDurability());
328 }
329
330
331
332
333
334
335
336
337
338
339
340 @Override
341 public long delete(final DocumentAssignable query,
342 final boolean singleDelete) throws MongoDbException {
343 return delete(query, singleDelete, getDurability());
344 }
345
346
347
348
349
350
351
352
353
354
355 @Override
356 public long delete(final DocumentAssignable query,
357 final boolean singleDelete, final Durability durability)
358 throws MongoDbException {
359
360 final ListenableFuture<Long> future = deleteAsync(query, singleDelete,
361 durability);
362
363 return FutureUtils.unwrap(future).longValue();
364 }
365
366
367
368
369
370
371
372
373
374
375
376 @Override
377 public long delete(final DocumentAssignable query,
378 final Durability durability) throws MongoDbException {
379 return delete(query, DELETE_SINGLE_DELETE_DEFAULT, durability);
380 }
381
382
383
384
385
386
387
388 @Override
389 public MongoIterator<Element> distinct(final Distinct command)
390 throws MongoDbException {
391 return FutureUtils.unwrap(distinctAsync(command));
392 }
393
394
395
396
397
398
399
400 @Override
401 public MongoIterator<Element> distinct(final Distinct.Builder command)
402 throws MongoDbException {
403 return distinct(command.build());
404 }
405
406
407
408
409
410
411
412
413
414 @Override
415 public boolean drop() {
416 final Document result = myDatabase.runCommand("drop", myName, null);
417 final List<NumericElement> okElem = result.find(NumericElement.class,
418 "ok");
419
420 return ((okElem.size() > 0) && (okElem.get(0).getIntValue() > 0));
421 }
422
423
424
425
426
427
428
429 @Override
430 public boolean dropIndex(final IntegerElement... keys)
431 throws MongoDbException {
432 return dropIndex(CreateIndexCommand.buildIndexName(keys));
433 }
434
435
436
437
438
439
440
441
442 @Override
443 public boolean dropIndex(final String name) throws MongoDbException {
444
445 final DocumentBuilder options = BuilderFactory.start();
446 options.addString("index", name);
447
448 final Document result = myDatabase.runCommand("deleteIndexes", myName,
449 options.build());
450 final List<NumericElement> okElem = result.find(NumericElement.class,
451 "ok");
452
453 return ((okElem.size() > 0) && (okElem.get(0).getIntValue() > 0));
454 }
455
456
457
458
459 @Override
460 public boolean exists() throws MongoDbException {
461 return myDatabase.listCollectionNames().contains(getName());
462 }
463
464
465
466
467
468
469
470 @Override
471 public Document explain(final Aggregate aggregation)
472 throws MongoDbException {
473 return FutureUtils.unwrap(explainAsync(aggregation));
474 }
475
476
477
478
479
480
481
482 @Override
483 public Document explain(final Aggregate.Builder aggregation)
484 throws MongoDbException {
485 return FutureUtils.unwrap(explainAsync(aggregation.build()));
486
487 }
488
489
490
491
492
493
494
495
496
497 @Override
498 public Document explain(final DocumentAssignable query)
499 throws MongoDbException {
500 return explain(new Find.Builder(query).build());
501 }
502
503
504
505
506
507
508
509
510
511 @Override
512 public Document explain(final Find query) throws MongoDbException {
513 return FutureUtils.unwrap(explainAsync(query));
514 }
515
516
517
518
519
520
521
522 @Override
523 public Document explain(final Find.Builder query) throws MongoDbException {
524 return explain(query.build());
525 }
526
527
528
529
530
531
532
533
534
535 @Override
536 public MongoIterator<Document> find(final DocumentAssignable query)
537 throws MongoDbException {
538 return FutureUtils.unwrap(findAsync(query));
539 }
540
541
542
543
544
545
546
547
548
549 @Override
550 public MongoIterator<Document> find(final Find query)
551 throws MongoDbException {
552 return FutureUtils.unwrap(findAsync(query));
553 }
554
555
556
557
558
559
560
561 @Override
562 public MongoIterator<Document> find(final Find.Builder query)
563 throws MongoDbException {
564 return find(query.build());
565 }
566
567
568
569
570
571
572
573
574
575 @Override
576 public Document findAndModify(final FindAndModify command)
577 throws MongoDbException {
578 return FutureUtils.unwrap(findAndModifyAsync(command));
579 }
580
581
582
583
584
585
586
587 @Override
588 public Document findAndModify(final FindAndModify.Builder command)
589 throws MongoDbException {
590 return findAndModify(command.build());
591 }
592
593
594
595
596
597
598
599
600
601 @Override
602 public Document findOne(final DocumentAssignable query)
603 throws MongoDbException {
604 return FutureUtils.unwrap(findOneAsync(query));
605 }
606
607
608
609
610
611
612
613
614
615 @Override
616 public Document findOne(final Find query) throws MongoDbException {
617 return FutureUtils.unwrap(findOneAsync(query));
618 }
619
620
621
622
623
624
625
626 @Override
627 public Document findOne(final Find.Builder query) throws MongoDbException {
628 return findOne(query.build());
629 }
630
631
632
633
634
635
636
637 @Override
638 public MongoIterator<Element> groupBy(final GroupBy command)
639 throws MongoDbException {
640 return FutureUtils.unwrap(groupByAsync(command));
641 }
642
643
644
645
646
647
648
649 @Override
650 public MongoIterator<Element> groupBy(final GroupBy.Builder command)
651 throws MongoDbException {
652 return groupBy(command.build());
653 }
654
655
656
657
658
659
660
661
662
663
664
665 @Override
666 public int insert(final boolean continueOnError,
667 final DocumentAssignable... documents) throws MongoDbException {
668 return insert(continueOnError, getDurability(), documents);
669 }
670
671
672
673
674
675
676
677
678
679
680 @Override
681 public int insert(final boolean continueOnError,
682 final Durability durability, final DocumentAssignable... documents)
683 throws MongoDbException {
684 final ListenableFuture<Integer> future = insertAsync(continueOnError,
685 durability, documents);
686
687 return FutureUtils.unwrap(future).intValue();
688 }
689
690
691
692
693
694
695
696
697
698
699
700
701
702 @Override
703 public int insert(final DocumentAssignable... documents)
704 throws MongoDbException {
705 return insert(INSERT_CONTINUE_ON_ERROR_DEFAULT, getDurability(),
706 documents);
707 }
708
709
710
711
712
713
714
715
716
717
718
719 @Override
720 public int insert(final Durability durability,
721 final DocumentAssignable... documents) throws MongoDbException {
722 return insert(INSERT_CONTINUE_ON_ERROR_DEFAULT, durability, documents);
723 }
724
725
726
727
728
729
730
731
732
733
734
735 @Override
736 public boolean isCapped() throws MongoDbException {
737 final Document statistics = stats();
738
739 final NumericElement numeric = statistics.get(NumericElement.class,
740 "capped");
741 if (numeric != null) {
742 return (numeric.getIntValue() != 0);
743 }
744
745 final BooleanElement bool = statistics.get(BooleanElement.class,
746 "capped");
747 if (bool != null) {
748 return bool.getValue();
749 }
750
751
752 return false;
753 }
754
755
756
757
758
759
760
761
762
763 @Override
764 public MongoIterator<Document> mapReduce(final MapReduce command)
765 throws MongoDbException {
766 return FutureUtils.unwrap(mapReduceAsync(command));
767 }
768
769
770
771
772
773
774
775 @Override
776 public MongoIterator<Document> mapReduce(final MapReduce.Builder command)
777 throws MongoDbException {
778 return mapReduce(command.build());
779 }
780
781
782
783
784
785
786
787
788
789 @Override
790 public Collection<MongoIterator<Document>> parallelScan(
791 final ParallelScan parallelScan) throws MongoDbException {
792 return FutureUtils.unwrap(parallelScanAsync(parallelScan));
793 }
794
795
796
797
798
799
800
801
802
803 @Override
804 public Collection<MongoIterator<Document>> parallelScan(
805 final ParallelScan.Builder parallelScan) throws MongoDbException {
806 return parallelScan(parallelScan.build());
807 }
808
809
810
811
812
813
814
815
816 @Override
817 public int save(final DocumentAssignable document) throws MongoDbException {
818 return save(document, getDurability());
819 }
820
821
822
823
824
825
826
827
828 @Override
829 public int save(final DocumentAssignable document,
830 final Durability durability) throws MongoDbException {
831 return FutureUtils.unwrap(saveAsync(document, durability)).intValue();
832 }
833
834
835
836
837
838
839
840 @Override
841 public BatchedAsyncMongoCollection startBatch() {
842 return new BatchedAsyncMongoCollectionImpl(myClient, myDatabase, myName);
843 }
844
845
846
847
848
849
850
851
852
853 @Override
854 public Document stats() throws MongoDbException {
855 return myDatabase.runCommand("collStats", getName(), null);
856 }
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873 @Deprecated
874 @Override
875 public MongoIterator<com.allanbank.mongodb.builder.TextResult> textSearch(
876 final com.allanbank.mongodb.builder.Text command)
877 throws MongoDbException {
878 return FutureUtils.unwrap(textSearchAsync(command));
879 }
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895 @Deprecated
896 @Override
897 public MongoIterator<com.allanbank.mongodb.builder.TextResult> textSearch(
898 final com.allanbank.mongodb.builder.Text.Builder command)
899 throws MongoDbException {
900 return textSearch(command.build());
901 }
902
903
904
905
906
907
908
909
910
911
912
913
914
915 @Override
916 public long update(final DocumentAssignable query,
917 final DocumentAssignable update) throws MongoDbException {
918 return update(query, update, UPDATE_MULTIUPDATE_DEFAULT,
919 UPDATE_UPSERT_DEFAULT, getDurability());
920 }
921
922
923
924
925
926
927
928
929
930
931
932
933 @Override
934 public long update(final DocumentAssignable query,
935 final DocumentAssignable update, final boolean multiUpdate,
936 final boolean upsert) throws MongoDbException {
937 return update(query, update, multiUpdate, upsert, getDurability());
938 }
939
940
941
942
943
944
945
946
947
948
949
950
951 @Override
952 public long update(final DocumentAssignable query,
953 final DocumentAssignable update, final boolean multiUpdate,
954 final boolean upsert, final Durability durability)
955 throws MongoDbException {
956
957 final ListenableFuture<Long> future = updateAsync(query, update,
958 multiUpdate, upsert, durability);
959
960 return FutureUtils.unwrap(future).longValue();
961 }
962
963
964
965
966
967
968
969
970
971
972
973
974 @Override
975 public long update(final DocumentAssignable query,
976 final DocumentAssignable update, final Durability durability)
977 throws MongoDbException {
978 return update(query, update, UPDATE_MULTIUPDATE_DEFAULT,
979 UPDATE_UPSERT_DEFAULT, durability);
980 }
981
982
983
984
985
986
987
988 @Override
989 public Document updateOptions(final DocumentAssignable options)
990 throws MongoDbException {
991 final FutureCallback<Document> future = new FutureCallback<Document>(
992 getLockType());
993
994 final DocumentBuilder commandDoc = BuilderFactory.start();
995 commandDoc.add("collMod", getName());
996 addOptions("collMod", options, commandDoc);
997
998 myDatabase.runCommandAsync(future, commandDoc.build(),
999 Version.VERSION_2_2);
1000
1001 return FutureUtils.unwrap(future);
1002 }
1003
1004
1005
1006
1007
1008
1009
1010 @Override
1011 public Document validate(final ValidateMode mode) throws MongoDbException {
1012 Document result = null;
1013
1014 switch (mode) {
1015 case INDEX_ONLY:
1016 result = myDatabase.runCommand("validate", getName(),
1017 BuilderFactory.start().add("scandata", false).build());
1018 break;
1019 case NORMAL:
1020 result = myDatabase.runCommand("validate", getName(), null);
1021 break;
1022 case FULL:
1023 result = myDatabase.runCommand("validate", getName(),
1024 BuilderFactory.start().add("full", true).build());
1025 break;
1026 }
1027
1028 return result;
1029 }
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039 @Override
1040 public long write(final BatchedWrite write) throws MongoDbException {
1041 final ListenableFuture<Long> future = writeAsync(write);
1042
1043 return FutureUtils.unwrap(future).longValue();
1044 }
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054 @Override
1055 public long write(final BatchedWrite.Builder write) throws MongoDbException {
1056 return write(write.build());
1057 }
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069 protected void addOptions(final String command,
1070 final DocumentAssignable options, final DocumentBuilder builder) {
1071 if (options != null) {
1072 for (final Element element : options.asDocument()) {
1073 if (!command.equals(element.getName())) {
1074 builder.add(element);
1075 }
1076 }
1077 }
1078 }
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088 protected Version determineIndexServerVersion(final Element[] keys) {
1089 Version result = null;
1090
1091 for (final Element key : keys) {
1092 if (key.getType() == ElementType.STRING) {
1093 final String type = key.getValueAsString();
1094 if (Index.GEO_2DSPHERE_INDEX_NAME.equals(type)) {
1095 result = Version.later(result, Version.VERSION_2_4);
1096 }
1097 else if (Index.HASHED_INDEX_NAME.equals(type)) {
1098 result = Version.later(result, Version.VERSION_2_4);
1099 }
1100 else if (Index.TEXT_INDEX_NAME.equals(type)) {
1101 result = Version.later(result, Version.VERSION_2_4);
1102 }
1103 }
1104 }
1105
1106 return result;
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116 protected boolean isCreateIndexesSupported() {
1117 final ClusterStats clusterStats = myClient.getClusterStats();
1118 final VersionRange serverVersionRange = clusterStats
1119 .getServerVersionRange();
1120 final Version minServerVersion = serverVersionRange.getLowerBounds();
1121
1122 return (CreateIndexCommand.REQUIRED_VERSION.compareTo(minServerVersion) <= 0);
1123 }
1124 }