/*
 * #%L
 * GridFs.java - mongodb-async-driver - Allanbank Consulting, Inc.
 * %%
 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

package com.allanbank.mongodb.gridfs;

import static com.allanbank.mongodb.builder.QueryBuilder.where;
import static com.allanbank.mongodb.builder.Sort.asc;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.allanbank.mongodb.Durability;
import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbException;
import com.allanbank.mongodb.MongoDbUri;
import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.Element;
import com.allanbank.mongodb.bson.NumericElement;
import com.allanbank.mongodb.bson.builder.BuilderFactory;
import com.allanbank.mongodb.bson.builder.DocumentBuilder;
import com.allanbank.mongodb.bson.element.BinaryElement;
import com.allanbank.mongodb.bson.element.ObjectId;
import com.allanbank.mongodb.bson.element.StringElement;
import com.allanbank.mongodb.builder.Find;
import com.allanbank.mongodb.builder.Index;
import com.allanbank.mongodb.util.IOUtils;

/**
 * GridFs provides an interface for working with a GridFS collection.
 * <p>
 * This implementation uses an {@link ObjectId} as the id when writing and
 * stores the name of the file in the files collection document's "filename"
 * field. To {@link #unlink(String)} or {@link #read(String, OutputStream)} a
 * file from the collection the _id field may contain any value but the
 * filename field must be present.
 * </p>
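 * <p>
 * A minimal usage sketch; the connection URI below is illustrative only:
 * </p>
 * <pre>
 * <code>
 * GridFs gridFs = new GridFs("mongodb://localhost:27017/db");
 * gridFs.createIndexes();
 * </code>
 * </pre>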
 *
 * @api.yes This class is part of the driver's API. Public and protected
 *          members will be deprecated for at least 1 non-bugfix release
 *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
 *          before being removed or modified.
 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 */
public class GridFs {

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's number.
     */
    public static final String CHUNK_NUMBER_FIELD = "n";

    /** The amount of overhead in a chunk document in bytes: {@value} */
    public static final int CHUNK_OVERHEAD = 62;

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's chunk size.
     */
    public static final String CHUNK_SIZE_FIELD = "chunkSize";

    /** The suffix for the chunks collection. */
    public static final String CHUNKS_SUFFIX = ".chunks";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's data.
     */
    public static final String DATA_FIELD = "data";

    /**
     * The default chunk size: (256 * 1024) - {@value #CHUNK_OVERHEAD} =
     * 262,082 bytes. This is slightly less than 256K to allow for the
     * {@link #CHUNK_OVERHEAD} when using the power of two allocator.
     */
    public static final int DEFAULT_CHUNK_SIZE;

    /** The default root name ('fs') for the GridFS collections. */
    public static final String DEFAULT_ROOT = "fs";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's name.
     */
    public static final String FILENAME_FIELD = "filename";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's related file id.
     */
    public static final String FILES_ID_FIELD = "files_id";

    /** The suffix for the files collection. */
    public static final String FILES_SUFFIX = ".files";

    /** The {@code _id} field name. */
    public static final String ID_FIELD = "_id";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's length.
     */
    public static final String LENGTH_FIELD = "length";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's MD5.
     */
    public static final String MD5_FIELD = "md5";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's upload date.
     */
    public static final String UPLOAD_DATE_FIELD = "uploadDate";

    static {
        DEFAULT_CHUNK_SIZE = (256 * 1024) - CHUNK_OVERHEAD;
    }

    /** The GridFS chunks collection. */
    private final MongoCollection myChunksCollection;

    /** The size for a chunk written. */
    private int myChunkSize = DEFAULT_CHUNK_SIZE;

    /** The GridFS database. */
    private final MongoDatabase myDatabase;

    /** The GridFS files collection. */
    private final MongoCollection myFilesCollection;

    /** The root name for the GridFS collections. */
    private final String myRootName;

    /**
     * Creates a new GridFs.
     * <p>
     * The GridFS objects will be stored in the 'fs' collection.
     * </p>
     *
     * @param database
     *            The database containing the GridFS collections.
     */
    public GridFs(final MongoDatabase database) {
        this(database, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param database
     *            The database containing the GridFS collections.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the
     *            two collection names.
     */
    public GridFs(final MongoDatabase database, final String rootName) {
        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed
     *            as a MongoDB URL.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections">MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri) {
        this(mongoDbUri, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed
     *            as a MongoDB URL.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the
     *            two collection names.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections">MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri, final String rootName) {
        final MongoDbUri uri = new MongoDbUri(mongoDbUri);

        final MongoDatabase database = MongoFactory.createClient(uri)
                .getDatabase(uri.getDatabase());

        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }

    /**
     * Creates the following indexes:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * In a non-sharded environment the indexes will be unique.
     */
    public void createIndexes() {
        try {
            myFilesCollection.createIndex(true, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myFilesCollection.createIndex(false, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }

        try {
            myChunksCollection.createIndex(true, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myChunksCollection.createIndex(false, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
    }

    /**
     * Validates and optionally tries to repair the GridFS collections.
     * <ul>
     * <li>
     * Ensure the following indexes exist:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * </li>
     * <li>
     * Ensure there are no duplicate {@code n} values for the chunks of a
     * file. If {@code repair} is true then the {@code n} values will be
     * updated to be sequential based on the ordering
     * <tt>{ 'n' : 1, '_id' : 1 }</tt>.</li>
     * <li>
     * Validates the MD5 sum for each file via the <a
     * href="http://docs.mongodb.org/manual/reference/command/filemd5"
     * >filemd5</a> command.</li>
     * </ul>
     * <p>
     * <b>Warning:</b> This function iterates over every file in the GridFS
     * collection and can take a considerable amount of time and resources on
     * the client and the server.
     * </p>
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method
     * will return false positives when used with a sharded cluster when the
     * shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
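     * <p>
     * A sketch of a repairing fsck run; the {@code gridFs} instance is
     * assumed to already exist:
     * </p>
     * <pre>
     * <code>
     * final Map&lt;Object, List&lt;String&gt;&gt; faults = gridFs.fsck(true);
     * for (final Map.Entry&lt;Object, List&lt;String&gt;&gt; fault : faults
     *         .entrySet()) {
     *     System.err.println(fault.getKey() + ": " + fault.getValue());
     * }
     * </code>
     * </pre>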
     *
     * @param repair
     *            If set to <code>true</code> then the fsck will attempt to
     *            repair common errors.
     * @return A map of the file ids to the errors found for the file and the
     *         repair status. If no errors are found an empty map is
     *         returned.
     * @throws IOException
     *             On a failure to execute the fsck.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public Map<Object, List<String>> fsck(final boolean repair)
            throws IOException {

        final Map<Object, List<String>> faults = new HashMap<Object, List<String>>();

        createIndexes();

        // Use the filemd5 command to locate files to inspect more closely.
        final MongoIterator<Document> iter = myFilesCollection.find(Find.ALL);
        try {
            for (final Document fileDoc : iter) {
                final Element id = fileDoc.get(ID_FIELD);

                final DocumentBuilder commandDoc = BuilderFactory.start();
                commandDoc.add(id.withName("filemd5"));
                commandDoc.add("root", myRootName);

                final Document commandResult = myDatabase.runCommand(commandDoc
                        .build());
                if (!doVerifyFileMd5(faults, fileDoc, commandResult) && repair) {
                    doTryAndRepair(fileDoc, faults);
                }
            }
        }
        finally {
            iter.close();
        }
        return faults;
    }

    /**
     * Returns the size for a chunk written.
     *
     * @return The size for a chunk written.
     */
    public int getChunkSize() {
        return myChunkSize;
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
     *
     * @param id
     *            The id of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    public void read(final ObjectId id, final OutputStream sink)
            throws IOException {
        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        doRead(fileDoc, sink);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
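     * <p>
     * A minimal sketch of reading a file to disk; the {@code gridFs}
     * instance and the file names are assumed:
     * </p>
     * <pre>
     * <code>
     * final OutputStream sink = new FileOutputStream("report-copy.pdf");
     * try {
     *     gridFs.read("report.pdf", sink);
     * }
     * finally {
     *     sink.close();
     * }
     * </code>
     * </pre>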
     *
     * @param name
     *            The name of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    public void read(final String name, final OutputStream sink)
            throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        doRead(fileDoc, sink);
    }

    /**
     * Sets the value of size for a chunk written.
     *
     * @param chunkSize
     *            The new value for the size for a chunk written.
     */
    public void setChunkSize(final int chunkSize) {
        myChunkSize = chunkSize;
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param id
     *            The id of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param name
     *            The name of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method
     * will always return <code>false</code> when used with a sharded cluster
     * when the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param id
     *            The id of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        return doValidate(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method
     * will always return <code>false</code> when used with a sharded cluster
     * when the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
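     * <p>
     * A minimal sketch; the {@code gridFs} instance and the file name are
     * assumed:
     * </p>
     * <pre>
     * <code>
     * if (!gridFs.validate("report.pdf")) {
     *     System.err.println("GridFS file 'report.pdf' is corrupt.");
     * }
     * </code>
     * </pre>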
     *
     * @param name
     *            The name of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        return doValidate(fileDoc);
    }

    /**
     * Attempts to write a file into the GridFS collections using the
     * specified name for the file and deriving the chunks from the data read
     * from the <tt>source</tt>.
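     * <p>
     * A minimal sketch of writing in-memory bytes; the {@code gridFs}
     * instance is assumed and the name and content are illustrative:
     * </p>
     * <pre>
     * <code>
     * final byte[] bytes = "Hello GridFS".getBytes();
     * final ObjectId id = gridFs.write("hello.txt",
     *         new ByteArrayInputStream(bytes));
     * </code>
     * </pre>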
     *
     * @param name
     *            The name of the file being written.
     * @param source
     *            The source of the bits in the file. This stream will not be
     *            closed.
     * @return The {@link ObjectId} associated with the file.
     * @throws IOException
     *             On a failure writing the documents or reading the file
     *             contents. In the case of a failure an attempt is made to
     *             remove the documents written to the collections.
     */
    public ObjectId write(final String name, final InputStream source)
            throws IOException {
        final ObjectId id = new ObjectId();
        boolean failed = false;
        try {
            final byte[] buffer = new byte[myChunkSize];
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");

            final List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            final DocumentBuilder doc = BuilderFactory.start();
            int n = 0;
            long length = 0;
            int read = readFully(source, buffer);
            while (read > 0) {

                final ObjectId chunkId = new ObjectId();

                doc.reset();
                doc.addObjectId(ID_FIELD, chunkId);
                doc.addObjectId(FILES_ID_FIELD, id);
                doc.addInteger(CHUNK_NUMBER_FIELD, n);

                final byte[] data = (read == buffer.length) ? buffer : Arrays
                        .copyOf(buffer, read);
                md5Digest.update(data);
                doc.addBinary(DATA_FIELD, data);

                results.add(myChunksCollection.insertAsync(doc.build()));

                length += data.length;
                read = readFully(source, buffer);
                n += 1;
            }

            doc.reset();
            doc.addObjectId(ID_FIELD, id);
            doc.addString(FILENAME_FIELD, name);
            doc.addTimestamp(UPLOAD_DATE_FIELD, System.currentTimeMillis());
            doc.addInteger(CHUNK_SIZE_FIELD, buffer.length);
            doc.addLong(LENGTH_FIELD, length);
            doc.addString(MD5_FIELD, IOUtils.toHex(md5Digest.digest()));

            results.add(myFilesCollection.insertAsync(doc.build()));

            // Make sure everything made it to the server.
            for (final Future<Integer> f : results) {
                f.get();
            }
        }
        catch (final NoSuchAlgorithmException e) {
            failed = true;
            throw new IOException(e);
        }
        catch (final InterruptedException e) {
            failed = true;
            final InterruptedIOException error = new InterruptedIOException(
                    e.getMessage());
            error.initCause(e);
            throw error;
        }
        catch (final ExecutionException e) {
            failed = true;
            throw new IOException(e.getCause());
        }
        finally {
            if (failed) {
                myFilesCollection.delete(where(ID_FIELD).equals(id));
                myChunksCollection.delete(where(FILES_ID_FIELD).equals(id));
            }
        }

        return id;
    }

    /**
     * Adds a fault message to the faults map.
     *
     * @param faults
     *            The map of file ids to the error messages.
     * @param idObj
     *            The id for the file.
     * @param message
     *            The message to add.
     */
    protected void doAddFault(final Map<Object, List<String>> faults,
            final Element idObj, final String message) {
        List<String> docFaults = faults.get(idObj.getValueAsObject());
        if (docFaults == null) {
            docFaults = new ArrayList<String>();
            faults.put(idObj.getValueAsObject(), docFaults);
        }
        docFaults.add(message);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
     *
     * @param fileDoc
     *            The document for the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    protected void doRead(final Document fileDoc, final OutputStream sink)
            throws IOException {

        final Element id = fileDoc.get(ID_FIELD);

        long length = -1;
        final NumericElement lengthElement = fileDoc.get(NumericElement.class,
                LENGTH_FIELD);
        if (lengthElement != null) {
            length = lengthElement.getLongValue();
        }

        long chunkSize = -1;
        final NumericElement chunkSizeElement = fileDoc.get(
                NumericElement.class, CHUNK_SIZE_FIELD);
        if (chunkSizeElement != null) {
            chunkSize = chunkSizeElement.getLongValue();
        }

        long numberChunks = -1;
        if ((0 <= length) && (0 < chunkSize)) {
            numberChunks = (long) Math.ceil((double) length
                    / (double) chunkSize);
        }

        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(CHUNK_NUMBER_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        long expectedChunk = 0;
        long totalSize = 0;
        final MongoIterator<Document> iter = myChunksCollection
                .find(findBuilder.build());
        try {
            for (final Document chunk : iter) {

                final NumericElement n = chunk.get(NumericElement.class,
                        CHUNK_NUMBER_FIELD);
                final BinaryElement bytes = chunk.get(BinaryElement.class,
                        DATA_FIELD);

                if (n == null) {
                    throw new IOException("Missing chunk number '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else if (n.getLongValue() != expectedChunk) {
                    throw new IOException("Skipped chunk '"
                            + (expectedChunk + 1) + "', retrieved '"
                            + n.getLongValue() + "' of '" + numberChunks
                            + "'.");
                }
                else if (bytes == null) {
                    throw new IOException("Missing bytes in chunk '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else {

                    final byte[] buffer = bytes.getValue();

                    sink.write(buffer);
                    expectedChunk += 1;
                    totalSize += buffer.length;
                }
            }
        }
        finally {
            iter.close();
            sink.flush();
        }

        if ((0 <= numberChunks) && (expectedChunk < numberChunks)) {
            throw new IOException("Missing chunks after '" + expectedChunk
                    + "' of '" + numberChunks + "'.");
        }
        if ((0 <= length) && (totalSize != length)) {
            throw new IOException("File size mismatch. Expected '" + length
                    + "' but only read '" + totalSize + "' bytes.");
        }
    }

    /**
     * Tries to repair the file.
     * <p>
     * Currently the only strategy is to reorder the chunks into _id order.
     * The operation verifies that the reorder fixes the file prior to
     * modifying anything. It also verifies that the reordering worked after
     * reordering the chunks.
     * </p>
     *
     * @param fileDoc
     *            The document representing the file.
     * @param faults
     *            The map to update with the status of the repair.
     */
    protected void doTryAndRepair(final Document fileDoc,
            final Map<Object, List<String>> faults) {
        // First see if the MD5 for the file's chunks in _id order returns
        // the right results.
        final List<Element> chunkIds = new ArrayList<Element>();

        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start().add(
                queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(ID_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        MongoIterator<Document> iter = null;
        try {
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");
            iter = myChunksCollection.find(findBuilder.build());
            for (final Document chunkDoc : iter) {

                chunkIds.add(chunkDoc.get(ID_FIELD));

                final BinaryElement chunk = chunkDoc.get(BinaryElement.class,
                        DATA_FIELD);
                if (chunk != null) {
                    md5Digest.update(chunk.getValue());
                }
            }

            final String digest = IOUtils.toHex(md5Digest.digest());
            final StringElement computed = new StringElement(MD5_FIELD,
                    digest);
            if (computed.equals(md5)) {
                // Update the 'n' fields for each chunk to be in the right
                // order.
                int n = 0;
                for (final Element idElement : chunkIds) {
                    final DocumentBuilder query = BuilderFactory.start();
                    query.add(idElement);
                    query.add(queryElement); // Direct to the right shard.

                    final DocumentBuilder update = BuilderFactory.start();
                    update.push("$set").add(CHUNK_NUMBER_FIELD, n);

                    // Use a multi-update to ensure the write happens when a
                    // file's chunks are across shards.
                    myChunksCollection.update(query.build(), update.build(),
                            true /* =multi */, false, Durability.ACK);

                    n += 1;
                }

                if (doValidate(fileDoc)) {
                    doAddFault(faults, id, "File repaired.");
                }
                else {
                    doAddFault(faults, id,
                            "Repair failed: Chunks reordered but still not validating.");
                }
            }
            else {
                doAddFault(faults, id,
                        "Repair failed: Could not determine correct chunk order.");
            }
        }
        catch (final NoSuchAlgorithmException e) {
            doAddFault(faults, id,
                    "Repair failed: Could not compute the MD5 for the file: "
                            + e.getMessage());
        }
        catch (final RuntimeException e) {
            doAddFault(faults, id, "Potential Repair Failure: Runtime error: "
                    + e.getMessage());
        }
        finally {
            IOUtils.close(iter);
        }
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param fileDoc
     *            The document for the file to delete.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    protected boolean doUnlink(final Document fileDoc) throws IOException {
        final Element id = fileDoc.get(ID_FIELD);

        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(id.withName(FILES_ID_FIELD));
        final Future<Long> cFuture = myChunksCollection.deleteAsync(queryDoc);

        queryDoc.reset();
        queryDoc.add(id);
        final Future<Long> fFuture = myFilesCollection.deleteAsync(queryDoc);

        try {
            // An empty file may have zero chunks so any non-negative chunk
            // delete count is acceptable, but the file document itself must
            // have been deleted.
            return (cFuture.get().longValue() >= 0)
                    && (fFuture.get().longValue() > 0);
        }
        catch (final InterruptedException e) {
            return false;
        }
        catch (final ExecutionException e) {
            return false;
        }
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method
     * will always return <code>false</code> when used with a sharded cluster
     * when the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param fileDoc
     *            The document for the file to validate.
     * @return True if the file validates (md5 hash matches), false
     *         otherwise.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    protected boolean doValidate(final Document fileDoc) {
        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);

        final DocumentBuilder commandDoc = BuilderFactory.start();
        commandDoc.add(id.withName("filemd5"));
        commandDoc.add("root", myRootName);
        final Document result = myDatabase.runCommand(commandDoc.build());

        return (md5 != null) && md5.equals(result.findFirst(MD5_FIELD));
    }

    /**
     * Verifies the MD5 result for the filemd5 command.
     *
     * @param faults
     *            The faults map to update if the verification fails.
     * @param fileDoc
     *            The document representing the file.
     * @param cmdResult
     *            The document returned from the 'filemd5' command.
     * @return True if the file's MD5 verified successfully, false otherwise.
     */
    protected boolean doVerifyFileMd5(final Map<Object, List<String>> faults,
            final Document fileDoc, final Document cmdResult) {
        boolean ok = false;
        final Element idElement = fileDoc.get(ID_FIELD);

        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element commandMd5 = cmdResult.findFirst(MD5_FIELD);

        ok = (md5 != null) && md5.equals(commandMd5);
        if (!ok) {
            doAddFault(faults, idElement,
                    "MD5 sums do not match. File document contains '" + md5
                            + "' and the filemd5 command produced '"
                            + commandMd5 + "'.");
        }

        return ok;
    }

    /**
     * Reads the full contents of the stream, until an EOF, into the buffer.
     *
     * @param source
     *            The source of bytes to read.
     * @param buffer
     *            The buffer to read into.
     * @return The number of bytes read. If less than <tt>buffer.length</tt>
     *         then the stream reached the end-of-file.
     * @throws IOException
     *             On a failure reading from the stream.
     */
    private int readFully(final InputStream source, final byte[] buffer)
            throws IOException {

        int offset = 0;

        while (true) {
            final int read = source
                    .read(buffer, offset, buffer.length - offset);
            if (read < 0) {
                return offset;
            }

            offset += read;

            if (offset == buffer.length) {
                return offset;
            }
        }
    }
}