1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.bson.io;
21
22 import java.io.DataInput;
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.StreamCorruptedException;
27 import java.nio.charset.Charset;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import com.allanbank.mongodb.bson.Document;
32 import com.allanbank.mongodb.bson.Element;
33 import com.allanbank.mongodb.bson.ElementType;
34 import com.allanbank.mongodb.bson.element.ArrayElement;
35 import com.allanbank.mongodb.bson.element.BinaryElement;
36 import com.allanbank.mongodb.bson.element.BooleanElement;
37 import com.allanbank.mongodb.bson.element.DocumentElement;
38 import com.allanbank.mongodb.bson.element.DoubleElement;
39 import com.allanbank.mongodb.bson.element.IntegerElement;
40 import com.allanbank.mongodb.bson.element.JavaScriptElement;
41 import com.allanbank.mongodb.bson.element.JavaScriptWithScopeElement;
42 import com.allanbank.mongodb.bson.element.LongElement;
43 import com.allanbank.mongodb.bson.element.MaxKeyElement;
44 import com.allanbank.mongodb.bson.element.MinKeyElement;
45 import com.allanbank.mongodb.bson.element.MongoTimestampElement;
46 import com.allanbank.mongodb.bson.element.NullElement;
47 import com.allanbank.mongodb.bson.element.ObjectId;
48 import com.allanbank.mongodb.bson.element.ObjectIdElement;
49 import com.allanbank.mongodb.bson.element.RegularExpressionElement;
50 import com.allanbank.mongodb.bson.element.StringElement;
51 import com.allanbank.mongodb.bson.element.SymbolElement;
52 import com.allanbank.mongodb.bson.element.TimestampElement;
53 import com.allanbank.mongodb.bson.element.UuidElement;
54 import com.allanbank.mongodb.bson.impl.RootDocument;
55
56
57
58
59
60
61
62
63
64
65
66 public class BsonInputStream extends InputStream {
67
68
69 public final static Charset UTF8 = StringDecoder.UTF8;
70
71
72 private byte[] myBuffer;
73
74
75 private int myBufferLimit;
76
77
78 private int myBufferOffset;
79
80
81 private long myBytesRead;
82
83
84 private final InputStream myInput;
85
86
87 private final StringDecoder myStringDecoder;
88
89
90
91
92
93
94
/**
 * Creates a BSON stream over {@code input} with an initial buffer of 8 KiB.
 *
 * @param input
 *            The stream to read from.
 */
public BsonInputStream(final InputStream input) {
    this(input, 8 * 1024);
}
98
99
100
101
102
103
104
105
106
107
108
/**
 * Creates a BSON stream over {@code input} with the given initial buffer
 * size and a freshly created {@link StringDecoderCache}.
 *
 * @param input
 *            The stream to read from.
 * @param expectedMaxDocumentSize
 *            The initial size, in bytes, of the internal buffer.
 */
public BsonInputStream(final InputStream input,
        final int expectedMaxDocumentSize) {
    this(input, expectedMaxDocumentSize, new StringDecoderCache());
}
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
 * Creates a BSON stream over {@code input}.
 *
 * @param input
 *            The stream to read from.
 * @param expectedMaxDocumentSize
 *            The initial size, in bytes, of the internal buffer. The buffer
 *            is replaced by a larger one when a fetch requires it.
 * @param cache
 *            The cache handed to the {@link StringDecoder} used by this
 *            stream.
 */
public BsonInputStream(final InputStream input,
        final int expectedMaxDocumentSize, final StringDecoderCache cache) {
    // Buffer starts empty: nothing read, nothing consumed.
    myBuffer = new byte[expectedMaxDocumentSize];
    myBufferLimit = 0;
    myBufferOffset = 0;
    myBytesRead = 0;

    myInput = input;
    myStringDecoder = new StringDecoder(cache);
}
136
137
138
139
140
141
142
143
144
/**
 * Creates a BSON stream over {@code input} with an initial buffer of 8 KiB
 * and the supplied string cache.
 *
 * @param input
 *            The stream to read from.
 * @param cache
 *            The cache handed to the {@link StringDecoder} used by this
 *            stream.
 */
public BsonInputStream(final InputStream input,
        final StringDecoderCache cache) {
    this(input, 8 * 1024, cache);
}
149
150
151
152
153
154
155
156
157 @Override
158 public int available() throws IOException {
159 return availableInBuffer() + myInput.available();
160 }
161
162
163
164
165
166
167
168 @Override
169 public void close() throws IOException {
170 myInput.close();
171 }
172
173
174
175
176
177
178 public long getBytesRead() {
179 return myBytesRead + myBufferOffset;
180 }
181
182
183
184
185
186
187
188
189
190
191
192 @Deprecated
193 public int getMaxCachedStringEntries() {
194 return myStringDecoder.getCache().getMaxCacheEntries();
195 }
196
197
198
199
200
201
202
203
204
205
206
207 @Deprecated
208 public int getMaxCachedStringLength() {
209 return myStringDecoder.getCache().getMaxCacheLength();
210 }
211
212
213
214
215
216
217
218 @Override
219 public synchronized void mark(final int readlimit) {
220 throw new UnsupportedOperationException("Mark not supported.");
221 }
222
223
224
225
226
227
228
229 @Override
230 public boolean markSupported() {
231 return false;
232 }
233
234
235
236
237
238
239
240
241
242
243 public final void prefetch(final int size) throws IOException {
244 fetch(size, false);
245 }
246
247
248
249
250
251
252
253 @Override
254 public int read() throws IOException {
255 if (ensureFetched(1) != 1) {
256 return -1;
257 }
258
259 final int read = (myBuffer[myBufferOffset] & 0xFF);
260
261 myBufferOffset += 1;
262
263 return read;
264 }
265
266
267
268
269
270
271
272 @Override
273 public int read(final byte b[]) throws IOException {
274 return read(b, 0, b.length);
275 }
276
277
278
279
280
281
282
283 @Override
284 public int read(final byte b[], final int off, final int len)
285 throws IOException {
286 final int read = ensureFetched(len - off);
287
288 System.arraycopy(myBuffer, myBufferOffset, b, off, read);
289
290 myBufferOffset += read;
291
292 return read;
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 public String readCString() throws EOFException, IOException {
314
315 while (true) {
316 for (int i = myBufferOffset; i < myBufferLimit; ++i) {
317 final byte b = myBuffer[i];
318 if (b == 0) {
319
320 final int offset = myBufferOffset;
321 final int length = (1 + i) - offset;
322
323
324 myBufferOffset = i + 1;
325
326 return myStringDecoder.decode(myBuffer, offset, length);
327 }
328 }
329
330
331 ensureFetched(availableInBuffer() + 1);
332 }
333 }
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348 public Document readDocument() throws IOException {
349
350
351 final int size = readInt();
352
353 prefetch(size - 4);
354
355 return new RootDocument(readElements(), false, size);
356 }
357
358
359
360
361
362
363
364
365
366
367
368
369
370 public void readFully(final byte[] buffer) throws EOFException, IOException {
371 readFully(buffer, 0, buffer.length);
372 }
373
374
375
376
377
378
379
380
381
382
383 public int readInt() throws EOFException, IOException {
384 if (ensureFetched(4) != 4) {
385 throw new EOFException();
386 }
387
388
389 int result = (myBuffer[myBufferOffset] & 0xFF);
390 result += (myBuffer[myBufferOffset + 1] & 0xFF) << 8;
391 result += (myBuffer[myBufferOffset + 2] & 0xFF) << 16;
392 result += (myBuffer[myBufferOffset + 3] & 0xFF) << 24;
393
394 myBufferOffset += 4;
395
396 return result;
397 }
398
399
400
401
402
403
404
405
406
407
408 public long readLong() throws EOFException, IOException {
409 if (ensureFetched(8) != 8) {
410 throw new EOFException();
411 }
412
413
414 long result = (myBuffer[myBufferOffset] & 0xFFL);
415 result += (myBuffer[myBufferOffset + 1] & 0xFFL) << 8;
416 result += (myBuffer[myBufferOffset + 2] & 0xFFL) << 16;
417 result += (myBuffer[myBufferOffset + 3] & 0xFFL) << 24;
418 result += (myBuffer[myBufferOffset + 4] & 0xFFL) << 32;
419 result += (myBuffer[myBufferOffset + 5] & 0xFFL) << 40;
420 result += (myBuffer[myBufferOffset + 6] & 0xFFL) << 48;
421 result += (myBuffer[myBufferOffset + 7] & 0xFFL) << 56;
422
423 myBufferOffset += 8;
424
425 return result;
426 }
427
428
429
430
431
432
433
434 @Override
435 public synchronized void reset() throws UnsupportedOperationException {
436 throw new UnsupportedOperationException("Mark not supported.");
437 }
438
439
440
441
442
443
444
445
446
447
448
449
450 @Deprecated
451 public void setMaxCachedStringEntries(final int maxCacheEntries) {
452 myStringDecoder.getCache().setMaxCacheEntries(maxCacheEntries);
453 }
454
455
456
457
458
459
460
461
462
463
464
465
466
467 @Deprecated
468 public void setMaxCachedStringLength(final int maxlength) {
469 myStringDecoder.getCache().setMaxCacheLength(maxlength);
470
471 }
472
473
474
475
476
477
478
479 @Override
480 public long skip(final long n) throws IOException {
481
482 long skipped = Math.min(n, availableInBuffer());
483 myBufferOffset += skipped;
484
485 if (skipped < n) {
486
487 final long streamSkipped = myInput.skip(n - skipped);
488 myBytesRead += streamSkipped;
489 skipped += streamSkipped;
490 }
491
492 return skipped;
493 }
494
495
496
497
498
499
500 protected final int availableInBuffer() {
501 return myBufferLimit - myBufferOffset;
502 }
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517 protected ArrayElement readArrayElement() throws IOException {
518
519 final long start = getBytesRead() - 1;
520
521 final String name = readCString();
522 final int fetch = readInt();
523 prefetch(fetch - 4);
524 final List<Element> elements = readElements();
525 final long size = getBytesRead() - start;
526
527 return new ArrayElement(name, elements, size);
528 }
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546 protected BinaryElement readBinaryElement() throws IOException {
547
548 final long start = getBytesRead() - 1;
549
550 final String name = readCString();
551 int length = readInt();
552
553 final int subType = read();
554 if (subType < 0) {
555 throw new EOFException();
556 }
557
558
559 if (subType == 2) {
560 final int anotherLength = readInt();
561
562 assert (anotherLength == (length - 4)) : "Binary Element Subtye 2 "
563 + "length should be outer length - 4.";
564
565 length -= 4;
566 }
567 else if ((subType == UuidElement.LEGACY_UUID_SUBTTYPE)
568 || (subType == UuidElement.UUID_SUBTTYPE)) {
569
570 final byte[] binary = new byte[length];
571 readFully(binary);
572
573 final long size = getBytesRead() - start;
574 try {
575 return new UuidElement(name, (byte) subType, binary, size);
576 }
577 catch (final IllegalArgumentException iae) {
578
579 return new BinaryElement(name, (byte) subType, binary, size);
580 }
581 }
582
583 final long size = getBytesRead() - start;
584 return new BinaryElement(name, (byte) subType, this, length, size
585 + length);
586 }
587
588
589
590
591
592
593
594
595
596 protected BooleanElement readBooleanElement() throws IOException {
597 final long start = getBytesRead() - 1;
598
599 final String name = readCString();
600 final boolean value = (read() == 1);
601
602 final long size = getBytesRead() - start;
603
604 return new BooleanElement(name, value, size);
605 }
606
607
608
609
610
611
612
613
614
615
616 @Deprecated
617 protected Element readDBPointerElement() throws IOException {
618 final long start = getBytesRead() - 1;
619
620 final String name = readCString();
621 final String dbDotCollection = readString();
622 final int timestamp = EndianUtils.swap(readInt());
623 final long machineId = EndianUtils.swap(readLong());
624
625 final long size = getBytesRead() - start;
626
627 String db = dbDotCollection;
628 String collection = "";
629 final int firstDot = dbDotCollection.indexOf('.');
630 if (0 <= firstDot) {
631 db = dbDotCollection.substring(0, firstDot);
632 collection = dbDotCollection.substring(firstDot + 1);
633 }
634 return new com.allanbank.mongodb.bson.element.DBPointerElement(name,
635 db, collection, new ObjectId(timestamp, machineId), size);
636 }
637
638
639
640
641
642
643
644
645
646
647
648
649 protected DocumentElement readDocumentElement() throws IOException {
650 final long start = getBytesRead() - 1;
651
652 final String name = readCString();
653 final int fetch = readInt();
654
655 prefetch(fetch - 4);
656 final List<Element> elements = readElements();
657
658 final long size = getBytesRead() - start;
659
660 return new DocumentElement(name, elements, true, size);
661 }
662
663
664
665
666
667
668
669
670
671 protected DoubleElement readDoubleElement() throws IOException {
672 final long start = getBytesRead() - 1;
673
674 final String name = readCString();
675 final double value = Double.longBitsToDouble(readLong());
676
677 final long size = getBytesRead() - start;
678
679 return new DoubleElement(name, value, size);
680 }
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717 @SuppressWarnings("deprecation")
718 protected Element readElement(final byte token) throws EOFException,
719 IOException {
720 final ElementType type = ElementType.valueOf(token);
721 if (type == null) {
722 throw new StreamCorruptedException("Unknown element type: 0x"
723 + Integer.toHexString(token & 0xFF) + ".");
724 }
725 switch (type) {
726 case ARRAY: {
727 return readArrayElement();
728 }
729 case BINARY: {
730 return readBinaryElement();
731 }
732 case DB_POINTER: {
733 return readDBPointerElement();
734 }
735 case DOCUMENT: {
736 return readDocumentElement();
737 }
738 case DOUBLE: {
739 return readDoubleElement();
740 }
741 case BOOLEAN: {
742 return readBooleanElement();
743 }
744 case INTEGER: {
745 return readIntegerElement();
746 }
747 case JAVA_SCRIPT: {
748 return readJavaScriptElement();
749 }
750 case JAVA_SCRIPT_WITH_SCOPE: {
751 return readJavaScriptWithScopeElement();
752 }
753 case LONG: {
754 return readLongElement();
755 }
756 case MAX_KEY: {
757 return readMaxKeyElement();
758 }
759 case MIN_KEY: {
760 return readMinKeyElement();
761 }
762 case MONGO_TIMESTAMP: {
763 return readMongoTimestampElement();
764 }
765 case NULL: {
766 return readNullElement();
767 }
768 case OBJECT_ID: {
769 return readObjectIdElement();
770 }
771 case REGEX: {
772 return readRegularExpressionElement();
773 }
774 case STRING: {
775 return readStringElement();
776 }
777 case SYMBOL: {
778 return readSymbolElement();
779 }
780 case UTC_TIMESTAMP: {
781 return readTimestampElement();
782 }
783 }
784
785 throw new StreamCorruptedException("Unknown element type: "
786 + type.name() + ".");
787 }
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802 protected List<Element> readElements() throws EOFException, IOException {
803 final List<Element> elements = new ArrayList<Element>();
804 int elementToken = read();
805 while (elementToken > 0) {
806 elements.add(readElement((byte) elementToken));
807
808 elementToken = read();
809 }
810 if (elementToken < 0) {
811 throw new EOFException();
812 }
813 return elements;
814 }
815
816
817
818
819
820
821
822
823
824 protected IntegerElement readIntegerElement() throws IOException {
825 final long start = getBytesRead() - 1;
826
827 final String name = readCString();
828 final int value = readInt();
829
830 final long size = getBytesRead() - start;
831
832 return new IntegerElement(name, value, size);
833 }
834
835
836
837
838
839
840
841
842
843 protected JavaScriptElement readJavaScriptElement() throws IOException {
844 final long start = getBytesRead() - 1;
845
846 final String name = readCString();
847 final String javascript = readString();
848
849 final long size = getBytesRead() - start;
850
851 return new JavaScriptElement(name, javascript, size);
852 }
853
854
855
856
857
858
859
860
861
862 protected JavaScriptWithScopeElement readJavaScriptWithScopeElement()
863 throws IOException {
864 final long start = getBytesRead() - 1;
865
866 final String name = readCString();
867 readInt();
868 final String javascript = readString();
869 final Document scope = readDocument();
870
871 final long size = getBytesRead() - start;
872
873 return new JavaScriptWithScopeElement(name, javascript, scope, size);
874 }
875
876
877
878
879
880
881
882
883 protected LongElement readLongElement() throws IOException {
884 final long start = getBytesRead() - 1;
885
886 final String name = readCString();
887 final long value = readLong();
888
889 final long size = getBytesRead() - start;
890
891 return new LongElement(name, value, size);
892 }
893
894
895
896
897
898
899
900
901
902 protected MaxKeyElement readMaxKeyElement() throws IOException {
903 final long start = getBytesRead() - 1;
904
905 final String name = readCString();
906
907 final long size = getBytesRead() - start;
908
909 return new MaxKeyElement(name, size);
910 }
911
912
913
914
915
916
917
918
919
920 protected MinKeyElement readMinKeyElement() throws IOException {
921 final long start = getBytesRead() - 1;
922
923 final String name = readCString();
924
925 final long size = getBytesRead() - start;
926
927 return new MinKeyElement(name, size);
928 }
929
930
931
932
933
934
935
936
937
938 protected MongoTimestampElement readMongoTimestampElement()
939 throws IOException {
940 final long start = getBytesRead() - 1;
941
942 final String name = readCString();
943 final long timestamp = readLong();
944
945 final long size = getBytesRead() - start;
946
947 return new MongoTimestampElement(name, timestamp, size);
948 }
949
950
951
952
953
954
955
956
957 protected NullElement readNullElement() throws IOException {
958 final long start = getBytesRead() - 1;
959
960 final String name = readCString();
961
962 final long size = getBytesRead() - start;
963
964 return new NullElement(name, size);
965 }
966
967
968
969
970
971
972
973
974
975 protected ObjectIdElement readObjectIdElement() throws IOException {
976 final long start = getBytesRead() - 1;
977
978 final String name = readCString();
979 final int timestamp = EndianUtils.swap(readInt());
980 final long machineId = EndianUtils.swap(readLong());
981
982 final long size = getBytesRead() - start;
983
984 return new ObjectIdElement(name, new ObjectId(timestamp, machineId),
985 size);
986 }
987
988
989
990
991
992
993
994
995
996 protected RegularExpressionElement readRegularExpressionElement()
997 throws IOException {
998 final long start = getBytesRead() - 1;
999
1000 final String name = readCString();
1001 final String pattern = readCString();
1002 final String options = readCString();
1003
1004 final long size = getBytesRead() - start;
1005
1006 return new RegularExpressionElement(name, pattern, options, size);
1007 }
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027 protected String readString() throws EOFException, IOException {
1028 final int length = readInt();
1029
1030 if (ensureFetched(length) != length) {
1031 throw new EOFException();
1032 }
1033
1034 final int offset = myBufferOffset;
1035
1036
1037 myBufferOffset += length;
1038
1039 return myStringDecoder.decode(myBuffer, offset, length);
1040 }
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050 protected StringElement readStringElement() throws IOException {
1051 final long start = getBytesRead() - 1;
1052
1053 final String name = readCString();
1054 final String value = readString();
1055
1056 final long size = getBytesRead() - start;
1057
1058 return new StringElement(name, value, size);
1059 }
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069 protected SymbolElement readSymbolElement() throws IOException {
1070 final long start = getBytesRead() - 1;
1071
1072 final String name = readCString();
1073 final String symbol = readString();
1074
1075 final long size = getBytesRead() - start;
1076
1077 return new SymbolElement(name, symbol, size);
1078 }
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088 protected TimestampElement readTimestampElement() throws IOException {
1089 final long start = getBytesRead() - 1;
1090
1091 final String name = readCString();
1092 final long time = readLong();
1093
1094 final long size = getBytesRead() - start;
1095
1096 return new TimestampElement(name, time, size);
1097 }
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111 private final int ensureFetched(final int size) throws IOException {
1112 return fetch(size, true);
1113 }
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130 private final int fetch(final int size, final boolean forceRead)
1131 throws IOException {
1132
1133 int available = availableInBuffer();
1134 if (available < size) {
1135
1136
1137
1138 if (myBuffer.length < size) {
1139
1140 final byte[] newBuffer = new byte[size];
1141
1142
1143 System.arraycopy(myBuffer, myBufferOffset, newBuffer, 0,
1144 available);
1145 myBuffer = newBuffer;
1146 }
1147 else if (0 < available) {
1148
1149 System.arraycopy(myBuffer, myBufferOffset, myBuffer, 0,
1150 available);
1151 }
1152
1153
1154 myBytesRead += myBufferOffset;
1155 myBufferOffset = 0;
1156 myBufferLimit = available;
1157
1158
1159 int read;
1160 do {
1161 read = myInput.read(myBuffer, myBufferLimit, myBuffer.length
1162 - myBufferLimit);
1163 if (0 < read) {
1164 available += read;
1165 myBufferLimit += read;
1166 }
1167 }
1168 while (forceRead && (0 <= read) && (available < size));
1169
1170 return Math.min(size, available);
1171 }
1172
1173 return size;
1174 }
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193 private void readFully(final byte[] buffer, final int offset,
1194 final int length) throws EOFException, IOException {
1195
1196 int read = Math.min(length, availableInBuffer());
1197 System.arraycopy(myBuffer, myBufferOffset, buffer, offset, read);
1198 myBufferOffset += read;
1199
1200
1201 while (read < length) {
1202 final int count = myInput
1203 .read(buffer, offset + read, length - read);
1204 if (count < 0) {
1205 throw new EOFException();
1206 }
1207 read += count;
1208
1209
1210 myBytesRead += read;
1211 }
1212 }
1213 }