Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
MongoIteratorImpl |
|
| 2.3846153846153846;2.385 |
1 | /* | |
2 | * #%L | |
3 | * MongoIteratorImpl.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client; | |
21 | ||
22 | import java.util.ArrayList; | |
23 | import java.util.Iterator; | |
24 | import java.util.List; | |
25 | import java.util.NoSuchElementException; | |
26 | import java.util.concurrent.ExecutionException; | |
27 | import java.util.concurrent.Future; | |
28 | ||
29 | import com.allanbank.mongodb.MongoClient; | |
30 | import com.allanbank.mongodb.MongoDbException; | |
31 | import com.allanbank.mongodb.MongoIterator; | |
32 | import com.allanbank.mongodb.ReadPreference; | |
33 | import com.allanbank.mongodb.bson.Document; | |
34 | import com.allanbank.mongodb.bson.NumericElement; | |
35 | import com.allanbank.mongodb.bson.builder.BuilderFactory; | |
36 | import com.allanbank.mongodb.bson.builder.DocumentBuilder; | |
37 | import com.allanbank.mongodb.bson.element.StringElement; | |
38 | import com.allanbank.mongodb.client.callback.FutureReplyCallback; | |
39 | import com.allanbank.mongodb.client.message.CursorableMessage; | |
40 | import com.allanbank.mongodb.client.message.GetMore; | |
41 | import com.allanbank.mongodb.client.message.KillCursors; | |
42 | import com.allanbank.mongodb.client.message.Reply; | |
43 | import com.allanbank.mongodb.error.CursorNotFoundException; | |
44 | import com.allanbank.mongodb.util.log.Log; | |
45 | import com.allanbank.mongodb.util.log.LogFactory; | |
46 | ||
47 | /** | |
48 | * Iterator over the results of the MongoDB cursor. | |
49 | * | |
50 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
51 | * mutated in incompatible ways between any two releases of the driver. | |
52 | * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved | |
53 | */ | |
54 | 25 | public class MongoIteratorImpl implements MongoIterator<Document> { |
55 | ||
56 | /** The log for the iterator. */ | |
57 | 1 | private static final Log LOG = LogFactory.getLog(MongoIteratorImpl.class); |
58 | ||
59 | /** The size of batches that are requested from the servers. */ | |
60 | 50 | private int myBatchSize = 0; |
61 | ||
62 | /** The client for sending get_more requests to the server. */ | |
63 | private final Client myClient; | |
64 | ||
65 | /** The name of the collection the query was originally created on. */ | |
66 | private final String myCollectionName; | |
67 | ||
68 | /** The iterator over the current set of documents. */ | |
69 | private Iterator<Document> myCurrentIterator; | |
70 | ||
71 | /** The original query. */ | |
72 | 50 | private long myCursorId = 0; |
73 | ||
74 | /** The name of the database the query was originally created on. */ | |
75 | private final String myDatabaseName; | |
76 | ||
77 | /** | |
78 | * The maximum number of document to return from the cursor. Zero or | |
79 | * negative means all. | |
80 | */ | |
81 | 50 | private int myLimit = 0; |
82 | ||
83 | /** The {@link Future} that will be updated with the next set of results. */ | |
84 | private FutureReplyCallback myNextReply; | |
85 | ||
86 | /** The read preference to subsequent requests. */ | |
87 | private final ReadPreference myReadPerference; | |
88 | ||
89 | /** | |
90 | * Flag to shutdown this iterator gracefully without closing the cursor on | |
91 | * the server. | |
92 | */ | |
93 | 50 | private boolean myShutdown = false; |
94 | ||
95 | /** | |
96 | * Create a new MongoDBInterator. | |
97 | * | |
98 | * @param originalQuery | |
99 | * The original query being iterated over. | |
100 | * @param client | |
101 | * The client for issuing more requests. | |
102 | * @param server | |
103 | * The server that received the original query request. | |
104 | * @param reply | |
105 | * The initial results of the query that are available. | |
106 | */ | |
107 | public MongoIteratorImpl(final CursorableMessage originalQuery, | |
108 | 46 | final Client client, final String server, final Reply reply) { |
109 | 46 | myNextReply = new FutureReplyCallback(); |
110 | 46 | myNextReply.callback(reply); |
111 | ||
112 | 46 | myReadPerference = ReadPreference.server(server); |
113 | 46 | myCursorId = 0; |
114 | 46 | myClient = client; |
115 | 46 | myCurrentIterator = null; |
116 | 46 | myBatchSize = originalQuery.getBatchSize(); |
117 | 46 | myLimit = originalQuery.getLimit(); |
118 | 46 | myDatabaseName = originalQuery.getDatabaseName(); |
119 | 46 | myCollectionName = originalQuery.getCollectionName(); |
120 | ||
121 | 46 | } |
122 | ||
123 | /** | |
124 | * Create a new MongoIteratorImpl from a cursor document. | |
125 | * | |
126 | * @param client | |
127 | * The client interface to the server. | |
128 | * @param cursorDocument | |
129 | * The original query. | |
130 | * | |
131 | * @see MongoIteratorImpl#asDocument() | |
132 | */ | |
133 | 4 | public MongoIteratorImpl(final Document cursorDocument, final Client client) { |
134 | 4 | final String ns = cursorDocument.get(StringElement.class, |
135 | NAME_SPACE_FIELD).getValue(); | |
136 | 4 | String db = ns; |
137 | 4 | String collection = ns; |
138 | 4 | final int index = ns.indexOf('.'); |
139 | 4 | if (0 < index) { |
140 | 3 | db = ns.substring(0, index); |
141 | 3 | collection = ns.substring(index + 1); |
142 | } | |
143 | ||
144 | 4 | myClient = client; |
145 | 4 | myDatabaseName = db; |
146 | 4 | myCollectionName = collection; |
147 | 4 | myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD) |
148 | .getLongValue(); | |
149 | 4 | myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD) |
150 | .getIntValue(); | |
151 | 4 | myBatchSize = cursorDocument |
152 | .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue(); | |
153 | 4 | myReadPerference = ReadPreference.server(cursorDocument.get( |
154 | StringElement.class, SERVER_FIELD).getValue()); | |
155 | 4 | } |
156 | ||
157 | /** | |
158 | * {@inheritDoc} | |
159 | * <p> | |
160 | * Overridden to return the active cursor in the defined format. | |
161 | * </p> | |
162 | * | |
163 | * @see ClientImpl#isCursorDocument(Document) | |
164 | */ | |
165 | @Override | |
166 | public Document asDocument() { | |
167 | 5 | long cursorId = myCursorId; |
168 | 5 | final Future<Reply> replyFuture = myNextReply; |
169 | ||
170 | 5 | cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture); |
171 | ||
172 | 5 | if (cursorId != 0) { |
173 | 3 | final DocumentBuilder b = BuilderFactory.start(); |
174 | 3 | b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName); |
175 | 3 | b.add(CURSOR_ID_FIELD, cursorId); |
176 | 3 | b.add(SERVER_FIELD, myReadPerference.getServer()); |
177 | 3 | b.add(LIMIT_FIELD, myLimit); |
178 | 3 | b.add(BATCH_SIZE_FIELD, myBatchSize); |
179 | ||
180 | 3 | return b.build(); |
181 | } | |
182 | ||
183 | 2 | return null; |
184 | } | |
185 | ||
186 | /** | |
187 | * {@inheritDoc} | |
188 | * <p> | |
189 | * Overridden to close the iterator and send a {@link KillCursors} for the | |
190 | * open cursor, if any. | |
191 | * </p> | |
192 | */ | |
193 | @Override | |
194 | public void close() { | |
195 | 24 | long cursorId = myCursorId; |
196 | 24 | final Future<Reply> replyFuture = myNextReply; |
197 | ||
198 | 24 | myCurrentIterator = null; |
199 | 24 | myNextReply = null; |
200 | 24 | myCursorId = 0; |
201 | ||
202 | 24 | cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture); |
203 | ||
204 | 24 | if ((cursorId != 0) && !myShutdown) { |
205 | // The user asked us to leave the cursor be. | |
206 | 8 | myClient.send(new KillCursors(new long[] { cursorId }, |
207 | myReadPerference), null); | |
208 | } | |
209 | 24 | } |
210 | ||
211 | /** | |
212 | * {@inheritDoc} | |
213 | * <p> | |
214 | * Overridden to get the batch size from the original query or set | |
215 | * explicitly. | |
216 | * </p> | |
217 | */ | |
218 | @Override | |
219 | public int getBatchSize() { | |
220 | 5 | return myBatchSize; |
221 | } | |
222 | ||
223 | /** | |
224 | * Returns the iterator's read preference which points to the original | |
225 | * server performing the query. | |
226 | * | |
227 | * @return The iterator's read preference which points to the original | |
228 | * server performing the query. | |
229 | */ | |
230 | public ReadPreference getReadPerference() { | |
231 | 5 | return myReadPerference; |
232 | } | |
233 | ||
234 | /** | |
235 | * {@inheritDoc} | |
236 | * <p> | |
237 | * Overridden to return true if there are more documents. | |
238 | * </p> | |
239 | */ | |
240 | @Override | |
241 | public boolean hasNext() { | |
242 | 287 | if (myCurrentIterator == null) { |
243 | 33 | loadDocuments(); |
244 | } | |
245 | 254 | else if (!myCurrentIterator.hasNext() && (myNextReply != null)) { |
246 | 11 | loadDocuments(); |
247 | } | |
248 | 284 | return myCurrentIterator.hasNext(); |
249 | } | |
250 | ||
251 | /** | |
252 | * {@inheritDoc} | |
253 | * <p> | |
254 | * Overridden to return this iterator. | |
255 | * </p> | |
256 | */ | |
257 | @Override | |
258 | public Iterator<Document> iterator() { | |
259 | 14 | return this; |
260 | } | |
261 | ||
262 | /** | |
263 | * {@inheritDoc} | |
264 | * <p> | |
265 | * Overridden to return the next document from the query. | |
266 | * </p> | |
267 | * | |
268 | * @see java.util.Iterator#next() | |
269 | */ | |
270 | @Override | |
271 | public Document next() { | |
272 | 128 | if (hasNext()) { |
273 | 127 | return myCurrentIterator.next(); |
274 | } | |
275 | 1 | throw new NoSuchElementException("No more documents."); |
276 | } | |
277 | ||
278 | /** | |
279 | * Computes the size for the next batch of documents to get. | |
280 | * | |
281 | * @return The returnNex | |
282 | */ | |
283 | public int nextBatchSize() { | |
284 | 1000520 | if ((0 < myLimit) && (myLimit <= myBatchSize)) { |
285 | 1 | return myLimit; |
286 | } | |
287 | 1000519 | return myBatchSize; |
288 | } | |
289 | ||
290 | /** | |
291 | * {@inheritDoc} | |
292 | * <p> | |
293 | * Overridden to throw and {@link UnsupportedOperationException}. | |
294 | * </p> | |
295 | * | |
296 | * @see java.util.Iterator#remove() | |
297 | */ | |
298 | @Override | |
299 | public void remove() { | |
300 | 1 | throw new UnsupportedOperationException( |
301 | "Cannot remove a document via a MongoDB iterator."); | |
302 | } | |
303 | ||
304 | /** | |
305 | * Restarts the iterator by sending a request for more documents. | |
306 | * | |
307 | * @throws MongoDbException | |
308 | * On a failure to send the request for more document. | |
309 | */ | |
310 | public void restart() throws MongoDbException { | |
311 | 2 | sendRequest(); |
312 | 2 | } |
313 | ||
314 | /** | |
315 | * {@inheritDoc} | |
316 | * <p> | |
317 | * Overridden to set the batch size. | |
318 | * </p> | |
319 | */ | |
320 | @Override | |
321 | public void setBatchSize(final int batchSize) { | |
322 | 1 | myBatchSize = batchSize; |
323 | 1 | } |
324 | ||
325 | /** | |
326 | * Stops the iterator after consuming any received and/or requested batches. | |
327 | * <p> | |
328 | * <b>WARNING</b>: This will leave the cursor open on the server. Users | |
329 | * should persist the state of the cursor as returned from | |
330 | * {@link #asDocument()} and restart the cursor using one of the | |
331 | * {@link MongoClient#restart(com.allanbank.mongodb.bson.DocumentAssignable)} | |
332 | * or | |
333 | * {@link MongoClient#restart(com.allanbank.mongodb.StreamCallback, com.allanbank.mongodb.bson.DocumentAssignable)} | |
334 | * methods. Use with extreme caution. | |
335 | * </p> | |
336 | * <p> | |
337 | * The iterator will naturally stop ({@link #hasNext()} will return false) | |
338 | * when the current batch and any already requested batches are finished. | |
339 | * </p> | |
340 | */ | |
341 | @Override | |
342 | public void stop() { | |
343 | 1 | myShutdown = true; |
344 | 1 | } |
345 | ||
346 | /** | |
347 | * {@inheritDoc} | |
348 | * <p> | |
349 | * Overridden to return the remaining elements as a array. | |
350 | * </p> | |
351 | */ | |
352 | @Override | |
353 | public Object[] toArray() { | |
354 | 1 | final List<Document> remaining = toList(); |
355 | ||
356 | 1 | return remaining.toArray(); |
357 | } | |
358 | ||
359 | /** | |
360 | * {@inheritDoc} | |
361 | * <p> | |
362 | * Overridden to return the remaining elements as a array. | |
363 | * </p> | |
364 | */ | |
365 | @Override | |
366 | public <S> S[] toArray(final S[] to) { | |
367 | 1 | final List<Document> remaining = toList(); |
368 | ||
369 | 1 | return remaining.toArray(to); |
370 | } | |
371 | ||
372 | /** | |
373 | * {@inheritDoc} | |
374 | * <p> | |
375 | * Overridden to return the remaining elements as a list. | |
376 | * </p> | |
377 | */ | |
378 | @Override | |
379 | public List<Document> toList() { | |
380 | 3 | final List<Document> remaining = new ArrayList<Document>(); |
381 | ||
382 | 18 | while (hasNext()) { |
383 | 15 | remaining.add(next()); |
384 | } | |
385 | ||
386 | 3 | return remaining; |
387 | } | |
388 | ||
389 | /** | |
390 | * Returns the client value. | |
391 | * | |
392 | * @return The client value. | |
393 | */ | |
394 | protected Client getClient() { | |
395 | 3 | return myClient; |
396 | } | |
397 | ||
398 | /** | |
399 | * Returns the collection name. | |
400 | * | |
401 | * @return The collection name. | |
402 | */ | |
403 | protected String getCollectionName() { | |
404 | 3 | return myCollectionName; |
405 | } | |
406 | ||
407 | /** | |
408 | * Returns the cursor Id value. | |
409 | * | |
410 | * @return The cursor Id value. | |
411 | */ | |
412 | protected long getCursorId() { | |
413 | 3 | return myCursorId; |
414 | } | |
415 | ||
416 | /** | |
417 | * Returns the database name value. | |
418 | * | |
419 | * @return The database name value. | |
420 | */ | |
421 | protected String getDatabaseName() { | |
422 | 3 | return myDatabaseName; |
423 | } | |
424 | ||
425 | /** | |
426 | * Returns the limit value. | |
427 | * | |
428 | * @return The limit value. | |
429 | */ | |
430 | protected int getLimit() { | |
431 | 3 | return myLimit; |
432 | } | |
433 | ||
434 | /** | |
435 | * Loads more documents into the iterator. This iterator issues a get_more | |
436 | * command as soon as the previous results start to be used. | |
437 | * | |
438 | * @throws RuntimeException | |
439 | * On a failure to load documents. | |
440 | */ | |
441 | protected void loadDocuments() throws RuntimeException { | |
442 | 44 | loadDocuments(true); |
443 | 41 | } |
444 | ||
445 | /** | |
446 | * Loads more documents into the iterator. This iterator issues a get_more | |
447 | * command as soon as the previous results start to be used. | |
448 | * | |
449 | * @param blockForTailable | |
450 | * If true then the method will recursively call itself on a | |
451 | * tailable cursor with no results. This makes the call blocking. | |
452 | * It false then the call will not block. This is used by the | |
453 | * method to ensure that the outermost load blocks but the | |
454 | * recursion is not inifinite. | |
455 | * @return The list of loaded documents. | |
456 | * | |
457 | * @throws RuntimeException | |
458 | * On a failure to load documents. | |
459 | */ | |
460 | protected List<Document> loadDocuments(final boolean blockForTailable) | |
461 | throws RuntimeException { | |
462 | List<Document> docs; | |
463 | try { | |
464 | // Pull the reply from the future. Hopefully it is already there! | |
465 | 1000545 | final Reply reply = myNextReply.get(); |
466 | 1000544 | if (reply.isCursorNotFound() || reply.isQueryFailed()) { |
467 | 2 | final long cursorid = myCursorId; |
468 | 2 | myCursorId = 0; |
469 | 2 | throw new CursorNotFoundException(reply, "Cursor id (" |
470 | + cursorid + ") not found by the MongoDB server."); | |
471 | } | |
472 | ||
473 | 1000542 | myCursorId = reply.getCursorId(); |
474 | ||
475 | // Setup and iterator over the documents and adjust the limit | |
476 | // for the documents we have. Do this before the fetch again | |
477 | // so the nextBatchSize() has the updated limit. | |
478 | 1000542 | docs = reply.getResults(); |
479 | 1000542 | myCurrentIterator = docs.iterator(); |
480 | 1000542 | if (0 < myLimit) { |
481 | // Check if we have too many docs. | |
482 | 5 | if (myLimit <= docs.size()) { |
483 | 2 | myCurrentIterator = docs.subList(0, myLimit).iterator(); |
484 | 2 | if (myCursorId != 0) { |
485 | // Kill the cursor. | |
486 | 1 | myClient.send(new KillCursors( |
487 | new long[] { myCursorId }, myReadPerference), | |
488 | null); | |
489 | 1 | myCursorId = 0; |
490 | } | |
491 | } | |
492 | 5 | myLimit -= docs.size(); |
493 | } | |
494 | ||
495 | // Pre-fetch the next set of documents while we iterate over the | |
496 | // documents we just got. | |
497 | 1000542 | if ((myCursorId != 0) && !myShutdown) { |
498 | 1000515 | sendRequest(); |
499 | ||
500 | // Include the (myNextReply != null) to catch failures on the | |
501 | // server. | |
502 | 2001016 | while (docs.isEmpty() && blockForTailable |
503 | && (myNextReply != null)) { | |
504 | // Tailable - Wait for a reply with documents. | |
505 | 1000501 | docs = loadDocuments(false); |
506 | } | |
507 | } | |
508 | else { | |
509 | // Exhausted the cursor or are shutting down - no more results. | |
510 | 27 | myNextReply = null; |
511 | ||
512 | // Don't need to kill the cursor since we exhausted it or are | |
513 | // shutting down. | |
514 | } | |
515 | ||
516 | } | |
517 | 1 | catch (final InterruptedException e) { |
518 | 1 | throw new RuntimeException(e); |
519 | } | |
520 | 0 | catch (final ExecutionException e) { |
521 | 0 | throw new RuntimeException(e); |
522 | 1000542 | } |
523 | ||
524 | 1000542 | return docs; |
525 | } | |
526 | ||
527 | /** | |
528 | * If the current cursor id is zero then waits for the response from the | |
529 | * pending request to determine the real cursor id. | |
530 | * | |
531 | * @param cursorId | |
532 | * The presumed cursor id. | |
533 | * @param replyFuture | |
534 | * The pending reply's future. | |
535 | * @return The best known cursor id. | |
536 | */ | |
537 | protected long retreiveCursorIdFromPendingRequest(final long cursorId, | |
538 | final Future<Reply> replyFuture) { | |
539 | // May not have processed any of the results yet... | |
540 | 29 | if ((cursorId == 0) && (replyFuture != null)) { |
541 | try { | |
542 | 9 | final Reply reply = replyFuture.get(); |
543 | ||
544 | 9 | return reply.getCursorId(); |
545 | } | |
546 | 0 | catch (final InterruptedException e) { |
547 | 0 | LOG.warn(e, "Interrupted waiting for a query reply: {}", |
548 | e.getMessage()); | |
549 | } | |
550 | 0 | catch (final ExecutionException e) { |
551 | 0 | LOG.warn(e, "Interrupted waiting for a query reply: {}", |
552 | e.getMessage()); | |
553 | 0 | } |
554 | } | |
555 | 20 | return cursorId; |
556 | } | |
557 | ||
558 | /** | |
559 | * Sends a request for more documents. | |
560 | * | |
561 | * @throws MongoDbException | |
562 | * On a failure to send the request for more document. | |
563 | */ | |
564 | protected void sendRequest() throws MongoDbException { | |
565 | 1000517 | final GetMore getMore = new GetMore(myDatabaseName, myCollectionName, |
566 | myCursorId, nextBatchSize(), myReadPerference); | |
567 | ||
568 | 1000517 | myNextReply = new FutureReplyCallback(); |
569 | 1000517 | myClient.send(getMore, myNextReply); |
570 | 1000517 | } |
571 | } |