1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.message;
21
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.allanbank.mongodb.ReadPreference;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.io.BsonInputStream;
import com.allanbank.mongodb.bson.io.BsonOutputStream;
import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
import com.allanbank.mongodb.client.Message;
import com.allanbank.mongodb.client.Operation;
import com.allanbank.mongodb.error.DocumentToLargeException;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class Reply extends AbstractMessage {
59
60 public static final int AWAIT_CAPABLE_BIT = 8;
61
62
63 public static final int CURSOR_NOT_FOUND_BIT = 1;
64
65
66 public static final int QUERY_FAILURE_BIT = 2;
67
68
69 public static final int SHARD_CONFIG_STALE_BIT = 4;
70
71
72 private final boolean myAwaitCapable;
73
74
75
76
77
78 private final long myCursorId;
79
80
81 private final boolean myCursorNotFound;
82
83
84 private final int myCursorOffset;
85
86
87 private final boolean myQueryFailed;
88
89
90 private final int myResponseToId;
91
92
93 private final List<Document> myResults;
94
95
96 private final boolean myShardConfigStale;
97
98
99
100
101
102
103
104
105
106
107
108 public Reply(final Header header, final BsonInputStream in)
109 throws IOException {
110 init(".");
111
112 myResponseToId = header.getResponseId();
113
114 final int flags = in.readInt();
115 myCursorId = in.readLong();
116 myCursorOffset = in.readInt();
117
118 final int docCount = in.readInt();
119 myResults = new ArrayList<Document>(docCount);
120 for (int i = 0; i < docCount; ++i) {
121 myResults.add(in.readDocument());
122 }
123
124 myAwaitCapable = (flags & AWAIT_CAPABLE_BIT) == AWAIT_CAPABLE_BIT;
125 myCursorNotFound = (flags & CURSOR_NOT_FOUND_BIT) == CURSOR_NOT_FOUND_BIT;
126 myQueryFailed = (flags & QUERY_FAILURE_BIT) == QUERY_FAILURE_BIT;
127 myShardConfigStale = (flags & SHARD_CONFIG_STALE_BIT) == SHARD_CONFIG_STALE_BIT;
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public Reply(final int responseToId, final long cursorId,
157 final int cursorOffset, final List<Document> results,
158 final boolean awaitCapable, final boolean cursorNotFound,
159 final boolean queryFailed, final boolean shardConfigStale) {
160 super("", "", ReadPreference.PRIMARY);
161
162 myResponseToId = responseToId;
163 myCursorId = cursorId;
164 myCursorOffset = cursorOffset;
165 myResults = new ArrayList<Document>(results);
166 myAwaitCapable = awaitCapable;
167 myCursorNotFound = cursorNotFound;
168 myQueryFailed = queryFailed;
169 myShardConfigStale = shardConfigStale;
170 }
171
172
173
174
175
176
177
178
179
180
181 @Override
182 public boolean equals(final Object object) {
183 boolean result = false;
184 if (this == object) {
185 result = true;
186 }
187 else if ((object != null) && (getClass() == object.getClass())) {
188 final Reply other = (Reply) object;
189
190
191 result = (myAwaitCapable == other.myAwaitCapable)
192 && (myCursorNotFound == other.myCursorNotFound)
193 && (myQueryFailed == other.myQueryFailed)
194 && (myShardConfigStale == other.myShardConfigStale)
195 && (myResponseToId == other.myResponseToId)
196 && (myCursorOffset == other.myCursorOffset)
197 && (myCursorId == other.myCursorId)
198 && myResults.equals(other.myResults);
199 }
200 return result;
201 }
202
203
204
205
206
207
208
209
210 public long getCursorId() {
211 return myCursorId;
212 }
213
214
215
216
217
218
219
220
221 public int getCursorOffset() {
222 return myCursorOffset;
223 }
224
225
226
227
228
229
230
231 @Override
232 public String getOperationName() {
233 return Operation.REPLY.name();
234 }
235
236
237
238
239
240
241 public int getResponseToId() {
242 return myResponseToId;
243 }
244
245
246
247
248
249
250 public List<Document> getResults() {
251 return myResults;
252 }
253
254
255
256
257
258
259 @Override
260 public int hashCode() {
261 int result = 1;
262 result = (31 * result) + super.hashCode();
263 result = (31 * result) + (myAwaitCapable ? 1 : 3);
264 result = (31 * result) + (myCursorNotFound ? 1 : 3);
265 result = (31 * result) + (myQueryFailed ? 1 : 3);
266 result = (31 * result) + (myShardConfigStale ? 1 : 3);
267 result = (31 * result) + myResponseToId;
268 result = (31 * result) + myCursorOffset;
269 result = (31 * result) + (int) (myCursorId >> Integer.SIZE);
270 result = (31 * result) + (int) myCursorId;
271 result = (31 * result) + myResults.hashCode();
272 return result;
273 }
274
275
276
277
278
279
280 public boolean isAwaitCapable() {
281 return myAwaitCapable;
282 }
283
284
285
286
287
288
289
290
291 public boolean isCursorNotFound() {
292 return myCursorNotFound;
293 }
294
295
296
297
298
299
300 public boolean isQueryFailed() {
301 return myQueryFailed;
302 }
303
304
305
306
307
308
309 public boolean isShardConfigStale() {
310 return myShardConfigStale;
311 }
312
313
314
315
316
317
318
319 @Override
320 public int size() {
321
322 int size = HEADER_SIZE + 20;
323
324
325
326
327 for (final Document result : myResults) {
328 size += result.size();
329 }
330
331 return size;
332 }
333
334
335
336
337
338
339
340
341 @Override
342 public void validateSize(final int maxDocumentSize)
343 throws DocumentToLargeException {
344
345 }
346
347
348
349
350
351
352
353
354
355 @Override
356 public void write(final int messageId, final BsonOutputStream out)
357 throws IOException {
358 final int flags = computeFlags();
359
360 int size = HEADER_SIZE;
361 size += 4;
362 size += 8;
363 size += 4;
364 size += 4;
365 for (final Document result : myResults) {
366 size += result.size();
367 }
368
369 writeHeader(out, messageId, myResponseToId, Operation.REPLY, size);
370 out.writeInt(flags);
371 out.writeLong(myCursorId);
372 out.writeInt(myCursorOffset);
373 out.writeInt(myResults.size());
374 for (final Document result : myResults) {
375 out.writeDocument(result);
376 }
377 }
378
379
380
381
382
383
384
385
386
387 @Override
388 public void write(final int messageId, final BufferingBsonOutputStream out)
389 throws IOException {
390 final int flags = computeFlags();
391
392 final long start = writeHeader(out, messageId, myResponseToId,
393 Operation.REPLY);
394 out.writeInt(flags);
395 out.writeLong(myCursorId);
396 out.writeInt(myCursorOffset);
397 out.writeInt(myResults.size());
398 for (final Document result : myResults) {
399 out.writeDocument(result);
400 }
401 finishHeader(out, start);
402
403 out.flushBuffer();
404 }
405
406
407
408
409
410
411 private int computeFlags() {
412 int flags = 0;
413 if (myAwaitCapable) {
414 flags += AWAIT_CAPABLE_BIT;
415 }
416 if (myCursorNotFound) {
417 flags += CURSOR_NOT_FOUND_BIT;
418 }
419 if (myQueryFailed) {
420 flags += QUERY_FAILURE_BIT;
421 }
422 if (myShardConfigStale) {
423 flags += SHARD_CONFIG_STALE_BIT;
424 }
425 return flags;
426 }
427
428 }