1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
22 import java.lang.ref.Reference;
23 import java.lang.ref.ReferenceQueue;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29
30 import com.allanbank.mongodb.Callback;
31 import com.allanbank.mongodb.Durability;
32 import com.allanbank.mongodb.LambdaCallback;
33 import com.allanbank.mongodb.ListenableFuture;
34 import com.allanbank.mongodb.LockType;
35 import com.allanbank.mongodb.MongoClient;
36 import com.allanbank.mongodb.MongoCollection;
37 import com.allanbank.mongodb.MongoDatabase;
38 import com.allanbank.mongodb.MongoDbException;
39 import com.allanbank.mongodb.MongoIterator;
40 import com.allanbank.mongodb.ProfilingStatus;
41 import com.allanbank.mongodb.ReadPreference;
42 import com.allanbank.mongodb.Version;
43 import com.allanbank.mongodb.bson.Document;
44 import com.allanbank.mongodb.bson.DocumentAssignable;
45 import com.allanbank.mongodb.bson.Element;
46 import com.allanbank.mongodb.bson.NumericElement;
47 import com.allanbank.mongodb.bson.builder.BuilderFactory;
48 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
49 import com.allanbank.mongodb.bson.element.StringElement;
50 import com.allanbank.mongodb.client.callback.CursorCallback;
51 import com.allanbank.mongodb.client.callback.ReplyCommandCallback;
52 import com.allanbank.mongodb.client.message.Command;
53 import com.allanbank.mongodb.client.message.Query;
54 import com.allanbank.mongodb.util.FutureUtils;
55
56
57
58
59
60
61
62
63 public class MongoDatabaseImpl implements MongoDatabase {
64
65
66 public static final Document EMPTY_QUERY = MongoCollection.ALL;
67
68
69 protected final Client myClient;
70
71
72 private MongoDatabase myAdminDatabase;
73
74
75 private final ConcurrentMap<String, Reference<MongoCollection>> myCollections;
76
77
78 private Durability myDurability;
79
80
81 private final MongoClient myMongoClient;
82
83
84 private final String myName;
85
86
87 private ReadPreference myReadPreference;
88
89
90 private final ReferenceQueue<MongoCollection> myReferenceQueue = new ReferenceQueue<MongoCollection>();
91
92
93
94
95
96
97
98
99
100
101
102 public MongoDatabaseImpl(final MongoClient mongoClient,
103 final Client client, final String name) {
104 myMongoClient = mongoClient;
105 myClient = client;
106 myName = name;
107 myDurability = null;
108 myReadPreference = null;
109 myCollections = new ConcurrentHashMap<String, Reference<MongoCollection>>();
110 }
111
112
113
114
115
116
117
118
119 @Override
120 public boolean createCappedCollection(final String name, final long size)
121 throws MongoDbException {
122 return createCollection(name, BuilderFactory.start()
123 .add("capped", true).add("size", size));
124 }
125
126
127
128
129
130
131
132
133 @Override
134 public boolean createCollection(final String name,
135 final DocumentAssignable options) throws MongoDbException {
136 final Document result = runCommand("create", name, options);
137 final NumericElement okElem = result.get(NumericElement.class, "ok");
138
139 return ((okElem != null) && (okElem.getIntValue() > 0));
140 }
141
142
143
144
145
146
147
148
149
150 @Override
151 public boolean drop() {
152 final Document result = runCommand("dropDatabase");
153 final NumericElement okElem = result.get(NumericElement.class, "ok");
154
155 return ((okElem != null) && (okElem.getIntValue() > 0));
156 }
157
158
159
160
161 @Override
162 public boolean exists() {
163 return myMongoClient.listDatabaseNames().contains(getName());
164 }
165
166
167
168
169
170
171
172
173
174 @Override
175 public MongoCollection getCollection(final String name) {
176 MongoCollection collection = null;
177 Reference<MongoCollection> ref = myCollections.get(name);
178 if (ref != null) {
179 collection = ref.get();
180 if (collection == null) {
181
182 myCollections.remove(name, ref);
183 }
184 }
185
186
187 if (collection == null) {
188 collection = new SynchronousMongoCollectionImpl(myClient, this,
189 name);
190 ref = new NamedReference<MongoCollection>(name, collection,
191 myReferenceQueue);
192
193 final Reference<MongoCollection> existing = myCollections
194 .putIfAbsent(name, ref);
195 if (existing != null) {
196 final MongoCollection existingCollection = existing.get();
197 if (existingCollection != null) {
198 collection = existingCollection;
199 }
200
201
202
203 }
204 }
205
206
207 Reference<?> polled;
208 while ((polled = myReferenceQueue.poll()) != null) {
209 if (polled instanceof NamedReference) {
210 myCollections.remove(((NamedReference<?>) polled).getName(),
211 polled);
212 }
213 }
214
215 return collection;
216 }
217
218
219
220
221 @Override
222 public Durability getDurability() {
223 Durability result = myDurability;
224 if (result == null) {
225 result = myClient.getDefaultDurability();
226 }
227 return result;
228 }
229
230
231
232
233 @Override
234 public String getName() {
235 return myName;
236 }
237
238
239
240
241
242
243
244
245
246
247 @Override
248 public ProfilingStatus getProfilingStatus() throws MongoDbException {
249 final Document result = runCommand("profile", -1, null);
250
251 final NumericElement level = result.get(NumericElement.class, "was");
252 final NumericElement millis = result
253 .get(NumericElement.class, "slowms");
254
255 if ((level != null) && (millis != null)) {
256 final ProfilingStatus.Level l = ProfilingStatus.Level
257 .fromValue(level.getIntValue());
258 if (l != null) {
259 switch (l) {
260 case NONE:
261 return ProfilingStatus.OFF;
262 case ALL:
263 return ProfilingStatus.ON;
264 case SLOW_ONLY:
265 return ProfilingStatus.slow(millis.getIntValue());
266 }
267 }
268 }
269
270
271 return null;
272
273 }
274
275
276
277
278 @Override
279 public ReadPreference getReadPreference() {
280 ReadPreference result = myReadPreference;
281 if (result == null) {
282 result = myClient.getDefaultReadPreference();
283 }
284 return result;
285 }
286
287
288
289
290
291
292
293
294
295
296 @Override
297 public List<String> listCollectionNames() {
298 final Query query = new Query(myName, "system.namespaces", EMPTY_QUERY,
299 null,
300 0, 0, 0,
301 false, ReadPreference.PRIMARY,
302 false, false,
303 false, false);
304
305 final FutureCallback<MongoIterator<Document>> iterFuture = new FutureCallback<MongoIterator<Document>>(
306 getLockType());
307 final CursorCallback callback = new CursorCallback(myClient, query,
308 false, iterFuture);
309
310 myClient.send(query, callback);
311
312 final List<String> names = new ArrayList<String>();
313 final Iterator<Document> iter = FutureUtils.unwrap(iterFuture);
314 while (iter.hasNext()) {
315 final Document collection = iter.next();
316 for (final StringElement nameElement : collection.find(
317 StringElement.class, "name")) {
318 final String name = nameElement.getValue();
319 if ((name.indexOf('$') >= 0) && (name.indexOf(".oplog.$") < 0)) {
320 continue;
321 }
322
323 names.add(name.substring(myName.length() + 1));
324 }
325 }
326
327 return names;
328 }
329
330
331
332
333
334
335
336
337
338
339 @Override
340 @Deprecated
341 public List<String> listCollections() {
342 return listCollectionNames();
343 }
344
345
346
347
348
349
350
351
352
353 @Override
354 public Document runAdminCommand(final String command)
355 throws MongoDbException {
356 return getAdminDatabase().runCommand(command);
357 }
358
359
360
361
362
363
364
365
366
367
368 @Override
369 public Document runAdminCommand(final String command,
370 final DocumentAssignable options) throws MongoDbException {
371 return getAdminDatabase().runCommand(command, options);
372 }
373
374
375
376
377
378
379
380
381
382
383 @Override
384 public Document runAdminCommand(final String commandName,
385 final String commandValue, final DocumentAssignable options)
386 throws MongoDbException {
387 return getAdminDatabase()
388 .runCommand(commandName, commandValue, options);
389 }
390
391
392
393
394
395
396
397
398
399
400 @Override
401 public Document runCommand(final DocumentAssignable command)
402 throws MongoDbException {
403 return FutureUtils.unwrap(runCommandAsync(command));
404 }
405
406
407
408
409
410
411
412
413
414
415
416 @Override
417 public Document runCommand(final String command) throws MongoDbException {
418 return FutureUtils.unwrap(runCommandAsync(command, null));
419 }
420
421
422
423
424
425
426
427
428
429
430 @Override
431 public Document runCommand(final String command,
432 final DocumentAssignable options) throws MongoDbException {
433 return FutureUtils.unwrap(runCommandAsync(command, options));
434 }
435
436
437
438
439
440
441
442
443
444
445 @Override
446 public Document runCommand(final String commandName,
447 final int commandValue, final DocumentAssignable options)
448 throws MongoDbException {
449 return FutureUtils.unwrap(runCommandAsync(commandName, commandValue,
450 options));
451 }
452
453
454
455
456
457
458
459
460
461
462 @Override
463 public Document runCommand(final String commandName,
464 final String commandValue, final DocumentAssignable options)
465 throws MongoDbException {
466 return FutureUtils.unwrap(runCommandAsync(commandName, commandValue,
467 options));
468 }
469
470
471
472
473
474
475
476
477
478
479
480 @Override
481 public void runCommandAsync(final Callback<Document> reply,
482 final DocumentAssignable command) throws MongoDbException {
483 runCommandAsync(reply, command, null);
484 }
485
486
487
488
489
490
491
492 @Override
493 public void runCommandAsync(final Callback<Document> reply,
494 final DocumentAssignable command, final Version requireServerVersion)
495 throws MongoDbException {
496 final Command commandMessage = new Command(myName,
497 Command.COMMAND_COLLECTION, command.asDocument(),
498 ReadPreference.PRIMARY,
499 VersionRange.minimum(requireServerVersion));
500
501 myClient.send(commandMessage, new ReplyCommandCallback(reply));
502 }
503
504
505
506
507
508
509
510
511
512
513
514 @Override
515 public void runCommandAsync(final Callback<Document> reply,
516 final String command) throws MongoDbException {
517 runCommandAsync(reply, command, null);
518 }
519
520
521
522
523
524
525
526
527 @Override
528 public void runCommandAsync(final Callback<Document> reply,
529 final String command, final DocumentAssignable options)
530 throws MongoDbException {
531 final DocumentBuilder builder = BuilderFactory.start();
532 builder.addInteger(command, 1);
533 addOptions(command, options, builder);
534
535 runCommandAsync(reply, builder);
536 }
537
538
539
540
541
542
543
544
545 @Override
546 public void runCommandAsync(final Callback<Document> reply,
547 final String commandName, final int commandValue,
548 final DocumentAssignable options) throws MongoDbException {
549 final DocumentBuilder builder = BuilderFactory.start();
550 builder.add(commandName, commandValue);
551 addOptions(commandName, options, builder);
552
553 runCommandAsync(reply, builder);
554 }
555
556
557
558
559
560
561
562
563 @Override
564 public void runCommandAsync(final Callback<Document> reply,
565 final String commandName, final String commandValue,
566 final DocumentAssignable options) throws MongoDbException {
567 final DocumentBuilder builder = BuilderFactory.start();
568 builder.add(commandName, commandValue);
569 addOptions(commandName, options, builder);
570
571 runCommandAsync(reply, builder);
572 }
573
574
575
576
577
578
579
580
581
582
583 @Override
584 public ListenableFuture<Document> runCommandAsync(
585 final DocumentAssignable command) throws MongoDbException {
586 final FutureCallback<Document> future = new FutureCallback<Document>(
587 getLockType());
588
589 runCommandAsync(future, command);
590
591 return future;
592 }
593
594
595
596
597
598
599
600
601 @Override
602 public void runCommandAsync(final LambdaCallback<Document> reply,
603 final DocumentAssignable command) throws MongoDbException {
604 runCommandAsync(new LambdaCallbackAdapter<Document>(reply), command);
605 }
606
607
608
609
610
611
612
613
614
615 @Override
616 public void runCommandAsync(final LambdaCallback<Document> reply,
617 final DocumentAssignable command,
618 final Version requiredServerVersion) throws MongoDbException {
619 runCommandAsync(new LambdaCallbackAdapter<Document>(reply), command,
620 requiredServerVersion);
621 }
622
623
624
625
626
627
628
629
630 @Override
631 public void runCommandAsync(final LambdaCallback<Document> reply,
632 final String command) throws MongoDbException {
633 runCommandAsync(new LambdaCallbackAdapter<Document>(reply), command);
634 }
635
636
637
638
639
640
641
642
643
644 @Override
645 public void runCommandAsync(final LambdaCallback<Document> reply,
646 final String command, final DocumentAssignable options)
647 throws MongoDbException {
648 runCommandAsync(new LambdaCallbackAdapter<Document>(reply), command,
649 options);
650 }
651
652
653
654
655
656
657
658
659
660 @Override
661 public void runCommandAsync(final LambdaCallback<Document> reply,
662 final String commandName, final int commandValue,
663 final DocumentAssignable options) throws MongoDbException {
664 runCommandAsync(new LambdaCallbackAdapter<Document>(reply),
665 commandName, commandValue, options);
666 }
667
668
669
670
671
672
673
674
675
676 @Override
677 public void runCommandAsync(final LambdaCallback<Document> reply,
678 final String commandName, final String commandValue,
679 final DocumentAssignable options) throws MongoDbException {
680 runCommandAsync(new LambdaCallbackAdapter<Document>(reply),
681 commandName, commandValue, options);
682 }
683
684
685
686
687
688
689
690
691
692
693
694 @Override
695 public ListenableFuture<Document> runCommandAsync(final String command)
696 throws MongoDbException {
697 final FutureCallback<Document> future = new FutureCallback<Document>(
698 getLockType());
699
700 runCommandAsync(future, command, null);
701
702 return future;
703 }
704
705
706
707
708
709
710
711
712
713
714 @Override
715 public ListenableFuture<Document> runCommandAsync(final String command,
716 final DocumentAssignable options) throws MongoDbException {
717 final FutureCallback<Document> future = new FutureCallback<Document>(
718 getLockType());
719
720 runCommandAsync(future, command, options);
721
722 return future;
723 }
724
725
726
727
728
729
730
731
732
733
734
735 @Override
736 public ListenableFuture<Document> runCommandAsync(final String commandName,
737 final int commandValue, final DocumentAssignable options)
738 throws MongoDbException {
739 final FutureCallback<Document> future = new FutureCallback<Document>(
740 getLockType());
741
742 runCommandAsync(future, commandName, commandValue, options);
743
744 return future;
745 }
746
747
748
749
750
751
752
753
754
755
756
757 @Override
758 public ListenableFuture<Document> runCommandAsync(final String commandName,
759 final String commandValue, final DocumentAssignable options)
760 throws MongoDbException {
761 final FutureCallback<Document> future = new FutureCallback<Document>(
762 getLockType());
763
764 runCommandAsync(future, commandName, commandValue, options);
765
766 return future;
767 }
768
769
770
771
772 @Override
773 public void setDurability(final Durability durability) {
774 myDurability = durability;
775 }
776
777
778
779
780
781
782
783
784
785 @Override
786 public boolean setProfilingStatus(final ProfilingStatus profileLevel)
787 throws MongoDbException {
788 final Document result = runCommand(
789 "profile",
790 profileLevel.getLevel().getValue(),
791 BuilderFactory.start().add("slowms",
792 profileLevel.getSlowMillisThreshold()));
793
794 final NumericElement level = result.get(NumericElement.class, "was");
795 final NumericElement millis = result
796 .get(NumericElement.class, "slowms");
797
798 if ((level != null) && (millis != null)) {
799 final ProfilingStatus.Level l = ProfilingStatus.Level
800 .fromValue(level.getIntValue());
801 if (l != null) {
802 switch (l) {
803 case NONE:
804 return !ProfilingStatus.Level.NONE.equals(profileLevel
805 .getLevel());
806 case ALL:
807 return !ProfilingStatus.Level.ALL.equals(profileLevel
808 .getLevel());
809 case SLOW_ONLY:
810 final ProfilingStatus before = ProfilingStatus.slow(millis
811 .getIntValue());
812 return !before.equals(profileLevel);
813 }
814 }
815 }
816
817
818 return true;
819 }
820
821
822
823
824 @Override
825 public void setReadPreference(final ReadPreference readPreference) {
826 myReadPreference = readPreference;
827 }
828
829
830
831
832
833
834
835
836
837 @Override
838 public Document stats() throws MongoDbException {
839 return runCommand("dbStats");
840 }
841
842
843
844
845
846
847
848
849
850
851
852 protected void addOptions(final String command,
853 final DocumentAssignable options, final DocumentBuilder builder) {
854 if (options != null) {
855 for (final Element element : options.asDocument()) {
856 if (!command.equals(element.getName())) {
857 builder.add(element);
858 }
859 }
860 }
861 }
862
863
864
865
866
867
868 protected LockType getLockType() {
869 return myClient.getConfig().getLockType();
870 }
871
872
873
874
875
876
877
878 private MongoDatabase getAdminDatabase() {
879 if (myAdminDatabase == null) {
880 if (myName.equals("admin")) {
881 myAdminDatabase = this;
882 }
883 else {
884 myAdminDatabase = myMongoClient.getDatabase("admin");
885 }
886 }
887
888 return myAdminDatabase;
889 }
890 }