1 /*
2 * #%L
3 * CursorStreamingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.List;
24
25 import com.allanbank.mongodb.MongoCursorControl;
26 import com.allanbank.mongodb.MongoDbException;
27 import com.allanbank.mongodb.ReadPreference;
28 import com.allanbank.mongodb.StreamCallback;
29 import com.allanbank.mongodb.bson.Document;
30 import com.allanbank.mongodb.bson.NumericElement;
31 import com.allanbank.mongodb.bson.builder.BuilderFactory;
32 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
33 import com.allanbank.mongodb.bson.element.StringElement;
34 import com.allanbank.mongodb.client.Client;
35 import com.allanbank.mongodb.client.MongoIteratorImpl;
36 import com.allanbank.mongodb.client.message.CursorableMessage;
37 import com.allanbank.mongodb.client.message.GetMore;
38 import com.allanbank.mongodb.client.message.KillCursors;
39 import com.allanbank.mongodb.client.message.Query;
40 import com.allanbank.mongodb.client.message.Reply;
41 import com.allanbank.mongodb.error.ReplyException;
42
43 /**
44 * Callback to convert a {@link CursorableMessage} {@link Reply} into a series
45 * of callback for each document received.
46 *
47 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
48 * mutated in incompatible ways between any two releases of the driver.
49 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
50 */
51 public final class CursorStreamingCallback extends
52 AbstractValidatingReplyCallback implements MongoCursorControl,
53 AddressAware {
54
55 /** The server the original request was sent to. */
56 private volatile String myAddress;
57
58 /** The requested batch size. */
59 private int myBatchSize;
60
61 /** The original query. */
62 private final Client myClient;
63
64 /**
65 * Flag to indictate that the stream has been closed.
66 */
67 private volatile boolean myClosed = false;
68
69 /** The name of the collection the query was originally created on. */
70 private final String myCollectionName;
71
72 /** If true then the callback should expect a command formated cursor reply. */
73 private boolean myCommand;
74
75 /** The original query. */
76 private long myCursorId = 0;
77
78 /** The name of the database the query was originally created on. */
79 private final String myDatabaseName;
80
81 /** The callback to forward the returned documents to. */
82 private final StreamCallback<Document> myForwardCallback;
83
84 /**
85 * The maximum number of document to return from the cursor. Zero or
86 * negative means all.
87 */
88 private int myLimit = 0;
89
90 /** The original message that started the cursor, if known. */
91 private final CursorableMessage myMessage;
92
93 /** The last reply. */
94 private volatile Reply myReply;
95
96 /**
97 * Flag to shutdown this iterator gracefully without closing the cursor on
98 * the server.
99 */
100 private boolean myShutdown = false;
101
102 /**
103 * Create a new CursorCallback.
104 *
105 * @param client
106 * The client interface to the server.
107 * @param originalMessage
108 * The original message.
109 * @param command
110 * If true then the callback should expect a command formated
111 * cursor reply.
112 * @param results
113 * The callback to update with each document.
114 */
115 public CursorStreamingCallback(final Client client,
116 final CursorableMessage originalMessage, final boolean command,
117 final StreamCallback<Document> results) {
118
119 myClient = client;
120 myDatabaseName = originalMessage.getDatabaseName();
121 myCollectionName = originalMessage.getCollectionName();
122 myBatchSize = originalMessage.getBatchSize();
123 myMessage = originalMessage;
124 myCommand = command;
125 myForwardCallback = results;
126 myLimit = originalMessage.getLimit();
127 }
128
129 /**
130 * Create a new CursorCallback from a cursor document.
131 *
132 * @param client
133 * The client interface to the server.
134 * @param cursorDocument
135 * The original query.
136 * @param results
137 * The callback to update with each document.
138 *
139 * @see MongoIteratorImpl#asDocument()
140 */
141 public CursorStreamingCallback(final Client client,
142 final Document cursorDocument,
143 final StreamCallback<Document> results) {
144
145 final String ns = cursorDocument.get(StringElement.class,
146 NAME_SPACE_FIELD).getValue();
147 String db = ns;
148 String collection = ns;
149 final int index = ns.indexOf('.');
150 if (0 < index) {
151 db = ns.substring(0, index);
152 collection = ns.substring(index + 1);
153 }
154
155 myMessage = null;
156 myCommand = false;
157 myClient = client;
158 myDatabaseName = db;
159 myCollectionName = collection;
160 myForwardCallback = results;
161 myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
162 .getLongValue();
163 myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
164 .getIntValue();
165 myBatchSize = cursorDocument
166 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
167 myAddress = cursorDocument.get(StringElement.class, SERVER_FIELD)
168 .getValue();
169 }
170
171 /**
172 * {@inheritDoc}
173 * <p>
174 * Overridden to return the current state of the stream as a document.
175 * </p>
176 */
177 @Override
178 public Document asDocument() {
179 final long cursorId = myCursorId;
180
181 if (cursorId != 0) {
182 final DocumentBuilder b = BuilderFactory.start();
183 b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
184 b.add(CURSOR_ID_FIELD, cursorId);
185 b.add(SERVER_FIELD, myAddress);
186 b.add(LIMIT_FIELD, myLimit);
187 b.add(BATCH_SIZE_FIELD, myBatchSize);
188
189 return b.build();
190 }
191 return null;
192 }
193
194 /**
195 * Overridden to close the iterator and send a {@link KillCursors} for the
196 * open cursor, if any.
197 */
198 @Override
199 public void close() {
200 synchronized (myForwardCallback) {
201 myClosed = true;
202 sendKill();
203 }
204 }
205
206 /**
207 * {@inheritDoc}
208 * <p>
209 * Overridden to forward the error the the user.
210 * </p>
211 */
212 @Override
213 public void exception(final Throwable thrown) {
214 try {
215 synchronized (myForwardCallback) {
216 myForwardCallback.exception(thrown);
217 }
218 }
219 finally {
220 close();
221 }
222 }
223
224 /**
225 * Returns the server the original request was sent to.
226 *
227 * @return The server the original request was sent to.
228 */
229 public String getAddress() {
230 return myAddress;
231 }
232
233 /**
234 * {@inheritDoc}
235 * <p>
236 * Overridden to set the batch size.
237 * </p>
238 */
239 @Override
240 public int getBatchSize() {
241 return myBatchSize;
242 }
243
244 /**
245 * Returns the client value.
246 *
247 * @return The client value.
248 */
249 public Client getClient() {
250 return myClient;
251 }
252
253 /**
254 * Returns the collection name.
255 *
256 * @return The collection name.
257 */
258 public String getCollectionName() {
259 return myCollectionName;
260 }
261
262 /**
263 * Returns the cursor Id value.
264 *
265 * @return The cursor Id value.
266 */
267 public long getCursorId() {
268 return myCursorId;
269 }
270
271 /**
272 * Returns the database name value.
273 *
274 * @return The database name value.
275 */
276 public String getDatabaseName() {
277 return myDatabaseName;
278 }
279
280 /**
281 * Returns the limit value.
282 *
283 * @return The limit value.
284 */
285 public int getLimit() {
286 return myLimit;
287 }
288
289 /**
290 * {@inheritDoc}
291 * <p>
292 * Overridden to return false.
293 * </p>
294 */
295 @Override
296 public boolean isLightWeight() {
297 return false;
298 }
299
300 /**
301 * Restarts the stream by sending a request for the next batch of documents.
302 *
303 * @throws MongoDbException
304 * On a failure to send the request for more document.
305 */
306 public void restart() {
307 sendRequest();
308 }
309
310 /**
311 * Sets the value of the server the original request was sent to.
312 *
313 * @param address
314 * The new value for the server the original request was sent to.
315 */
316 @Override
317 public void setAddress(final String address) {
318 myAddress = address;
319 // For races make sure that the push has the server name.
320 if (myReply != null) {
321 final Reply reply = myReply;
322 myReply = null;
323 push(reply);
324 }
325 }
326
327 /**
328 * {@inheritDoc}
329 * <p>
330 * Overridden to get the batch size.
331 * </p>
332 */
333 @Override
334 public void setBatchSize(final int batchSize) {
335 myBatchSize = batchSize;
336 }
337
338 /**
339 * {@inheritDoc}
340 * <p>
341 * Overridden to stop requesting more batches of documents.
342 * </p>
343 */
344 @Override
345 public void stop() {
346 myShutdown = true;
347 }
348
349 /**
350 * {@inheritDoc}
351 * <p>
352 * Overridden to add the {@link Query} to the exception.
353 * </p>
354 *
355 * @see AbstractReplyCallback#asError(Reply, int, int, String)
356 */
357 @Override
358 protected MongoDbException asError(final Reply reply, final int okValue,
359 final int errorNumber, final String errorMessage) {
360 return new ReplyException(okValue, errorNumber, errorMessage,
361 myMessage, reply);
362 }
363
364 /**
365 * {@inheritDoc}
366 * <p>
367 * Overridden to push the documents to the application's callback.
368 * </p>
369 *
370 * @see AbstractReplyCallback#convert(Reply)
371 */
372 @Override
373 protected void handle(final Reply reply) throws MongoDbException {
374 // Handle the first reply being from a command.
375 Reply result = reply;
376 if (isCommand()) {
377 result = CommandCursorTranslator.translate(reply);
378
379 // But only the first reply...
380 myCommand = false;
381 }
382
383 myReply = result;
384 if (myAddress != null) {
385 push(result);
386 }
387 }
388
389 /**
390 * Returns true if the callback should expect a command formated cursor
391 * reply.
392 *
393 * @return True if the callback should expect a command formated cursor
394 * reply.
395 */
396 protected boolean isCommand() {
397 return myCommand;
398 }
399
400 /**
401 * Loads more documents. This issues a get_more command as soon as the
402 * previous results start to be used.
403 *
404 * @param reply
405 * The last reply received.
406 * @return The list of loaded documents.
407 *
408 * @throws RuntimeException
409 * On a failure to load documents.
410 */
411 protected List<Document> loadDocuments(final Reply reply)
412 throws RuntimeException {
413
414 myCursorId = reply.getCursorId();
415
416 // Setup the documents and adjust the limit for the documents we have.
417 // Do this before the fetch again so the nextBatchSize() has the updated
418 // limit.
419 List<Document> docs = reply.getResults();
420 if (0 < myLimit) {
421 // Check if we have too many docs.
422 if (myLimit <= docs.size()) {
423 docs = docs.subList(0, myLimit);
424 close();
425 }
426 myLimit -= docs.size();
427 }
428
429 // Pre-fetch the next set of documents while we iterate over the
430 // documents we just got.
431 if ((myCursorId != 0) && !myShutdown) {
432 sendRequest();
433 }
434 // Exhausted the cursor - no more results.
435 // Don't need to kill the cursor since we exhausted it.
436
437 return docs;
438 }
439
440 /**
441 * Computes the size for the next batch of documents to get.
442 *
443 * @return The returnNex
444 */
445 protected int nextBatchSize() {
446 if ((0 < myLimit) && (myLimit <= myBatchSize)) {
447 return myLimit;
448 }
449 return myBatchSize;
450 }
451
452 /**
453 * Sends a {@link KillCursors} message if there is an active cursor.
454 *
455 * @throws MongoDbException
456 * On a failure to send the {@link KillCursors} message.
457 */
458 protected void sendKill() throws MongoDbException {
459 final long cursorId = myCursorId;
460 if ((cursorId != 0) && !myShutdown) {
461 myCursorId = 0;
462 myClient.send(new KillCursors(new long[] { cursorId },
463 ReadPreference.server(myAddress)), null);
464 }
465 }
466
467 /**
468 * Sends a request to start the next match of documents.
469 *
470 * @throws MongoDbException
471 * On a failure to send the request.
472 */
473 protected void sendRequest() throws MongoDbException {
474 final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
475 myCursorId, nextBatchSize(), ReadPreference.server(myAddress));
476
477 myClient.send(getMore, this);
478 }
479
480 /**
481 * Pushes the results from the reply to the application's callback.
482 *
483 * @param reply
484 * The reply containing the results to push to the application's
485 * callback.
486 */
487 private void push(final Reply reply) {
488 // Request the load in the synchronized block so there is only 1
489 // outstanding request.
490 synchronized (myForwardCallback) {
491 if (myClosed) {
492 myCursorId = reply.getCursorId();
493 sendKill();
494 }
495 else {
496 try {
497 for (final Document document : loadDocuments(reply)) {
498 myForwardCallback.callback(document);
499 }
500 if (myCursorId == 0) {
501 // Signal the end of the results.
502 myForwardCallback.done();
503 }
504 }
505 catch (final RuntimeException re) {
506 exception(re);
507 close();
508 }
509 }
510 }
511 }
512 }