1 /*
2 * #%L
3 * Reply.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.message;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import com.allanbank.mongodb.ReadPreference;
27 import com.allanbank.mongodb.bson.Document;
28 import com.allanbank.mongodb.bson.io.BsonInputStream;
29 import com.allanbank.mongodb.bson.io.BsonOutputStream;
30 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
31 import com.allanbank.mongodb.client.Message;
32 import com.allanbank.mongodb.client.Operation;
33 import com.allanbank.mongodb.error.DocumentToLargeException;
34
35 /**
36 * Message received from the database in <a href=
37 * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPREPLY"
38 * >reply</a> to a query.
39 *
40 * <pre>
41 * <code>
42 * struct {
43 * MsgHeader header; // standard message header
44 * int32 responseFlags; // bit vector - see details below
45 * int64 cursorID; // cursor id if client needs to do get more's
46 * int32 startingFrom; // where in the cursor this reply is starting
47 * int32 numberReturned; // number of documents in the reply
48 * document* documents; // documents
49 * }
50 * </code>
51 * </pre>
52 *
53 *
54 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
55 * mutated in incompatible ways between any two releases of the driver.
56 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
57 */
58 public class Reply extends AbstractMessage {
59 /** Bit for the await capable flag. */
60 public static final int AWAIT_CAPABLE_BIT = 8;
61
62 /** Bit for the cursor not found flag. */
63 public static final int CURSOR_NOT_FOUND_BIT = 1;
64
65 /** Bit for the query failure flag. */
66 public static final int QUERY_FAILURE_BIT = 2;
67
68 /** Bit for the shard configuration stale flag. */
69 public static final int SHARD_CONFIG_STALE_BIT = 4;
70
71 /** Indicates the server is await capable for tailable cursors. */
72 private final boolean myAwaitCapable;
73
74 /**
75 * The id of the cursor if the user needs to do a get_more to get the
76 * complete results.
77 */
78 private final long myCursorId;
79
80 /** Indicates that the cursor in the <tt>getmore</tt> command was not found. */
81 private final boolean myCursorNotFound;
82
83 /** The offset (index) of the first document returned from the cursor. */
84 private final int myCursorOffset;
85
86 /** Indicates that the query failed. */
87 private final boolean myQueryFailed;
88
89 /** The id of the request this response is for. */
90 private final int myResponseToId;
91
92 /** The returned documents. */
93 private final List<Document> myResults;
94
95 /** Indicates (to a MongoS?) that its shard configuration is stale. */
96 private final boolean myShardConfigStale;
97
98 /**
99 * Creates a new Reply.
100 *
101 * @param header
102 * The header from the reply message.
103 * @param in
104 * Stream to read the reply message from.
105 * @throws IOException
106 * On a failure to read the reply.
107 */
108 public Reply(final Header header, final BsonInputStream in)
109 throws IOException {
110 init(".");
111
112 myResponseToId = header.getResponseId();
113
114 final int flags = in.readInt();
115 myCursorId = in.readLong();
116 myCursorOffset = in.readInt();
117
118 final int docCount = in.readInt();
119 myResults = new ArrayList<Document>(docCount);
120 for (int i = 0; i < docCount; ++i) {
121 myResults.add(in.readDocument());
122 }
123
124 myAwaitCapable = (flags & AWAIT_CAPABLE_BIT) == AWAIT_CAPABLE_BIT;
125 myCursorNotFound = (flags & CURSOR_NOT_FOUND_BIT) == CURSOR_NOT_FOUND_BIT;
126 myQueryFailed = (flags & QUERY_FAILURE_BIT) == QUERY_FAILURE_BIT;
127 myShardConfigStale = (flags & SHARD_CONFIG_STALE_BIT) == SHARD_CONFIG_STALE_BIT;
128 }
129
130 /**
131 * Creates a new Reply.
132 *
133 * @param responseToId
134 * The id of the request this response is for.
135 * @param cursorId
136 * The id of the cursor if the user needs to do a get_more to get
137 * the complete results.
138 * @param cursorOffset
139 * The offset (index) of the first document returned from the
140 * cursor.
141 * @param results
142 * The returned documents.
143 * @param awaitCapable
144 * If true, indicates the server is await capable for tailable
145 * cursors.
146 * @param cursorNotFound
147 * If true, indicates that the cursor in the <tt>get_more</tt>
148 * message was not found.
149 * @param queryFailed
150 * If true, indicates that the query failed.
151 * @param shardConfigStale
152 * If true, indicates (to a MongoS?) that its shard configuration
153 * is stale.
154 *
155 */
156 public Reply(final int responseToId, final long cursorId,
157 final int cursorOffset, final List<Document> results,
158 final boolean awaitCapable, final boolean cursorNotFound,
159 final boolean queryFailed, final boolean shardConfigStale) {
160 super("", "", ReadPreference.PRIMARY);
161
162 myResponseToId = responseToId;
163 myCursorId = cursorId;
164 myCursorOffset = cursorOffset;
165 myResults = new ArrayList<Document>(results);
166 myAwaitCapable = awaitCapable;
167 myCursorNotFound = cursorNotFound;
168 myQueryFailed = queryFailed;
169 myShardConfigStale = shardConfigStale;
170 }
171
172 /**
173 * Determines if the passed object is of this same type as this object and
174 * if so that its fields are equal.
175 *
176 * @param object
177 * The object to compare to.
178 *
179 * @see java.lang.Object#equals(java.lang.Object)
180 */
181 @Override
182 public boolean equals(final Object object) {
183 boolean result = false;
184 if (this == object) {
185 result = true;
186 }
187 else if ((object != null) && (getClass() == object.getClass())) {
188 final Reply other = (Reply) object;
189
190 // Base class fields are always the same ""."".
191 result = (myAwaitCapable == other.myAwaitCapable)
192 && (myCursorNotFound == other.myCursorNotFound)
193 && (myQueryFailed == other.myQueryFailed)
194 && (myShardConfigStale == other.myShardConfigStale)
195 && (myResponseToId == other.myResponseToId)
196 && (myCursorOffset == other.myCursorOffset)
197 && (myCursorId == other.myCursorId)
198 && myResults.equals(other.myResults);
199 }
200 return result;
201 }
202
203 /**
204 * Returns the id of the cursor if the user needs to do a get_more to get
205 * the complete results.
206 *
207 * @return The id of the cursor if the user needs to do a get_more to get
208 * the complete results.
209 */
210 public long getCursorId() {
211 return myCursorId;
212 }
213
214 /**
215 * Returns the offset (index) of the first document returned from the
216 * cursor.
217 *
218 * @return The offset (index) of the first document returned from the
219 * cursor.
220 */
221 public int getCursorOffset() {
222 return myCursorOffset;
223 }
224
225 /**
226 * {@inheritDoc}
227 * <p>
228 * Overridden to return the name of the operation: "REPLY".
229 * </p>
230 */
231 @Override
232 public String getOperationName() {
233 return Operation.REPLY.name();
234 }
235
236 /**
237 * Returns the id of the request this response is for.
238 *
239 * @return The id of the request this response is for.
240 */
241 public int getResponseToId() {
242 return myResponseToId;
243 }
244
245 /**
246 * Returns the query results.
247 *
248 * @return The query results.
249 */
250 public List<Document> getResults() {
251 return myResults;
252 }
253
254 /**
255 * Computes a reasonable hash code.
256 *
257 * @return The hash code value.
258 */
259 @Override
260 public int hashCode() {
261 int result = 1;
262 result = (31 * result) + super.hashCode();
263 result = (31 * result) + (myAwaitCapable ? 1 : 3);
264 result = (31 * result) + (myCursorNotFound ? 1 : 3);
265 result = (31 * result) + (myQueryFailed ? 1 : 3);
266 result = (31 * result) + (myShardConfigStale ? 1 : 3);
267 result = (31 * result) + myResponseToId;
268 result = (31 * result) + myCursorOffset;
269 result = (31 * result) + (int) (myCursorId >> Integer.SIZE);
270 result = (31 * result) + (int) myCursorId;
271 result = (31 * result) + myResults.hashCode();
272 return result;
273 }
274
275 /**
276 * Returns true if the server is await capable for tailable cursors.
277 *
278 * @return True if the server is await capable for tailable cursors.
279 */
280 public boolean isAwaitCapable() {
281 return myAwaitCapable;
282 }
283
284 /**
285 * Returns true if the cursor in the <tt>get_more</tt> message was not
286 * found.
287 *
288 * @return True if the cursor in the <tt>get_more</tt> message was not
289 * found.
290 */
291 public boolean isCursorNotFound() {
292 return myCursorNotFound;
293 }
294
295 /**
296 * Returns true if the query failed.
297 *
298 * @return True if the query failed.
299 */
300 public boolean isQueryFailed() {
301 return myQueryFailed;
302 }
303
304 /**
305 * Returns true if the shard configuration is stale.
306 *
307 * @return True if the shard configuration is stale.
308 */
309 public boolean isShardConfigStale() {
310 return myShardConfigStale;
311 }
312
313 /**
314 * {@inheritDoc}
315 * <p>
316 * Overridden to return the size of the {@link Query}.
317 * </p>
318 */
319 @Override
320 public int size() {
321
322 int size = HEADER_SIZE + 20;
323 // size += 4; // flags;
324 // size += 8; // cursorId
325 // size += 4; // cursorOffset
326 // size += 4; // result count.
327 for (final Document result : myResults) {
328 size += result.size();
329 }
330
331 return size;
332 }
333
334 /**
335 * {@inheritDoc}
336 * <p>
337 * Overrridden to be a no-op since we normally only receive a reply and
338 * don't care about the size.
339 * </p>
340 */
341 @Override
342 public void validateSize(final int maxDocumentSize)
343 throws DocumentToLargeException {
344 // Can't be too large.
345 }
346
347 /**
348 * {@inheritDoc}
349 * <p>
350 * Overridden to write the reply message.
351 * </p>
352 *
353 * @see Message#write(int, BsonOutputStream)
354 */
355 @Override
356 public void write(final int messageId, final BsonOutputStream out)
357 throws IOException {
358 final int flags = computeFlags();
359
360 int size = HEADER_SIZE;
361 size += 4; // flags;
362 size += 8; // cursorId
363 size += 4; // cursorOffset
364 size += 4; // result count.
365 for (final Document result : myResults) {
366 size += result.size();
367 }
368
369 writeHeader(out, messageId, myResponseToId, Operation.REPLY, size);
370 out.writeInt(flags);
371 out.writeLong(myCursorId);
372 out.writeInt(myCursorOffset);
373 out.writeInt(myResults.size());
374 for (final Document result : myResults) {
375 out.writeDocument(result);
376 }
377 }
378
379 /**
380 * {@inheritDoc}
381 * <p>
382 * Overridden to write the reply message.
383 * </p>
384 *
385 * @see Message#write(int, BsonOutputStream)
386 */
387 @Override
388 public void write(final int messageId, final BufferingBsonOutputStream out)
389 throws IOException {
390 final int flags = computeFlags();
391
392 final long start = writeHeader(out, messageId, myResponseToId,
393 Operation.REPLY);
394 out.writeInt(flags);
395 out.writeLong(myCursorId);
396 out.writeInt(myCursorOffset);
397 out.writeInt(myResults.size());
398 for (final Document result : myResults) {
399 out.writeDocument(result);
400 }
401 finishHeader(out, start);
402
403 out.flushBuffer();
404 }
405
406 /**
407 * Computes the message flags bit field.
408 *
409 * @return The message flags bit field.
410 */
411 private int computeFlags() {
412 int flags = 0;
413 if (myAwaitCapable) {
414 flags += AWAIT_CAPABLE_BIT;
415 }
416 if (myCursorNotFound) {
417 flags += CURSOR_NOT_FOUND_BIT;
418 }
419 if (myQueryFailed) {
420 flags += QUERY_FAILURE_BIT;
421 }
422 if (myShardConfigStale) {
423 flags += SHARD_CONFIG_STALE_BIT;
424 }
425 return flags;
426 }
427
428 }