/*
 * #%L
 * GridFs.java - mongodb-async-driver - Allanbank Consulting, Inc.
 * %%
 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

package com.allanbank.mongodb.gridfs;

import static com.allanbank.mongodb.builder.QueryBuilder.where;
import static com.allanbank.mongodb.builder.Sort.asc;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.allanbank.mongodb.Durability;
import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbException;
import com.allanbank.mongodb.MongoDbUri;
import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.Element;
import com.allanbank.mongodb.bson.NumericElement;
import com.allanbank.mongodb.bson.builder.BuilderFactory;
import com.allanbank.mongodb.bson.builder.DocumentBuilder;
import com.allanbank.mongodb.bson.element.BinaryElement;
import com.allanbank.mongodb.bson.element.ObjectId;
import com.allanbank.mongodb.bson.element.StringElement;
import com.allanbank.mongodb.builder.Find;
import com.allanbank.mongodb.builder.Index;
import com.allanbank.mongodb.util.IOUtils;

/**
 * GridFs provides an interface for working with a GridFS collection.
 * <p>
 * This implementation uses an {@link ObjectId} as the id when writing and
 * stores the name of the file in the files collection document's "filename"
 * field. To {@link #unlink(String)} or {@link #read(String, OutputStream)} a
 * file from the collection, the _id field may contain any value but the
 * filename field must be present.
 * </p>
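 * <p>
 * A minimal usage sketch; the {@code database} handle and the streams are
 * hypothetical placeholders supplied by the caller:
 * </p>
 * <pre>
 * <code>
 * GridFs gridFs = new GridFs(database);
 *
 * // Write a file from an InputStream and remember its id.
 * ObjectId id = gridFs.write("report.bin", inputStream);
 *
 * // Stream the file's contents back out by name.
 * gridFs.read("report.bin", outputStream);
 *
 * // Remove the file's documents from both collections.
 * gridFs.unlink("report.bin");
 * </code>
 * </pre>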
 *
 * @api.yes This class is part of the driver's API. Public and protected
 *          members will be deprecated for at least 1 non-bugfix release
 *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
 *          before being removed or modified.
 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 */
public class GridFs {

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's number.
     */
    public static final String CHUNK_NUMBER_FIELD = "n";

    /** The amount of overhead in a chunk document in bytes: {@value} */
    public static final int CHUNK_OVERHEAD = 62;

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's chunk size.
     */
    public static final String CHUNK_SIZE_FIELD = "chunkSize";

    /** The suffix for the chunks collection. */
    public static final String CHUNKS_SUFFIX = ".chunks";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's data.
     */
    public static final String DATA_FIELD = "data";

    /**
     * The default chunk size. This is slightly less than 256K to allow for
     * the {@link #CHUNK_OVERHEAD} when using the power of two allocator.
     */
    public static final int DEFAULT_CHUNK_SIZE;

    /** The default root ("fs") name for the GridFS collections. */
    public static final String DEFAULT_ROOT = "fs";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's name.
     */
    public static final String FILENAME_FIELD = "filename";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing
     * the chunk's related file id.
     */
    public static final String FILES_ID_FIELD = "files_id";

    /** The suffix for the files collection. */
    public static final String FILES_SUFFIX = ".files";

    /** The {@code _id} field name. */
    public static final String ID_FIELD = "_id";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's length.
     */
    public static final String LENGTH_FIELD = "length";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's MD5.
     */
    public static final String MD5_FIELD = "md5";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's upload date.
     */
    public static final String UPLOAD_DATE_FIELD = "uploadDate";

    static {
        DEFAULT_CHUNK_SIZE = (256 * 1024) - CHUNK_OVERHEAD;
    }

    /** The GridFS chunks collection. */
    private final MongoCollection myChunksCollection;

    /** The size for a chunk written. */
    private int myChunkSize = DEFAULT_CHUNK_SIZE;

    /** The GridFS database. */
    private final MongoDatabase myDatabase;

    /** The GridFS files collection. */
    private final MongoCollection myFilesCollection;

    /** The root name for the GridFS collections. */
    private final String myRootName;

    /**
     * Creates a new GridFs.
     * <p>
     * The GridFS objects will be stored in the 'fs' collection.
     * </p>
     *
     * @param database
     *            The database containing the GridFS collections.
     */
    public GridFs(final MongoDatabase database) {
        this(database, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param database
     *            The database containing the GridFS collections.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the
     *            two collection names.
     */
    public GridFs(final MongoDatabase database, final String rootName) {
        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed as
     *            a MongoDB URL.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri) {
        this(mongoDbUri, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed as
     *            a MongoDB URL.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the
     *            two collection names.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri, final String rootName) {
        final MongoDbUri uri = new MongoDbUri(mongoDbUri);

        final MongoDatabase database = MongoFactory.createClient(uri)
                .getDatabase(uri.getDatabase());

        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }

    /**
     * Creates the following indexes:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * In a non-sharded environment the indexes will be unique.
     */
    public void createIndexes() {
        try {
            myFilesCollection.createIndex(true, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myFilesCollection.createIndex(false, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }

        try {
            myChunksCollection.createIndex(true, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myChunksCollection.createIndex(false, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
    }

    /**
     * Validates and optionally tries to repair the GridFS collections.
     * <ul>
     * <li>
     * Ensures the following indexes exist:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * </li>
     * <li>
     * Ensures there are no duplicate {@code n} values for the chunks of a
     * file. If {@code repair} is true then the {@code n} values will be
     * updated to be sequential based on the ordering
     * <tt>{ 'n' : 1, '_id' : 1 }</tt>.</li>
     * <li>
     * Validates the MD5 sum for each file via the <a
     * href="http://docs.mongodb.org/manual/reference/command/filemd5"
     * >filemd5</a> command.</li>
     * </ul>
     * <p>
     * <b>Warning:</b> This method iterates over every file in the GridFS
     * collection and can take a considerable amount of time and resources on
     * the client and the server.
     * </p>
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * return false positives when used with a sharded cluster when the shard
     * key for the chunks collection is not one of <code>{files_id:1}</code>
     * or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
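     * <p>
     * A hypothetical invocation; the {@code gridFs} instance and the
     * collection contents are assumed:
     * </p>
     * <pre>
     * <code>
     * Map&lt;Object, List&lt;String&gt;&gt; faults = gridFs.fsck(true);
     * for (Map.Entry&lt;Object, List&lt;String&gt;&gt; entry : faults.entrySet()) {
     *     System.out.println(entry.getKey() + " : " + entry.getValue());
     * }
     * </code>
     * </pre>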
     *
     * @param repair
     *            If set to <code>true</code> then the fsck will attempt to
     *            repair common errors.
     * @return A map of the file ids to the errors found for the file and the
     *         repair status. If no errors are found an empty map is returned.
     * @throws IOException
     *             On a failure to execute the fsck.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public Map<Object, List<String>> fsck(final boolean repair)
            throws IOException {

        final Map<Object, List<String>> faults = new HashMap<Object, List<String>>();

        createIndexes();

        // Use the filemd5 command to locate files to inspect more closely.
        final MongoIterator<Document> iter = myFilesCollection.find(Find.ALL);
        try {
            for (final Document fileDoc : iter) {
                final Element id = fileDoc.get(ID_FIELD);

                final DocumentBuilder commandDoc = BuilderFactory.start();
                commandDoc.add(id.withName("filemd5"));
                commandDoc.add("root", myRootName);

                final Document commandResult = myDatabase.runCommand(commandDoc
                        .build());
                if (!doVerifyFileMd5(faults, fileDoc, commandResult) && repair) {
                    doTryAndRepair(fileDoc, faults);
                }
            }
        }
        finally {
            iter.close();
        }
        return faults;
    }

    /**
     * Returns the size for a chunk written.
     *
     * @return The size for a chunk written.
     */
    public int getChunkSize() {
        return myChunkSize;
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
     *
     * @param id
     *            The id of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    public void read(final ObjectId id, final OutputStream sink)
            throws IOException {
        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        doRead(fileDoc, sink);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
     *
     * @param name
     *            The name of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    public void read(final String name, final OutputStream sink)
            throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        doRead(fileDoc, sink);
    }

    /**
     * Sets the size for a chunk written.
     *
     * @param chunkSize
     *            The new size for a chunk written.
     */
    public void setChunkSize(final int chunkSize) {
        myChunkSize = chunkSize;
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param id
     *            The id of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param name
     *            The name of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param id
     *            The id of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        return doValidate(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param name
     *            The name of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        return doValidate(fileDoc);
    }

    /**
     * Attempts to write a file into the GridFS collections using the
     * specified name for the file and deriving the chunks from the data read
     * from the <tt>source</tt>.
     *
     * @param name
     *            The name of the file being written.
     * @param source
     *            The source of the bits in the file. This stream will not be
     *            closed.
     * @return The {@link ObjectId} associated with the file.
     * @throws IOException
     *             On a failure writing the documents or reading the file
     *             contents. In the case of a failure an attempt is made to
     *             remove the documents written to the collections.
     */
    public ObjectId write(final String name, final InputStream source)
            throws IOException {
        final ObjectId id = new ObjectId();
        boolean failed = false;
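
        // The chunks are inserted asynchronously while the MD5 digest is
        // computed incrementally; the insert futures are joined below so any
        // failure can trigger the cleanup in the finally block.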
        try {
            final byte[] buffer = new byte[myChunkSize];
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");

            final List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            final DocumentBuilder doc = BuilderFactory.start();
            int n = 0;
            long length = 0;
            int read = readFully(source, buffer);
            while (read > 0) {

                final ObjectId chunkId = new ObjectId();

                doc.reset();
                doc.addObjectId(ID_FIELD, chunkId);
                doc.addObjectId(FILES_ID_FIELD, id);
                doc.addInteger(CHUNK_NUMBER_FIELD, n);

                final byte[] data = (read == buffer.length) ? buffer : Arrays
                        .copyOf(buffer, read);
                md5Digest.update(data);
                doc.addBinary(DATA_FIELD, data);

                results.add(myChunksCollection.insertAsync(doc.build()));

                length += data.length;
                read = readFully(source, buffer);
                n += 1;
            }

            doc.reset();
            doc.addObjectId(ID_FIELD, id);
            doc.addString(FILENAME_FIELD, name);
            doc.addTimestamp(UPLOAD_DATE_FIELD, System.currentTimeMillis());
            doc.addInteger(CHUNK_SIZE_FIELD, buffer.length);
            doc.addLong(LENGTH_FIELD, length);
            doc.addString(MD5_FIELD, IOUtils.toHex(md5Digest.digest()));

            results.add(myFilesCollection.insertAsync(doc.build()));

            // Make sure everything made it to the server.
            for (final Future<Integer> f : results) {
                f.get();
            }
        }
        catch (final NoSuchAlgorithmException e) {
            failed = true;
            throw new IOException(e);
        }
        catch (final InterruptedException e) {
            failed = true;
            final InterruptedIOException error = new InterruptedIOException(
                    e.getMessage());
            error.initCause(e);
            throw error;
        }
        catch (final ExecutionException e) {
            failed = true;
            throw new IOException(e.getCause());
        }
        finally {
            if (failed) {
                myFilesCollection.delete(where(ID_FIELD).equals(id));
                myChunksCollection.delete(where(FILES_ID_FIELD).equals(id));
            }
        }

        return id;
    }

    /**
     * Adds a fault message to the faults map.
     *
     * @param faults
     *            The map of file ids to the error messages.
     * @param idObj
     *            The id for the file.
     * @param message
     *            The message to add.
     */
    protected void doAddFault(final Map<Object, List<String>> faults,
            final Element idObj, final String message) {
        List<String> docFaults = faults.get(idObj.getValueAsObject());
        if (docFaults == null) {
            docFaults = new ArrayList<String>();
            faults.put(idObj.getValueAsObject(), docFaults);
        }
        docFaults.add(message);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to
     * the {@code sink}.
     *
     * @param fileDoc
     *            The document for the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to
     *             the {@code sink}.
     */
    protected void doRead(final Document fileDoc, final OutputStream sink)
            throws IOException {

        final Element id = fileDoc.get(ID_FIELD);

        long length = -1;
        final NumericElement lengthElement = fileDoc.get(NumericElement.class,
                LENGTH_FIELD);
        if (lengthElement != null) {
            length = lengthElement.getLongValue();
        }

        long chunkSize = -1;
        final NumericElement chunkSizeElement = fileDoc.get(
                NumericElement.class, CHUNK_SIZE_FIELD);
        if (chunkSizeElement != null) {
            chunkSize = chunkSizeElement.getLongValue();
        }

        long numberChunks = -1;
        if ((0 <= length) && (0 < chunkSize)) {
            numberChunks = (long) Math.ceil((double) length
                    / (double) chunkSize);
        }

        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(CHUNK_NUMBER_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        long expectedChunk = 0;
        long totalSize = 0;
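
        // Stream the chunks in 'n' order, verifying the sequence numbers and
        // accumulating the byte count so gaps and truncation are detected.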
        final MongoIterator<Document> iter = myChunksCollection
                .find(findBuilder.build());
        try {
            for (final Document chunk : iter) {

                final NumericElement n = chunk.get(NumericElement.class,
                        CHUNK_NUMBER_FIELD);
                final BinaryElement bytes = chunk.get(BinaryElement.class,
                        DATA_FIELD);

                if (n == null) {
                    throw new IOException("Missing chunk number '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else if (n.getLongValue() != expectedChunk) {
                    throw new IOException("Skipped chunk '"
                            + (expectedChunk + 1) + "', retrieved '"
                            + n.getLongValue() + "' of '" + numberChunks
                            + "'.");
                }
                else if (bytes == null) {
                    throw new IOException("Missing bytes in chunk '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else {

                    final byte[] buffer = bytes.getValue();

                    sink.write(buffer);
                    expectedChunk += 1;
                    totalSize += buffer.length;
                }
            }
        }
        finally {
            iter.close();
            sink.flush();
        }

        if ((0 <= numberChunks) && (expectedChunk < numberChunks)) {
            throw new IOException("Missing chunks after '" + expectedChunk
                    + "' of '" + numberChunks + "'.");
        }
        if ((0 <= length) && (totalSize != length)) {
            throw new IOException("File size mismatch. Expected '" + length
                    + "' but only read '" + totalSize + "' bytes.");
        }
    }

    /**
     * Tries to repair the file.
     * <p>
     * Currently the only strategy is to reorder the chunks into _id order.
     * The operation verifies that the reorder fixes the file prior to
     * modifying anything. It also verifies that the reordering worked after
     * reordering the chunks.
     * </p>
     *
     * @param fileDoc
     *            The document representing the file.
     * @param faults
     *            The map to update with the status of the repair.
     */
    protected void doTryAndRepair(final Document fileDoc,
            final Map<Object, List<String>> faults) {
        // First see if the MD5 for the file's chunks in _id order returns the
        // right results.
        final List<Element> chunkIds = new ArrayList<Element>();

        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start().add(
                queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(ID_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        MongoIterator<Document> iter = null;
        try {
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");
            iter = myChunksCollection.find(findBuilder);
            for (final Document chunkDoc : iter) {

                chunkIds.add(chunkDoc.get(ID_FIELD));

                final BinaryElement chunk = chunkDoc.get(BinaryElement.class,
                        DATA_FIELD);
                if (chunk != null) {
                    md5Digest.update(chunk.getValue());
                }
            }

            final String digest = IOUtils.toHex(md5Digest.digest());
            final StringElement computed = new StringElement(MD5_FIELD, digest);
            if (computed.equals(md5)) {
                // Update the 'n' fields for each chunk to be in the right
                // order.
                int n = 0;
                for (final Element idElement : chunkIds) {
                    final DocumentBuilder query = BuilderFactory.start();
                    query.add(idElement);
                    query.add(queryElement); // Direct to the right shard.

                    final DocumentBuilder update = BuilderFactory.start();
                    update.push("$set").add(CHUNK_NUMBER_FIELD, n);

                    // Use a multi-update to ensure the write happens when a
                    // file's chunks are across shards.
                    myChunksCollection.update(query.build(), update.build(),
                            true /* =multi */, false, Durability.ACK);

                    n += 1;
                }

                if (doValidate(fileDoc)) {
                    doAddFault(faults, id, "File repaired.");
                }
                else {
                    doAddFault(faults, id,
                            "Repair failed: Chunks reordered but still not validating.");
                }
            }
            else {
                doAddFault(faults, id,
                        "Repair failed: Could not determine correct chunk order.");
            }
        }
        catch (final NoSuchAlgorithmException e) {
            doAddFault(faults, id,
                    "Repair failed: Could not compute the MD5 for the file: "
                            + e.getMessage());
        }
        catch (final RuntimeException e) {
            doAddFault(faults, id, "Potential Repair Failure: Runtime error: "
                    + e.getMessage());
        }
        finally {
            IOUtils.close(iter);
        }
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param fileDoc
     *            The document for the file to delete.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    protected boolean doUnlink(final Document fileDoc) throws IOException {
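        // Delete the chunks and the file document in parallel and join on
        // both futures before reporting success.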
        final Element id = fileDoc.get(ID_FIELD);

        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(id.withName(FILES_ID_FIELD));
        final Future<Long> cFuture = myChunksCollection.deleteAsync(queryDoc);

        queryDoc.reset();
        queryDoc.add(id);
        final Future<Long> fFuture = myFilesCollection.deleteAsync(queryDoc);

        try {
            return (cFuture.get().longValue() >= 0)
                    && (fFuture.get().longValue() > 0);
        }
        catch (final InterruptedException e) {
            return false;
        }
        catch (final ExecutionException e) {
            return false;
        }
    }

    /**
     * Validates the file from the GridFS collections using the
     * {@code filemd5} command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param fileDoc
     *            The document for the file to validate.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    protected boolean doValidate(final Document fileDoc) {
        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);

        final DocumentBuilder commandDoc = BuilderFactory.start();
        commandDoc.add(id.withName("filemd5"));
        commandDoc.add("root", myRootName);
        final Document result = myDatabase.runCommand(commandDoc.build());

        return (md5 != null) && md5.equals(result.findFirst(MD5_FIELD));
    }

    /**
     * Verifies the MD5 result for the filemd5 command.
     *
     * @param faults
     *            The faults map to update if the verification fails.
     * @param fileDoc
     *            The document representing the file.
     * @param cmdResult
     *            The document returned from the 'filemd5' command.
     * @return True if the file's MD5 was verified successfully, false
     *         otherwise.
     */
    protected boolean doVerifyFileMd5(final Map<Object, List<String>> faults,
            final Document fileDoc, final Document cmdResult) {
        boolean ok = false;
        final Element idElement = fileDoc.get(ID_FIELD);

        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element commandMd5 = cmdResult.findFirst(MD5_FIELD);

        ok = (md5 != null) && md5.equals(commandMd5);
        if (!ok) {
            doAddFault(faults, idElement,
                    "MD5 sums do not match. File document contains '" + md5
                            + "' and the filemd5 command produced '"
                            + commandMd5 + "'.");
        }

        return ok;
    }

    /**
     * Reads from the stream into the buffer until the buffer is full or the
     * stream reaches the end-of-file.
     *
     * @param source
     *            The source of bytes to read.
     * @param buffer
     *            The buffer to read into.
     * @return The number of bytes read. If less than <tt>buffer.length</tt>
     *         then the stream reached the end-of-file.
     * @throws IOException
     *             On a failure reading from the stream.
     */
    private int readFully(final InputStream source, final byte[] buffer)
            throws IOException {

        int offset = 0;
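
        // A single InputStream.read(...) may return fewer bytes than
        // requested, so loop until the buffer is full or EOF is seen.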
        while (true) {
            final int read = source
                    .read(buffer, offset, buffer.length - offset);
            if (read < 0) {
                return offset;
            }

            offset += read;

            if (offset == buffer.length) {
                return offset;
            }
        }
    }
}