1 /*
2 * #%L
3 * RandomAccessOutputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.bson.io;
21
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.charset.Charset;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 /**
29 * Provides a capability similar to the <tt>ByteArrayOutputStream</tt> but also
30 * provides the ability to re-write portions of the stream already written and
31 * to determine the current size (or position) of the written data.
32 * <p>
33 * Instead of allocating a single large byte array this implementation tracks a
34 * group of (increasing in size) buffers. This should reduce the runtime cost of
35 * buffer reallocations since it avoids the copy of contents from one buffer to
36 * another.
37 * </p>
38 *
39 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
40 * mutated in incompatible ways between any two releases of the driver.
41 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
42 */
43 public class RandomAccessOutputStream extends OutputStream {
44 /** UTF-8 Character set for encoding strings. */
45 public final static Charset UTF8 = StringDecoder.UTF8;
46
47 /** The maximum size buffer to allocate. Must be a power of 2. */
48 private static final int BUFFER_SIZE;
49
50 /** Mask for the buffer position within a specific buffer. */
51 private static final int BUFFER_SIZE_MASK;
52
53 /** The number of bits in the buffer size. */
54 private static final int BUFFER_SIZE_SHIFT;
55
56 static {
57 BUFFER_SIZE = Integer.highestOneBit(8192);
58 BUFFER_SIZE_MASK = BUFFER_SIZE - 1;
59 BUFFER_SIZE_SHIFT = Integer.numberOfTrailingZeros(BUFFER_SIZE);
60 }
61
62 /** The set of buffers allocated. */
63 private final List<byte[]> myBuffers;
64
65 /** The current buffer being written. */
66 private byte[] myCurrentBuffer;
67
68 /** The index of the current buffer. */
69 private int myCurrentBufferIndex;
70
71 /** The offset into the current buffer. */
72 private int myCurrentBufferOffset;
73
74 /**
75 * Buffer for serialization of integer types. Not needed for normal integer
76 * writes since the {@link RandomAccessOutputStream} will coalesce the
77 * single byte writes but for the {@link RandomAccessOutputStream#writeAt}
78 * operation a seek to the appropriate backing buffer is required. For large
79 * documents the seeks could be significant. This buffer ensures there is
80 * only 1 seek for each {@link #writeIntAt(long, int)}.
81 */
82 private final byte[] myIntegerBytes;
83
84 /** The current buffer being written. */
85 private long mySize;
86
87 /** The offset into the current buffer. */
88 private final StringEncoder myStringEncoder;
89
90 /**
91 * Creates a new {@link RandomAccessOutputStream}.
92 */
93 public RandomAccessOutputStream() {
94 this(new StringEncoderCache());
95 }
96
97 /**
98 * Creates a new {@link RandomAccessOutputStream}.
99 *
100 * @param cache
101 * The cache for encoding string.
102 */
103 public RandomAccessOutputStream(final StringEncoderCache cache) {
104 mySize = 0;
105 myCurrentBufferOffset = 0;
106 myCurrentBufferIndex = 0;
107 myCurrentBuffer = new byte[BUFFER_SIZE];
108
109 myStringEncoder = new StringEncoder(cache);
110
111 myBuffers = new ArrayList<byte[]>();
112 myBuffers.add(myCurrentBuffer);
113
114 myIntegerBytes = new byte[8];
115 }
116
117 /**
118 * {@inheritDoc}
119 */
120 @Override
121 public void close() {
122 // Nothing.
123 }
124
125 /**
126 * {@inheritDoc}
127 */
128 @Override
129 public void flush() {
130 // Nothing.
131 }
132
133 /**
134 * Returns the maximum number of strings that may have their encoded form
135 * cached.
136 *
137 * @return The maximum number of strings that may have their encoded form
138 * cached.
139 * @deprecated The cache {@link StringEncoderCache} should be controlled
140 * directory. This method will be removed after the 2.1.0
141 * release.
142 */
143 @Deprecated
144 public int getMaxCachedStringEntries() {
145 return myStringEncoder.getCache().getMaxCacheEntries();
146 }
147
148 /**
149 * Returns the maximum length for a string that the stream is allowed to
150 * cache.
151 *
152 * @return The maximum length for a string that the stream is allowed to
153 * cache.
154 * @deprecated The cache {@link StringEncoderCache} should be controlled
155 * directory. This method will be removed after the 2.1.0
156 * release.
157 */
158 @Deprecated
159 public int getMaxCachedStringLength() {
160 return myStringEncoder.getCache().getMaxCacheLength();
161 }
162
163 /**
164 * Returns the current position in the stream. This is equivalent to
165 * {@link #getSize()}.
166 *
167 * @return The current position in the stream.
168 */
169 public long getPosition() {
170 return getSize();
171 }
172
173 /**
174 * Returns the number of bytes written to the stream.
175 *
176 * @return The current number of bytes written to the stream.
177 */
178 public long getSize() {
179 return mySize;
180 }
181
182 /**
183 * Resets the <code>size</code> of the buffer to zero. All buffers can be
184 * re-used.
185 */
186 public void reset() {
187 mySize = 0;
188 myCurrentBufferOffset = 0;
189 myCurrentBufferIndex = 0;
190 myCurrentBuffer = myBuffers.get(0);
191 }
192
193 /**
194 * Sets the value of maximum number of strings that may have their encoded
195 * form cached.
196 *
197 * @param maxCacheEntries
198 * The new value for the maximum number of strings that may have
199 * their encoded form cached.
200 * @deprecated The cache {@link StringEncoderCache} should be controlled
201 * directory. This method will be removed after the 2.1.0
202 * release.
203 */
204 @Deprecated
205 public void setMaxCachedStringEntries(final int maxCacheEntries) {
206 myStringEncoder.getCache().setMaxCacheEntries(maxCacheEntries);
207 }
208
209 /**
210 * Sets the value of length for a string that the stream is allowed to cache
211 * to the new value. This can be used to stop a single long string from
212 * pushing useful values out of the cache.
213 *
214 * @param maxlength
215 * The new value for the length for a string that the encoder is
216 * allowed to cache.
217 * @deprecated The cache {@link StringEncoderCache} should be controlled
218 * directory. This method will be removed after the 2.1.0
219 * release.
220 */
221 @Deprecated
222 public void setMaxCachedStringLength(final int maxlength) {
223 myStringEncoder.getCache().setMaxCacheLength(maxlength);
224
225 }
226
227 /**
228 * {@inheritDoc}
229 *
230 * @param buffer
231 * the data.
232 */
233 @Override
234 public void write(final byte buffer[]) {
235 write(buffer, 0, buffer.length);
236 }
237
238 /**
239 * {@inheritDoc}
240 *
241 * @param buffer
242 * the data.
243 * @param offset
244 * the start offset in the data.
245 * @param length
246 * the number of bytes to write.
247 */
248 @Override
249 public void write(final byte buffer[], final int offset, final int length) {
250 if (buffer == null) {
251 throw new NullPointerException();
252 }
253 else if ((offset < 0) || (offset > buffer.length) || (length < 0)
254 || ((offset + length) > buffer.length)
255 || ((offset + length) < 0)) {
256 throw new IndexOutOfBoundsException();
257 }
258 else if (length == 0) {
259 return;
260 }
261
262 int wrote = 0;
263 while (wrote < length) {
264 if (myCurrentBuffer.length <= myCurrentBufferOffset) {
265 nextBuffer();
266 }
267
268 final int available = myCurrentBuffer.length
269 - myCurrentBufferOffset;
270 final int toWrite = Math.min(length - wrote, available);
271
272 System.arraycopy(buffer, offset + wrote, myCurrentBuffer,
273 myCurrentBufferOffset, toWrite);
274
275 myCurrentBufferOffset += toWrite;
276 mySize += toWrite;
277 wrote += toWrite;
278 }
279 }
280
281 /**
282 * {@inheritDoc}
283 */
284 @Override
285 public void write(final int b) {
286 if (myCurrentBuffer.length <= myCurrentBufferOffset) {
287 nextBuffer();
288 }
289
290 myCurrentBuffer[myCurrentBufferOffset] = (byte) b;
291 myCurrentBufferOffset += 1;
292 mySize += 1;
293 }
294
295 /**
296 * Similar to {@link #write(byte[])} but allows a portion of the already
297 * written buffer to be re-written.
298 * <p>
299 * Equivalent to <code>writeAt(position, buffer, 0, buffer.length);</code>.
300 * </p>
301 *
302 * @param position
303 * The position to write at. This location should have already
304 * been written.
305 * @param buffer
306 * the data.
307 */
308 public void writeAt(final long position, final byte buffer[]) {
309 writeAt(position, buffer, 0, buffer.length);
310 }
311
312 /**
313 * Similar to {@link #write(byte[], int, int)} but allows a portion of the
314 * already written buffer to be re-written.
315 *
316 * @param position
317 * The position to write at. This location should have already
318 * been written.
319 * @param buffer
320 * the data.
321 * @param offset
322 * the start offset in the data.
323 * @param length
324 * the number of bytes to write.
325 */
326 public void writeAt(final long position, final byte buffer[],
327 final int offset, final int length) {
328 if (buffer == null) {
329 throw new NullPointerException();
330 }
331 else if ((offset < 0) || (offset > buffer.length) || (length < 0)
332 || ((offset + length) > buffer.length)
333 || ((offset + length) < 0) || ((position + length) > getSize())) {
334 throw new IndexOutOfBoundsException();
335 }
336 else if (length == 0) {
337 return;
338 }
339
340 // Find the start buffer.
341 final long start = position & BUFFER_SIZE_MASK;
342 int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
343 byte[] internalBuffer = myBuffers.get(bufferIndex);
344
345 // Write into the correct position.
346 int wrote = 0;
347 int internalOffset = (int) start;
348 while (wrote < length) {
349 if (internalBuffer.length <= internalOffset) {
350 bufferIndex += 1;
351 internalBuffer = myBuffers.get(bufferIndex);
352 internalOffset = 0;
353 }
354
355 final int available = internalBuffer.length - internalOffset;
356 final int toWrite = Math.min(length - wrote, available);
357
358 System.arraycopy(buffer, offset + wrote, internalBuffer,
359 internalOffset, toWrite);
360
361 internalOffset += toWrite;
362 wrote += toWrite;
363 }
364 }
365
366 /**
367 * Similar to {@link #write(int)} but allows a portion of the already
368 * written buffer to be re-written.
369 *
370 * @param position
371 * The position to write at. This location should have already
372 * been written.
373 * @param b
374 * The byte value to write.
375 */
376 public void writeAt(final long position, final int b) {
377 // Find the start buffer.
378 final long start = position & BUFFER_SIZE_MASK;
379 final int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
380 final byte[] internalBuffer = myBuffers.get(bufferIndex);
381
382 internalBuffer[(int) start] = (byte) b;
383 }
384
385 /**
386 * Writes a single byte to the stream.
387 *
388 * @param b
389 * The byte to write.
390 */
391 public void writeByte(final byte b) {
392 write(b & 0xFF);
393 }
394
395 /**
396 * Writes a sequence of bytes to the under lying stream.
397 *
398 * @param data
399 * The bytes to write.
400 */
401 public void writeBytes(final byte[] data) {
402 write(data);
403 }
404
405 /**
406 * Writes a "Cstring" to the stream.
407 *
408 * @param strings
409 * The CString to write. The strings are concatenated into a
410 * single CString value.
411 */
412 public void writeCString(final String... strings) {
413 for (final String string : strings) {
414 // writeBytes(string.getBytes(UTF8));
415 try {
416 myStringEncoder.encode(string, this);
417 }
418 catch (final IOException cannotHappen) {
419 // We never throw so should not throw from the encoder.
420 throw new IllegalStateException(
421 "Encoder should not throw when writing to a buffer.");
422 }
423 }
424 writeByte((byte) 0);
425 }
426
427 /**
428 * Write the integer value in little-endian byte order.
429 *
430 * @param value
431 * The integer to write.
432 */
433 public void writeInt(final int value) {
434 myIntegerBytes[0] = (byte) (value & 0xFF);
435 myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
436 myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
437 myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
438
439 write(myIntegerBytes, 0, 4);
440 }
441
442 /**
443 * Write the integer value in little-endian byte order at the specified
444 * position in the stream.
445 *
446 * @param position
447 * The position in the stream to write the integer.
448 * @param value
449 * The long to write.
450 */
451 public void writeIntAt(final long position, final int value) {
452 myIntegerBytes[0] = (byte) (value & 0xFF);
453 myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
454 myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
455 myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
456
457 writeAt(position, myIntegerBytes, 0, 4);
458 }
459
460 /**
461 * Write the long value in little-endian byte order.
462 *
463 * @param value
464 * The long to write.
465 */
466 public void writeLong(final long value) {
467 myIntegerBytes[0] = (byte) (value & 0xFF);
468 myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
469 myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
470 myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
471 myIntegerBytes[4] = (byte) ((value >> 32) & 0xFF);
472 myIntegerBytes[5] = (byte) ((value >> 40) & 0xFF);
473 myIntegerBytes[6] = (byte) ((value >> 48) & 0xFF);
474 myIntegerBytes[7] = (byte) ((value >> 56) & 0xFF);
475
476 write(myIntegerBytes, 0, 8);
477 }
478
479 /**
480 * Writes a "string" to the stream.
481 *
482 * @param string
483 * The String to write.
484 */
485 public void writeString(final String string) {
486 // final byte[] bytes = string.getBytes(UTF8);
487 //
488 // writeInt(bytes.length + 1);
489 // writeBytes(bytes);
490 // writeByte((byte) 0);
491
492 final long position = getPosition();
493 writeInt(0); // For size.
494 try {
495 myStringEncoder.encode(string, this);
496 }
497 catch (final IOException cannotHappen) {
498 // We never throw so should not throw from the encoder.
499 throw new IllegalStateException(
500 "Encoder should not throw when writing to a buffer.");
501 }
502 writeByte((byte) 0);
503
504 final int size = (int) (getPosition() - position - 4);
505 writeIntAt(position, size);
506
507 }
508
509 /**
510 * Writes the complete contents of this byte array output stream to the
511 * specified output stream argument, as if by calling the output stream's
512 * write method using <code>out.write(buf, 0, count)</code>.
513 *
514 * @param out
515 * the output stream to which to write the data.
516 * @exception IOException
517 * if an I/O error occurs.
518 */
519 public void writeTo(final OutputStream out) throws IOException {
520 for (int i = 0; i < myCurrentBufferIndex; ++i) {
521 out.write(myBuffers.get(i), 0, BUFFER_SIZE);
522 }
523 out.write(myCurrentBuffer, 0, myCurrentBufferOffset);
524 }
525
526 /**
527 * Allocates a new buffer to use.
528 */
529 protected void nextBuffer() {
530 // Need a new buffer.
531 myCurrentBufferIndex += 1;
532
533 if (myCurrentBufferIndex < myBuffers.size()) {
534 myCurrentBuffer = myBuffers.get(myCurrentBufferIndex);
535 }
536 else {
537 myCurrentBuffer = new byte[BUFFER_SIZE];
538 myBuffers.add(myCurrentBuffer);
539 }
540
541 myCurrentBufferOffset = 0;
542 }
543 }