Coverage Report - com.allanbank.mongodb.bson.io.RandomAccessOutputStream
 
Classes in this File Line Coverage Branch Coverage Complexity
RandomAccessOutputStream
92%
131/142
97%
45/46
2.269
 
 1  
 /*
 2  
  * #%L
 3  
  * RandomAccessOutputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.bson.io;
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.io.OutputStream;
 24  
 import java.nio.charset.Charset;
 25  
 import java.util.ArrayList;
 26  
 import java.util.List;
 27  
 
 28  
 /**
 29  
  * Provides a capability similar to the <tt>ByteArrayOutputStream</tt> but also
 30  
  * provides the ability to re-write portions of the stream already written and
 31  
  * to determine the current size (or position) of the written data.
 32  
  * <p>
 33  
  * Instead of allocating a single large byte array this implementation tracks a
 34  
  * group of fixed-size buffers. This should reduce the runtime cost of
 35  
  * buffer reallocations since it avoids the copy of contents from one buffer to
 36  
  * another.
 37  
  * </p>
 38  
  * 
 39  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 40  
  *         mutated in incompatible ways between any two releases of the driver.
 41  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 42  
  */
 43  
 public class RandomAccessOutputStream extends OutputStream {
 44  
     /** UTF-8 Character set for encoding strings. */
 45  1
     public final static Charset UTF8 = StringDecoder.UTF8;
 46  
 
 47  
     /** The size of each buffer to allocate. Must be a power of 2. */
 48  
     private static final int BUFFER_SIZE;
 49  
 
 50  
     /** Mask for the buffer position within a specific buffer. */
 51  
     private static final int BUFFER_SIZE_MASK;
 52  
 
 53  
     /** The number of bits in the buffer size. */
 54  
     private static final int BUFFER_SIZE_SHIFT;
 55  
 
 56  
     static {
 57  1
         BUFFER_SIZE = Integer.highestOneBit(8192);
 58  1
         BUFFER_SIZE_MASK = BUFFER_SIZE - 1;
 59  1
         BUFFER_SIZE_SHIFT = Integer.numberOfTrailingZeros(BUFFER_SIZE);
 60  1
     }
 61  
 
 62  
     /** The set of buffers allocated. */
 63  
     private final List<byte[]> myBuffers;
 64  
 
 65  
     /** The current buffer being written. */
 66  
     private byte[] myCurrentBuffer;
 67  
 
 68  
     /** The index of the current buffer. */
 69  
     private int myCurrentBufferIndex;
 70  
 
 71  
     /** The offset into the current buffer. */
 72  
     private int myCurrentBufferOffset;
 73  
 
 74  
     /**
 75  
      * Buffer for serialization of integer types. Not needed for normal integer
 76  
      * writes since the {@link RandomAccessOutputStream} will coalesce the
 77  
      * single byte writes but for the {@link RandomAccessOutputStream#writeAt}
 78  
      * operation a seek to the appropriate backing buffer is required. For large
 79  
      * documents the seeks could be significant. This buffer ensures there is
 80  
      * only 1 seek for each {@link #writeIntAt(long, int)}.
 81  
      */
 82  
     private final byte[] myIntegerBytes;
 83  
 
 84  
     /** The total number of bytes written to the stream. */
 85  
     private long mySize;
 86  
 
 87  
     /** The encoder used to write strings to the stream. */
 88  
     private final StringEncoder myStringEncoder;
 89  
 
 90  
     /**
 91  
      * Creates a new {@link RandomAccessOutputStream}.
 92  
      */
 93  
     public RandomAccessOutputStream() {
 94  11
         this(new StringEncoderCache());
 95  11
     }
 96  
 
 97  
     /**
 98  
      * Creates a new {@link RandomAccessOutputStream}.
 99  
      * 
 100  
      * @param cache
 101  
      *            The cache for encoding strings.
 102  
      */
 103  130
     public RandomAccessOutputStream(final StringEncoderCache cache) {
 104  130
         mySize = 0;
 105  130
         myCurrentBufferOffset = 0;
 106  130
         myCurrentBufferIndex = 0;
 107  130
         myCurrentBuffer = new byte[BUFFER_SIZE];
 108  
 
 109  130
         myStringEncoder = new StringEncoder(cache);
 110  
 
 111  130
         myBuffers = new ArrayList<byte[]>();
 112  130
         myBuffers.add(myCurrentBuffer);
 113  
 
 114  130
         myIntegerBytes = new byte[8];
 115  130
     }
 116  
 
 117  
     /**
 118  
      * {@inheritDoc}
 119  
      */
 120  
     @Override
 121  
     public void close() {
 122  
         // Nothing.
 123  11
     }
 124  
 
 125  
     /**
 126  
      * {@inheritDoc}
 127  
      */
 128  
     @Override
 129  
     public void flush() {
 130  
         // Nothing.
 131  1
     }
 132  
 
 133  
     /**
 134  
      * Returns the maximum number of strings that may have their encoded form
 135  
      * cached.
 136  
      * 
 137  
      * @return The maximum number of strings that may have their encoded form
 138  
      *         cached.
 139  
      * @deprecated The cache {@link StringEncoderCache} should be controlled
 140  
      *             directly. This method will be removed after the 2.1.0
 141  
      *             release.
 142  
      */
 143  
     @Deprecated
 144  
     public int getMaxCachedStringEntries() {
 145  0
         return myStringEncoder.getCache().getMaxCacheEntries();
 146  
     }
 147  
 
 148  
     /**
 149  
      * Returns the maximum length for a string that the stream is allowed to
 150  
      * cache.
 151  
      * 
 152  
      * @return The maximum length for a string that the stream is allowed to
 153  
      *         cache.
 154  
      * @deprecated The cache {@link StringEncoderCache} should be controlled
 155  
      *             directly. This method will be removed after the 2.1.0
 156  
      *             release.
 157  
      */
 158  
     @Deprecated
 159  
     public int getMaxCachedStringLength() {
 160  0
         return myStringEncoder.getCache().getMaxCacheLength();
 161  
     }
 162  
 
 163  
     /**
 164  
      * Returns the current position in the stream. This is equivalent to
 165  
      * {@link #getSize()}.
 166  
      * 
 167  
      * @return The current position in the stream.
 168  
      */
 169  
     public long getPosition() {
 170  69895
         return getSize();
 171  
     }
 172  
 
 173  
     /**
 174  
      * Returns the number of bytes written to the stream.
 175  
      * 
 176  
      * @return The current number of bytes written to the stream.
 177  
      */
 178  
     public long getSize() {
 179  102691
         return mySize;
 180  
     }
 181  
 
 182  
     /**
 183  
      * Resets the <code>size</code> of the buffer to zero. All buffers can be
 184  
      * re-used.
 185  
      */
 186  
     public void reset() {
 187  334
         mySize = 0;
 188  334
         myCurrentBufferOffset = 0;
 189  334
         myCurrentBufferIndex = 0;
 190  334
         myCurrentBuffer = myBuffers.get(0);
 191  334
     }
 192  
 
 193  
     /**
 194  
      * Sets the value of maximum number of strings that may have their encoded
 195  
      * form cached.
 196  
      * 
 197  
      * @param maxCacheEntries
 198  
      *            The new value for the maximum number of strings that may have
 199  
      *            their encoded form cached.
 200  
      * @deprecated The cache {@link StringEncoderCache} should be controlled
 201  
      *             directly. This method will be removed after the 2.1.0
 202  
      *             release.
 203  
      */
 204  
     @Deprecated
 205  
     public void setMaxCachedStringEntries(final int maxCacheEntries) {
 206  0
         myStringEncoder.getCache().setMaxCacheEntries(maxCacheEntries);
 207  0
     }
 208  
 
 209  
     /**
 210  
      * Sets the value of length for a string that the stream is allowed to cache
 211  
      * to the new value. This can be used to stop a single long string from
 212  
      * pushing useful values out of the cache.
 213  
      * 
 214  
      * @param maxlength
 215  
      *            The new value for the length for a string that the encoder is
 216  
      *            allowed to cache.
 217  
      * @deprecated The cache {@link StringEncoderCache} should be controlled
 218  
      *             directly. This method will be removed after the 2.1.0
 219  
      *             release.
 220  
      */
 221  
     @Deprecated
 222  
     public void setMaxCachedStringLength(final int maxlength) {
 223  0
         myStringEncoder.getCache().setMaxCacheLength(maxlength);
 224  
 
 225  0
     }
 226  
 
 227  
     /**
 228  
      * {@inheritDoc}
 229  
      * 
 230  
      * @param buffer
 231  
      *            the data.
 232  
      */
 233  
     @Override
 234  
     public void write(final byte buffer[]) {
 235  32303
         write(buffer, 0, buffer.length);
 236  32303
     }
 237  
 
 238  
     /**
 239  
      * {@inheritDoc}
 240  
      * 
 241  
      * @param buffer
 242  
      *            the data.
 243  
      * @param offset
 244  
      *            the start offset in the data.
 245  
      * @param length
 246  
      *            the number of bytes to write.
 247  
      */
 248  
     @Override
 249  
     public void write(final byte buffer[], final int offset, final int length) {
 250  164651
         if (buffer == null) {
 251  1
             throw new NullPointerException();
 252  
         }
 253  164650
         else if ((offset < 0) || (offset > buffer.length) || (length < 0)
 254  
                 || ((offset + length) > buffer.length)
 255  
                 || ((offset + length) < 0)) {
 256  5
             throw new IndexOutOfBoundsException();
 257  
         }
 258  164645
         else if (length == 0) {
 259  0
             return;
 260  
         }
 261  
 
 262  164645
         int wrote = 0;
 263  333340
         while (wrote < length) {
 264  168695
             if (myCurrentBuffer.length <= myCurrentBufferOffset) {
 265  4072
                 nextBuffer();
 266  
             }
 267  
 
 268  168695
             final int available = myCurrentBuffer.length
 269  
                     - myCurrentBufferOffset;
 270  168695
             final int toWrite = Math.min(length - wrote, available);
 271  
 
 272  168695
             System.arraycopy(buffer, offset + wrote, myCurrentBuffer,
 273  
                     myCurrentBufferOffset, toWrite);
 274  
 
 275  168695
             myCurrentBufferOffset += toWrite;
 276  168695
             mySize += toWrite;
 277  168695
             wrote += toWrite;
 278  168695
         }
 279  164645
     }
 280  
 
 281  
     /**
 282  
      * {@inheritDoc}
 283  
      */
 284  
     @Override
 285  
     public void write(final int b) {
 286  240879
         if (myCurrentBuffer.length <= myCurrentBufferOffset) {
 287  24
             nextBuffer();
 288  
         }
 289  
 
 290  240879
         myCurrentBuffer[myCurrentBufferOffset] = (byte) b;
 291  240879
         myCurrentBufferOffset += 1;
 292  240879
         mySize += 1;
 293  240879
     }
 294  
 
 295  
     /**
 296  
      * Similar to {@link #write(byte[])} but allows a portion of the already
 297  
      * written buffer to be re-written.
 298  
      * <p>
 299  
      * Equivalent to <code>writeAt(position, buffer, 0, buffer.length);</code>.
 300  
      * </p>
 301  
      * 
 302  
      * @param position
 303  
      *            The position to write at. This location should have already
 304  
      *            been written.
 305  
      * @param buffer
 306  
      *            the data.
 307  
      */
 308  
     public void writeAt(final long position, final byte buffer[]) {
 309  5
         writeAt(position, buffer, 0, buffer.length);
 310  5
     }
 311  
 
 312  
     /**
 313  
      * Similar to {@link #write(byte[], int, int)} but allows a portion of the
 314  
      * already written buffer to be re-written.
 315  
      * 
 316  
      * @param position
 317  
      *            The position to write at. This location should have already
 318  
      *            been written.
 319  
      * @param buffer
 320  
      *            the data.
 321  
      * @param offset
 322  
      *            the start offset in the data.
 323  
      * @param length
 324  
      *            the number of bytes to write.
 325  
      */
 326  
     public void writeAt(final long position, final byte buffer[],
 327  
             final int offset, final int length) {
 328  32801
         if (buffer == null) {
 329  1
             throw new NullPointerException();
 330  
         }
 331  32800
         else if ((offset < 0) || (offset > buffer.length) || (length < 0)
 332  
                 || ((offset + length) > buffer.length)
 333  
                 || ((offset + length) < 0) || ((position + length) > getSize())) {
 334  6
             throw new IndexOutOfBoundsException();
 335  
         }
 336  32794
         else if (length == 0) {
 337  11
             return;
 338  
         }
 339  
 
 340  
         // Find the start buffer.
 341  32783
         final long start = position & BUFFER_SIZE_MASK;
 342  32783
         int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
 343  32783
         byte[] internalBuffer = myBuffers.get(bufferIndex);
 344  
 
 345  
         // Write into the correct position.
 346  32783
         int wrote = 0;
 347  32783
         int internalOffset = (int) start;
 348  65580
         while (wrote < length) {
 349  32797
             if (internalBuffer.length <= internalOffset) {
 350  14
                 bufferIndex += 1;
 351  14
                 internalBuffer = myBuffers.get(bufferIndex);
 352  14
                 internalOffset = 0;
 353  
             }
 354  
 
 355  32797
             final int available = internalBuffer.length - internalOffset;
 356  32797
             final int toWrite = Math.min(length - wrote, available);
 357  
 
 358  32797
             System.arraycopy(buffer, offset + wrote, internalBuffer,
 359  
                     internalOffset, toWrite);
 360  
 
 361  32797
             internalOffset += toWrite;
 362  32797
             wrote += toWrite;
 363  32797
         }
 364  32783
     }
 365  
 
 366  
     /**
 367  
      * Similar to {@link #write(int)} but allows a portion of the already
 368  
      * written buffer to be re-written.
 369  
      * 
 370  
      * @param position
 371  
      *            The position to write at. This location should have already
 372  
      *            been written.
 373  
      * @param b
 374  
      *            The byte value to write.
 375  
      */
 376  
     public void writeAt(final long position, final int b) {
 377  
         // Find the start buffer.
 378  4352
         final long start = position & BUFFER_SIZE_MASK;
 379  4352
         final int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
 380  4352
         final byte[] internalBuffer = myBuffers.get(bufferIndex);
 381  
 
 382  4352
         internalBuffer[(int) start] = (byte) b;
 383  4352
     }
 384  
 
 385  
     /**
 386  
      * Writes a single byte to the stream.
 387  
      * 
 388  
      * @param b
 389  
      *            The byte to write.
 390  
      */
 391  
     public void writeByte(final byte b) {
 392  193811
         write(b & 0xFF);
 393  193811
     }
 394  
 
 395  
     /**
 396  
      * Writes a sequence of bytes to the stream.
 397  
      * 
 398  
      * @param data
 399  
      *            The bytes to write.
 400  
      */
 401  
     public void writeBytes(final byte[] data) {
 402  32012
         write(data);
 403  32012
     }
 404  
 
 405  
     /**
 406  
      * Writes a "Cstring" to the stream.
 407  
      * 
 408  
      * @param strings
 409  
      *            The CString to write. The strings are concatenated into a
 410  
      *            single CString value.
 411  
      */
 412  
     public void writeCString(final String... strings) {
 413  130322
         for (final String string : strings) {
 414  
             // writeBytes(string.getBytes(UTF8));
 415  
             try {
 416  65486
                 myStringEncoder.encode(string, this);
 417  
             }
 418  0
             catch (final IOException cannotHappen) {
 419  
                 // We never throw so should not throw from the encoder.
 420  0
                 throw new IllegalStateException(
 421  
                         "Encoder should not throw when writing to a buffer.");
 422  65486
             }
 423  
         }
 424  64836
         writeByte((byte) 0);
 425  64836
     }
 426  
 
 427  
     /**
 428  
      * Write the integer value in little-endian byte order.
 429  
      * 
 430  
      * @param value
 431  
      *            The integer to write.
 432  
      */
 433  
     public void writeInt(final int value) {
 434  67035
         myIntegerBytes[0] = (byte) (value & 0xFF);
 435  67035
         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
 436  67035
         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
 437  67035
         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
 438  
 
 439  67035
         write(myIntegerBytes, 0, 4);
 440  67035
     }
 441  
 
 442  
     /**
 443  
      * Write the integer value in little-endian byte order at the specified
 444  
      * position in the stream.
 445  
      * 
 446  
      * @param position
 447  
      *            The position in the stream to write the integer.
 448  
      * @param value
 449  
      *            The integer to write.
 450  
      */
 451  
     public void writeIntAt(final long position, final int value) {
 452  32768
         myIntegerBytes[0] = (byte) (value & 0xFF);
 453  32768
         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
 454  32768
         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
 455  32768
         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
 456  
 
 457  32768
         writeAt(position, myIntegerBytes, 0, 4);
 458  32768
     }
 459  
 
 460  
     /**
 461  
      * Write the long value in little-endian byte order.
 462  
      * 
 463  
      * @param value
 464  
      *            The long to write.
 465  
      */
 466  
     public void writeLong(final long value) {
 467  43
         myIntegerBytes[0] = (byte) (value & 0xFF);
 468  43
         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
 469  43
         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
 470  43
         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
 471  43
         myIntegerBytes[4] = (byte) ((value >> 32) & 0xFF);
 472  43
         myIntegerBytes[5] = (byte) ((value >> 40) & 0xFF);
 473  43
         myIntegerBytes[6] = (byte) ((value >> 48) & 0xFF);
 474  43
         myIntegerBytes[7] = (byte) ((value >> 56) & 0xFF);
 475  
 
 476  43
         write(myIntegerBytes, 0, 8);
 477  43
     }
 478  
 
 479  
     /**
 480  
      * Writes a "string" to the stream.
 481  
      * 
 482  
      * @param string
 483  
      *            The String to write.
 484  
      */
 485  
     public void writeString(final String string) {
 486  
         // final byte[] bytes = string.getBytes(UTF8);
 487  
         //
 488  
         // writeInt(bytes.length + 1);
 489  
         // writeBytes(bytes);
 490  
         // writeByte((byte) 0);
 491  
 
 492  75
         final long position = getPosition();
 493  75
         writeInt(0); // For size.
 494  
         try {
 495  75
             myStringEncoder.encode(string, this);
 496  
         }
 497  0
         catch (final IOException cannotHappen) {
 498  
             // We never throw so should not throw from the encoder.
 499  0
             throw new IllegalStateException(
 500  
                     "Encoder should not throw when writing to a buffer.");
 501  75
         }
 502  75
         writeByte((byte) 0);
 503  
 
 504  75
         final int size = (int) (getPosition() - position - 4);
 505  75
         writeIntAt(position, size);
 506  
 
 507  75
     }
 508  
 
 509  
     /**
 510  
      * Writes the complete contents of this byte array output stream to the
 511  
      * specified output stream argument, as if by calling the output stream's
 512  
      * write method using <code>out.write(buf, 0, count)</code>.
 513  
      * 
 514  
      * @param out
 515  
      *            the output stream to which to write the data.
 516  
      * @exception IOException
 517  
      *                if an I/O error occurs.
 518  
      */
 519  
     public void writeTo(final OutputStream out) throws IOException {
 520  4438
         for (int i = 0; i < myCurrentBufferIndex; ++i) {
 521  4096
             out.write(myBuffers.get(i), 0, BUFFER_SIZE);
 522  
         }
 523  342
         out.write(myCurrentBuffer, 0, myCurrentBufferOffset);
 524  342
     }
 525  
 
 526  
     /**
 527  
      * Moves to the next buffer, allocating a new one only if none can be re-used.
 528  
      */
 529  
     protected void nextBuffer() {
 530  
         // Need a new buffer.
 531  4096
         myCurrentBufferIndex += 1;
 532  
 
 533  4096
         if (myCurrentBufferIndex < myBuffers.size()) {
 534  2046
             myCurrentBuffer = myBuffers.get(myCurrentBufferIndex);
 535  
         }
 536  
         else {
 537  2050
             myCurrentBuffer = new byte[BUFFER_SIZE];
 538  2050
             myBuffers.add(myCurrentBuffer);
 539  
         }
 540  
 
 541  4096
         myCurrentBufferOffset = 0;
 542  4096
     }
 543  
 }