Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
CursorStreamingCallback |
|
| 1.76;1.76 |
1 | /* | |
2 | * #%L | |
3 | * CursorStreamingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | ||
21 | package com.allanbank.mongodb.client.callback; | |
22 | ||
23 | import java.util.List; | |
24 | ||
25 | import com.allanbank.mongodb.MongoCursorControl; | |
26 | import com.allanbank.mongodb.MongoDbException; | |
27 | import com.allanbank.mongodb.ReadPreference; | |
28 | import com.allanbank.mongodb.StreamCallback; | |
29 | import com.allanbank.mongodb.bson.Document; | |
30 | import com.allanbank.mongodb.bson.NumericElement; | |
31 | import com.allanbank.mongodb.bson.builder.BuilderFactory; | |
32 | import com.allanbank.mongodb.bson.builder.DocumentBuilder; | |
33 | import com.allanbank.mongodb.bson.element.StringElement; | |
34 | import com.allanbank.mongodb.client.Client; | |
35 | import com.allanbank.mongodb.client.MongoIteratorImpl; | |
36 | import com.allanbank.mongodb.client.message.CursorableMessage; | |
37 | import com.allanbank.mongodb.client.message.GetMore; | |
38 | import com.allanbank.mongodb.client.message.KillCursors; | |
39 | import com.allanbank.mongodb.client.message.Query; | |
40 | import com.allanbank.mongodb.client.message.Reply; | |
41 | import com.allanbank.mongodb.error.ReplyException; | |
42 | ||
43 | /** | |
44 | * Callback to convert a {@link CursorableMessage} {@link Reply} into a series | |
45 | * of callback for each document received. | |
46 | * | |
47 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
48 | * mutated in incompatible ways between any two releases of the driver. | |
49 | * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved | |
50 | */ | |
51 | public final class CursorStreamingCallback extends | |
52 | AbstractValidatingReplyCallback implements MongoCursorControl, | |
53 | AddressAware { | |
54 | ||
55 | /** The server the original request was sent to. */ | |
56 | private volatile String myAddress; | |
57 | ||
58 | /** The requested batch size. */ | |
59 | private int myBatchSize; | |
60 | ||
61 | /** The original query. */ | |
62 | private final Client myClient; | |
63 | ||
64 | /** | |
65 | * Flag to indictate that the stream has been closed. | |
66 | */ | |
67 | 39 | private volatile boolean myClosed = false; |
68 | ||
69 | /** The name of the collection the query was originally created on. */ | |
70 | private final String myCollectionName; | |
71 | ||
72 | /** If true then the callback should expect a command formated cursor reply. */ | |
73 | private boolean myCommand; | |
74 | ||
75 | /** The original query. */ | |
76 | 39 | private long myCursorId = 0; |
77 | ||
78 | /** The name of the database the query was originally created on. */ | |
79 | private final String myDatabaseName; | |
80 | ||
81 | /** The callback to forward the returned documents to. */ | |
82 | private final StreamCallback<Document> myForwardCallback; | |
83 | ||
84 | /** | |
85 | * The maximum number of document to return from the cursor. Zero or | |
86 | * negative means all. | |
87 | */ | |
88 | 39 | private int myLimit = 0; |
89 | ||
90 | /** The original message that started the cursor, if known. */ | |
91 | private final CursorableMessage myMessage; | |
92 | ||
93 | /** The last reply. */ | |
94 | private volatile Reply myReply; | |
95 | ||
96 | /** | |
97 | * Flag to shutdown this iterator gracefully without closing the cursor on | |
98 | * the server. | |
99 | */ | |
100 | 39 | private boolean myShutdown = false; |
101 | ||
102 | /** | |
103 | * Create a new CursorCallback. | |
104 | * | |
105 | * @param client | |
106 | * The client interface to the server. | |
107 | * @param originalMessage | |
108 | * The original message. | |
109 | * @param command | |
110 | * If true then the callback should expect a command formated | |
111 | * cursor reply. | |
112 | * @param results | |
113 | * The callback to update with each document. | |
114 | */ | |
115 | public CursorStreamingCallback(final Client client, | |
116 | final CursorableMessage originalMessage, final boolean command, | |
117 | 36 | final StreamCallback<Document> results) { |
118 | ||
119 | 36 | myClient = client; |
120 | 36 | myDatabaseName = originalMessage.getDatabaseName(); |
121 | 36 | myCollectionName = originalMessage.getCollectionName(); |
122 | 36 | myBatchSize = originalMessage.getBatchSize(); |
123 | 36 | myMessage = originalMessage; |
124 | 36 | myCommand = command; |
125 | 36 | myForwardCallback = results; |
126 | 36 | myLimit = originalMessage.getLimit(); |
127 | 36 | } |
128 | ||
129 | /** | |
130 | * Create a new CursorCallback from a cursor document. | |
131 | * | |
132 | * @param client | |
133 | * The client interface to the server. | |
134 | * @param cursorDocument | |
135 | * The original query. | |
136 | * @param results | |
137 | * The callback to update with each document. | |
138 | * | |
139 | * @see MongoIteratorImpl#asDocument() | |
140 | */ | |
141 | public CursorStreamingCallback(final Client client, | |
142 | final Document cursorDocument, | |
143 | 3 | final StreamCallback<Document> results) { |
144 | ||
145 | 3 | final String ns = cursorDocument.get(StringElement.class, |
146 | NAME_SPACE_FIELD).getValue(); | |
147 | 3 | String db = ns; |
148 | 3 | String collection = ns; |
149 | 3 | final int index = ns.indexOf('.'); |
150 | 3 | if (0 < index) { |
151 | 2 | db = ns.substring(0, index); |
152 | 2 | collection = ns.substring(index + 1); |
153 | } | |
154 | ||
155 | 3 | myMessage = null; |
156 | 3 | myCommand = false; |
157 | 3 | myClient = client; |
158 | 3 | myDatabaseName = db; |
159 | 3 | myCollectionName = collection; |
160 | 3 | myForwardCallback = results; |
161 | 3 | myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD) |
162 | .getLongValue(); | |
163 | 3 | myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD) |
164 | .getIntValue(); | |
165 | 3 | myBatchSize = cursorDocument |
166 | .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue(); | |
167 | 3 | myAddress = cursorDocument.get(StringElement.class, SERVER_FIELD) |
168 | .getValue(); | |
169 | 3 | } |
170 | ||
171 | /** | |
172 | * {@inheritDoc} | |
173 | * <p> | |
174 | * Overridden to return the current state of the stream as a document. | |
175 | * </p> | |
176 | */ | |
177 | @Override | |
178 | public Document asDocument() { | |
179 | 4 | final long cursorId = myCursorId; |
180 | ||
181 | 4 | if (cursorId != 0) { |
182 | 2 | final DocumentBuilder b = BuilderFactory.start(); |
183 | 2 | b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName); |
184 | 2 | b.add(CURSOR_ID_FIELD, cursorId); |
185 | 2 | b.add(SERVER_FIELD, myAddress); |
186 | 2 | b.add(LIMIT_FIELD, myLimit); |
187 | 2 | b.add(BATCH_SIZE_FIELD, myBatchSize); |
188 | ||
189 | 2 | return b.build(); |
190 | } | |
191 | 2 | return null; |
192 | } | |
193 | ||
194 | /** | |
195 | * Overridden to close the iterator and send a {@link KillCursors} for the | |
196 | * open cursor, if any. | |
197 | */ | |
198 | @Override | |
199 | public void close() { | |
200 | 24 | synchronized (myForwardCallback) { |
201 | 24 | myClosed = true; |
202 | 24 | sendKill(); |
203 | 24 | } |
204 | 24 | } |
205 | ||
206 | /** | |
207 | * {@inheritDoc} | |
208 | * <p> | |
209 | * Overridden to forward the error the the user. | |
210 | * </p> | |
211 | */ | |
212 | @Override | |
213 | public void exception(final Throwable thrown) { | |
214 | try { | |
215 | 3 | synchronized (myForwardCallback) { |
216 | 3 | myForwardCallback.exception(thrown); |
217 | 3 | } |
218 | } | |
219 | finally { | |
220 | 3 | close(); |
221 | 3 | } |
222 | 3 | } |
223 | ||
224 | /** | |
225 | * Returns the server the original request was sent to. | |
226 | * | |
227 | * @return The server the original request was sent to. | |
228 | */ | |
229 | public String getAddress() { | |
230 | 5 | return myAddress; |
231 | } | |
232 | ||
233 | /** | |
234 | * {@inheritDoc} | |
235 | * <p> | |
236 | * Overridden to set the batch size. | |
237 | * </p> | |
238 | */ | |
239 | @Override | |
240 | public int getBatchSize() { | |
241 | 5 | return myBatchSize; |
242 | } | |
243 | ||
244 | /** | |
245 | * Returns the client value. | |
246 | * | |
247 | * @return The client value. | |
248 | */ | |
249 | public Client getClient() { | |
250 | 3 | return myClient; |
251 | } | |
252 | ||
253 | /** | |
254 | * Returns the collection name. | |
255 | * | |
256 | * @return The collection name. | |
257 | */ | |
258 | public String getCollectionName() { | |
259 | 3 | return myCollectionName; |
260 | } | |
261 | ||
262 | /** | |
263 | * Returns the cursor Id value. | |
264 | * | |
265 | * @return The cursor Id value. | |
266 | */ | |
267 | public long getCursorId() { | |
268 | 3 | return myCursorId; |
269 | } | |
270 | ||
271 | /** | |
272 | * Returns the database name value. | |
273 | * | |
274 | * @return The database name value. | |
275 | */ | |
276 | public String getDatabaseName() { | |
277 | 3 | return myDatabaseName; |
278 | } | |
279 | ||
280 | /** | |
281 | * Returns the limit value. | |
282 | * | |
283 | * @return The limit value. | |
284 | */ | |
285 | public int getLimit() { | |
286 | 3 | return myLimit; |
287 | } | |
288 | ||
289 | /** | |
290 | * {@inheritDoc} | |
291 | * <p> | |
292 | * Overridden to return false. | |
293 | * </p> | |
294 | */ | |
295 | @Override | |
296 | public boolean isLightWeight() { | |
297 | 0 | return false; |
298 | } | |
299 | ||
300 | /** | |
301 | * Restarts the stream by sending a request for the next batch of documents. | |
302 | * | |
303 | * @throws MongoDbException | |
304 | * On a failure to send the request for more document. | |
305 | */ | |
306 | public void restart() { | |
307 | 2 | sendRequest(); |
308 | 2 | } |
309 | ||
310 | /** | |
311 | * Sets the value of the server the original request was sent to. | |
312 | * | |
313 | * @param address | |
314 | * The new value for the server the original request was sent to. | |
315 | */ | |
316 | @Override | |
317 | public void setAddress(final String address) { | |
318 | 28 | myAddress = address; |
319 | // For races make sure that the push has the server name. | |
320 | 28 | if (myReply != null) { |
321 | 1 | final Reply reply = myReply; |
322 | 1 | myReply = null; |
323 | 1 | push(reply); |
324 | } | |
325 | 28 | } |
326 | ||
327 | /** | |
328 | * {@inheritDoc} | |
329 | * <p> | |
330 | * Overridden to get the batch size. | |
331 | * </p> | |
332 | */ | |
333 | @Override | |
334 | public void setBatchSize(final int batchSize) { | |
335 | 1 | myBatchSize = batchSize; |
336 | 1 | } |
337 | ||
338 | /** | |
339 | * {@inheritDoc} | |
340 | * <p> | |
341 | * Overridden to stop requesting more batches of documents. | |
342 | * </p> | |
343 | */ | |
344 | @Override | |
345 | public void stop() { | |
346 | 1 | myShutdown = true; |
347 | 1 | } |
348 | ||
349 | /** | |
350 | * {@inheritDoc} | |
351 | * <p> | |
352 | * Overridden to add the {@link Query} to the exception. | |
353 | * </p> | |
354 | * | |
355 | * @see AbstractReplyCallback#asError(Reply, int, int, String) | |
356 | */ | |
357 | @Override | |
358 | protected MongoDbException asError(final Reply reply, final int okValue, | |
359 | final int errorNumber, final String errorMessage) { | |
360 | 1 | return new ReplyException(okValue, errorNumber, errorMessage, |
361 | myMessage, reply); | |
362 | } | |
363 | ||
364 | /** | |
365 | * {@inheritDoc} | |
366 | * <p> | |
367 | * Overridden to push the documents to the application's callback. | |
368 | * </p> | |
369 | * | |
370 | * @see AbstractReplyCallback#convert(Reply) | |
371 | */ | |
372 | @Override | |
373 | protected void handle(final Reply reply) throws MongoDbException { | |
374 | // Handle the first reply being from a command. | |
375 | 29 | Reply result = reply; |
376 | 29 | if (isCommand()) { |
377 | 0 | result = CommandCursorTranslator.translate(reply); |
378 | ||
379 | // But only the first reply... | |
380 | 0 | myCommand = false; |
381 | } | |
382 | ||
383 | 29 | myReply = result; |
384 | 29 | if (myAddress != null) { |
385 | 28 | push(result); |
386 | } | |
387 | 29 | } |
388 | ||
389 | /** | |
390 | * Returns true if the callback should expect a command formated cursor | |
391 | * reply. | |
392 | * | |
393 | * @return True if the callback should expect a command formated cursor | |
394 | * reply. | |
395 | */ | |
396 | protected boolean isCommand() { | |
397 | 29 | return myCommand; |
398 | } | |
399 | ||
400 | /** | |
401 | * Loads more documents. This issues a get_more command as soon as the | |
402 | * previous results start to be used. | |
403 | * | |
404 | * @param reply | |
405 | * The last reply received. | |
406 | * @return The list of loaded documents. | |
407 | * | |
408 | * @throws RuntimeException | |
409 | * On a failure to load documents. | |
410 | */ | |
411 | protected List<Document> loadDocuments(final Reply reply) | |
412 | throws RuntimeException { | |
413 | ||
414 | 28 | myCursorId = reply.getCursorId(); |
415 | ||
416 | // Setup the documents and adjust the limit for the documents we have. | |
417 | // Do this before the fetch again so the nextBatchSize() has the updated | |
418 | // limit. | |
419 | 28 | List<Document> docs = reply.getResults(); |
420 | 28 | if (0 < myLimit) { |
421 | // Check if we have too many docs. | |
422 | 9 | if (myLimit <= docs.size()) { |
423 | 3 | docs = docs.subList(0, myLimit); |
424 | 3 | close(); |
425 | } | |
426 | 9 | myLimit -= docs.size(); |
427 | } | |
428 | ||
429 | // Pre-fetch the next set of documents while we iterate over the | |
430 | // documents we just got. | |
431 | 28 | if ((myCursorId != 0) && !myShutdown) { |
432 | 5 | sendRequest(); |
433 | } | |
434 | // Exhausted the cursor - no more results. | |
435 | // Don't need to kill the cursor since we exhausted it. | |
436 | ||
437 | 28 | return docs; |
438 | } | |
439 | ||
440 | /** | |
441 | * Computes the size for the next batch of documents to get. | |
442 | * | |
443 | * @return The returnNex | |
444 | */ | |
445 | protected int nextBatchSize() { | |
446 | 10 | if ((0 < myLimit) && (myLimit <= myBatchSize)) { |
447 | 2 | return myLimit; |
448 | } | |
449 | 8 | return myBatchSize; |
450 | } | |
451 | ||
452 | /** | |
453 | * Sends a {@link KillCursors} message if there is an active cursor. | |
454 | * | |
455 | * @throws MongoDbException | |
456 | * On a failure to send the {@link KillCursors} message. | |
457 | */ | |
458 | protected void sendKill() throws MongoDbException { | |
459 | 25 | final long cursorId = myCursorId; |
460 | 25 | if ((cursorId != 0) && !myShutdown) { |
461 | 4 | myCursorId = 0; |
462 | 4 | myClient.send(new KillCursors(new long[] { cursorId }, |
463 | ReadPreference.server(myAddress)), null); | |
464 | } | |
465 | 25 | } |
466 | ||
467 | /** | |
468 | * Sends a request to start the next match of documents. | |
469 | * | |
470 | * @throws MongoDbException | |
471 | * On a failure to send the request. | |
472 | */ | |
473 | protected void sendRequest() throws MongoDbException { | |
474 | 7 | final GetMore getMore = new GetMore(myDatabaseName, myCollectionName, |
475 | myCursorId, nextBatchSize(), ReadPreference.server(myAddress)); | |
476 | ||
477 | 7 | myClient.send(getMore, this); |
478 | 7 | } |
479 | ||
480 | /** | |
481 | * Pushes the results from the reply to the application's callback. | |
482 | * | |
483 | * @param reply | |
484 | * The reply containing the results to push to the application's | |
485 | * callback. | |
486 | */ | |
487 | private void push(final Reply reply) { | |
488 | // Request the load in the synchronized block so there is only 1 | |
489 | // outstanding request. | |
490 | 29 | synchronized (myForwardCallback) { |
491 | 29 | if (myClosed) { |
492 | 1 | myCursorId = reply.getCursorId(); |
493 | 1 | sendKill(); |
494 | } | |
495 | else { | |
496 | try { | |
497 | 28 | for (final Document document : loadDocuments(reply)) { |
498 | 83 | myForwardCallback.callback(document); |
499 | 82 | } |
500 | 27 | if (myCursorId == 0) { |
501 | // Signal the end of the results. | |
502 | 21 | myForwardCallback.done(); |
503 | } | |
504 | } | |
505 | 1 | catch (final RuntimeException re) { |
506 | 1 | exception(re); |
507 | 1 | close(); |
508 | 27 | } |
509 | } | |
510 | 29 | } |
511 | 29 | } |
512 | } |