1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Future;
28
29 import com.allanbank.mongodb.MongoClient;
30 import com.allanbank.mongodb.MongoDbException;
31 import com.allanbank.mongodb.MongoIterator;
32 import com.allanbank.mongodb.ReadPreference;
33 import com.allanbank.mongodb.bson.Document;
34 import com.allanbank.mongodb.bson.NumericElement;
35 import com.allanbank.mongodb.bson.builder.BuilderFactory;
36 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
37 import com.allanbank.mongodb.bson.element.StringElement;
38 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
39 import com.allanbank.mongodb.client.message.CursorableMessage;
40 import com.allanbank.mongodb.client.message.GetMore;
41 import com.allanbank.mongodb.client.message.KillCursors;
42 import com.allanbank.mongodb.client.message.Reply;
43 import com.allanbank.mongodb.error.CursorNotFoundException;
44 import com.allanbank.mongodb.util.log.Log;
45 import com.allanbank.mongodb.util.log.LogFactory;
46
47
48
49
50
51
52
53
54 public class MongoIteratorImpl implements MongoIterator<Document> {
55
56
57 private static final Log LOG = LogFactory.getLog(MongoIteratorImpl.class);
58
59
60 private int myBatchSize = 0;
61
62
63 private final Client myClient;
64
65
66 private final String myCollectionName;
67
68
69 private Iterator<Document> myCurrentIterator;
70
71
72 private long myCursorId = 0;
73
74
75 private final String myDatabaseName;
76
77
78
79
80
81 private int myLimit = 0;
82
83
84 private FutureReplyCallback myNextReply;
85
86
87 private final ReadPreference myReadPerference;
88
89
90
91
92
93 private boolean myShutdown = false;
94
95
96
97
98
99
100
101
102
103
104
105
106
107 public MongoIteratorImpl(final CursorableMessage originalQuery,
108 final Client client, final String server, final Reply reply) {
109 myNextReply = new FutureReplyCallback();
110 myNextReply.callback(reply);
111
112 myReadPerference = ReadPreference.server(server);
113 myCursorId = 0;
114 myClient = client;
115 myCurrentIterator = null;
116 myBatchSize = originalQuery.getBatchSize();
117 myLimit = originalQuery.getLimit();
118 myDatabaseName = originalQuery.getDatabaseName();
119 myCollectionName = originalQuery.getCollectionName();
120
121 }
122
123
124
125
126
127
128
129
130
131
132
/**
 * Re-creates an iterator from a document in the form produced by
 * {@link #asDocument()}.
 * <p>
 * The namespace field is split at the first '.' into database and
 * collection names. When there is no '.' after the first character, the
 * whole namespace is used for both names.
 * </p>
 * <p>
 * No pending reply is created here. NOTE(review): callers presumably have
 * to invoke {@link #restart()} before iterating -- confirm against the
 * callers.
 * </p>
 *
 * @param cursorDocument
 *            The document holding the namespace, cursor id, limit, batch
 *            size and server of the cursor.
 * @param client
 *            The client used to send follow-up requests.
 */
public MongoIteratorImpl(final Document cursorDocument, final Client client) {
    final String namespace = cursorDocument.get(StringElement.class,
            NAME_SPACE_FIELD).getValue();
    final int dot = namespace.indexOf('.');
    final boolean hasBothParts = (dot > 0);

    myClient = client;
    myDatabaseName = hasBothParts ? namespace.substring(0, dot) : namespace;
    myCollectionName = hasBothParts ? namespace.substring(dot + 1)
            : namespace;

    final NumericElement cursorIdElement = cursorDocument.get(
            NumericElement.class, CURSOR_ID_FIELD);
    myCursorId = cursorIdElement.getLongValue();

    final NumericElement limitElement = cursorDocument.get(
            NumericElement.class, LIMIT_FIELD);
    myLimit = limitElement.getIntValue();

    final NumericElement batchSizeElement = cursorDocument.get(
            NumericElement.class, BATCH_SIZE_FIELD);
    myBatchSize = batchSizeElement.getIntValue();

    final StringElement serverElement = cursorDocument.get(
            StringElement.class, SERVER_FIELD);
    myReadPerference = ReadPreference.server(serverElement.getValue());
}
156
157
158
159
160
161
162
163
164
165 @Override
166 public Document asDocument() {
167 long cursorId = myCursorId;
168 final Future<Reply> replyFuture = myNextReply;
169
170 cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
171
172 if (cursorId != 0) {
173 final DocumentBuilder b = BuilderFactory.start();
174 b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
175 b.add(CURSOR_ID_FIELD, cursorId);
176 b.add(SERVER_FIELD, myReadPerference.getServer());
177 b.add(LIMIT_FIELD, myLimit);
178 b.add(BATCH_SIZE_FIELD, myBatchSize);
179
180 return b.build();
181 }
182
183 return null;
184 }
185
186
187
188
189
190
191
192
193 @Override
194 public void close() {
195 long cursorId = myCursorId;
196 final Future<Reply> replyFuture = myNextReply;
197
198 myCurrentIterator = null;
199 myNextReply = null;
200 myCursorId = 0;
201
202 cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
203
204 if ((cursorId != 0) && !myShutdown) {
205
206 myClient.send(new KillCursors(new long[] { cursorId },
207 myReadPerference), null);
208 }
209 }
210
211
212
213
214
215
216
217
218 @Override
219 public int getBatchSize() {
220 return myBatchSize;
221 }
222
223
224
225
226
227
228
229
230 public ReadPreference getReadPerference() {
231 return myReadPerference;
232 }
233
234
235
236
237
238
239
240 @Override
241 public boolean hasNext() {
242 if (myCurrentIterator == null) {
243 loadDocuments();
244 }
245 else if (!myCurrentIterator.hasNext() && (myNextReply != null)) {
246 loadDocuments();
247 }
248 return myCurrentIterator.hasNext();
249 }
250
251
252
253
254
255
256
257 @Override
258 public Iterator<Document> iterator() {
259 return this;
260 }
261
262
263
264
265
266
267
268
269
270 @Override
271 public Document next() {
272 if (hasNext()) {
273 return myCurrentIterator.next();
274 }
275 throw new NoSuchElementException("No more documents.");
276 }
277
278
279
280
281
282
283 public int nextBatchSize() {
284 if ((0 < myLimit) && (myLimit <= myBatchSize)) {
285 return myLimit;
286 }
287 return myBatchSize;
288 }
289
290
291
292
293
294
295
296
297
298 @Override
299 public void remove() {
300 throw new UnsupportedOperationException(
301 "Cannot remove a document via a MongoDB iterator.");
302 }
303
304
305
306
307
308
309
310 public void restart() throws MongoDbException {
311 sendRequest();
312 }
313
314
315
316
317
318
319
320 @Override
321 public void setBatchSize(final int batchSize) {
322 myBatchSize = batchSize;
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341 @Override
342 public void stop() {
343 myShutdown = true;
344 }
345
346
347
348
349
350
351
352 @Override
353 public Object[] toArray() {
354 final List<Document> remaining = toList();
355
356 return remaining.toArray();
357 }
358
359
360
361
362
363
364
365 @Override
366 public <S> S[] toArray(final S[] to) {
367 final List<Document> remaining = toList();
368
369 return remaining.toArray(to);
370 }
371
372
373
374
375
376
377
378 @Override
379 public List<Document> toList() {
380 final List<Document> remaining = new ArrayList<Document>();
381
382 while (hasNext()) {
383 remaining.add(next());
384 }
385
386 return remaining;
387 }
388
389
390
391
392
393
394 protected Client getClient() {
395 return myClient;
396 }
397
398
399
400
401
402
403 protected String getCollectionName() {
404 return myCollectionName;
405 }
406
407
408
409
410
411
412 protected long getCursorId() {
413 return myCursorId;
414 }
415
416
417
418
419
420
421 protected String getDatabaseName() {
422 return myDatabaseName;
423 }
424
425
426
427
428
429
430 protected int getLimit() {
431 return myLimit;
432 }
433
434
435
436
437
438
439
440
441 protected void loadDocuments() throws RuntimeException {
442 loadDocuments(true);
443 }
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460 protected List<Document> loadDocuments(final boolean blockForTailable)
461 throws RuntimeException {
462 List<Document> docs;
463 try {
464
465 final Reply reply = myNextReply.get();
466 if (reply.isCursorNotFound() || reply.isQueryFailed()) {
467 final long cursorid = myCursorId;
468 myCursorId = 0;
469 throw new CursorNotFoundException(reply, "Cursor id ("
470 + cursorid + ") not found by the MongoDB server.");
471 }
472
473 myCursorId = reply.getCursorId();
474
475
476
477
478 docs = reply.getResults();
479 myCurrentIterator = docs.iterator();
480 if (0 < myLimit) {
481
482 if (myLimit <= docs.size()) {
483 myCurrentIterator = docs.subList(0, myLimit).iterator();
484 if (myCursorId != 0) {
485
486 myClient.send(new KillCursors(
487 new long[] { myCursorId }, myReadPerference),
488 null);
489 myCursorId = 0;
490 }
491 }
492 myLimit -= docs.size();
493 }
494
495
496
497 if ((myCursorId != 0) && !myShutdown) {
498 sendRequest();
499
500
501
502 while (docs.isEmpty() && blockForTailable
503 && (myNextReply != null)) {
504
505 docs = loadDocuments(false);
506 }
507 }
508 else {
509
510 myNextReply = null;
511
512
513
514 }
515
516 }
517 catch (final InterruptedException e) {
518 throw new RuntimeException(e);
519 }
520 catch (final ExecutionException e) {
521 throw new RuntimeException(e);
522 }
523
524 return docs;
525 }
526
527
528
529
530
531
532
533
534
535
536
537 protected long retreiveCursorIdFromPendingRequest(final long cursorId,
538 final Future<Reply> replyFuture) {
539
540 if ((cursorId == 0) && (replyFuture != null)) {
541 try {
542 final Reply reply = replyFuture.get();
543
544 return reply.getCursorId();
545 }
546 catch (final InterruptedException e) {
547 LOG.warn(e, "Interrupted waiting for a query reply: {}",
548 e.getMessage());
549 }
550 catch (final ExecutionException e) {
551 LOG.warn(e, "Interrupted waiting for a query reply: {}",
552 e.getMessage());
553 }
554 }
555 return cursorId;
556 }
557
558
559
560
561
562
563
564 protected void sendRequest() throws MongoDbException {
565 final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
566 myCursorId, nextBatchSize(), myReadPerference);
567
568 myNextReply = new FutureReplyCallback();
569 myClient.send(getMore, myNextReply);
570 }
571 }