1 /*
2 * #%L
3 * Insert.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.message;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import com.allanbank.mongodb.ReadPreference;
28 import com.allanbank.mongodb.bson.Document;
29 import com.allanbank.mongodb.bson.io.BsonInputStream;
30 import com.allanbank.mongodb.bson.io.BsonOutputStream;
31 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
32 import com.allanbank.mongodb.bson.io.StringEncoder;
33 import com.allanbank.mongodb.client.Message;
34 import com.allanbank.mongodb.client.Operation;
35 import com.allanbank.mongodb.client.VersionRange;
36 import com.allanbank.mongodb.error.DocumentToLargeException;
37
38 /**
39 * Message to <a href=
40 * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPINSERT"
41 * >insert</a> a set of documents into a collection.
42 *
43 * <pre>
44 * <code>
45 * struct {
46 * MsgHeader header; // standard message header
47 * int32 flags; // bit vector - see below
48 * cstring fullCollectionName; // "dbname.collectionname"
49 * document* documents; // one or more documents to insert into the collection
50 * }
51 * </code>
52 * </pre>
53 *
54 *
55 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
56 * mutated in incompatible ways between any two releases of the driver.
57 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
58 */
59 public class Insert extends AbstractMessage {
60
61 /** The flag bit to keep inserting documents on an error. */
62 public static final int CONTINUE_ON_ERROR_BIT = 1;
63
64 /**
65 * If true, then the insert of documents should continue if one document
66 * causes an error.
67 */
68 private final boolean myContinueOnError;
69
70 /** The documents to be inserted. */
71 private final List<Document> myDocuments;
72
73 /**
74 * The documents to be inserted. If negative then the size has not been
75 * computed.
76 */
77 private int myDocumentsSize;
78
79 /**
80 * Creates a new Insert.
81 *
82 * @param header
83 * The header proceeding the insert message. This is used to
84 * locate the end of the insert.
85 * @param in
86 * The stream to read the insert message from.
87 * @throws IOException
88 * On a failure reading the insert message.
89 */
90 public Insert(final Header header, final BsonInputStream in)
91 throws IOException {
92
93 final long position = in.getBytesRead();
94 final long end = (position + header.getLength()) - Header.SIZE;
95
96 final int flags = in.readInt();
97 init(in.readCString());
98
99 // Read the documents to the end of the message.
100 myDocuments = new ArrayList<Document>();
101 while (in.getBytesRead() < end) {
102 myDocuments.add(in.readDocument());
103 }
104
105 myContinueOnError = (flags & CONTINUE_ON_ERROR_BIT) == CONTINUE_ON_ERROR_BIT;
106 myDocumentsSize = -1;
107 }
108
109 /**
110 * Creates a new Insert.
111 *
112 * @param databaseName
113 * The name of the database.
114 * @param collectionName
115 * The name of the collection.
116 * @param documents
117 * The documents to be inserted.
118 * @param continueOnError
119 * If the insert should continue if one of the documents causes
120 * an error.
121 */
122 public Insert(final String databaseName, final String collectionName,
123 final List<Document> documents, final boolean continueOnError) {
124 this(databaseName, collectionName, documents, continueOnError, null);
125 }
126
127 /**
128 * Creates a new Insert.
129 *
130 * @param databaseName
131 * The name of the database.
132 * @param collectionName
133 * The name of the collection.
134 * @param documents
135 * The documents to be inserted.
136 * @param continueOnError
137 * If the insert should continue if one of the documents causes
138 * an error.
139 * @param requiredServerVersion
140 * The required version of the server to support processing the
141 * message.
142 */
143 public Insert(final String databaseName, final String collectionName,
144 final List<Document> documents, final boolean continueOnError,
145 final VersionRange requiredServerVersion) {
146 super(databaseName, collectionName, ReadPreference.PRIMARY,
147 requiredServerVersion);
148
149 myDocuments = new ArrayList<Document>(documents);
150 myContinueOnError = continueOnError;
151 myDocumentsSize = -1;
152 }
153
154 /**
155 * Determines if the passed object is of this same type as this object and
156 * if so that its fields are equal.
157 *
158 * @param object
159 * The object to compare to.
160 *
161 * @see java.lang.Object#equals(java.lang.Object)
162 */
163 @Override
164 public boolean equals(final Object object) {
165 boolean result = false;
166 if (this == object) {
167 result = true;
168 }
169 else if ((object != null) && (getClass() == object.getClass())) {
170 final Insert other = (Insert) object;
171
172 result = super.equals(object)
173 && (myContinueOnError == other.myContinueOnError)
174 && myDocuments.equals(other.myDocuments);
175 }
176 return result;
177 }
178
179 /**
180 * Returns the documents to insert.
181 *
182 * @return The documents to insert.
183 */
184 public List<Document> getDocuments() {
185 return Collections.unmodifiableList(myDocuments);
186 }
187
188 /**
189 * {@inheritDoc}
190 * <p>
191 * Overridden to return the name of the operation: "INSERT".
192 * </p>
193 */
194 @Override
195 public String getOperationName() {
196 return Operation.INSERT.name();
197 }
198
199 /**
200 * Computes a reasonable hash code.
201 *
202 * @return The hash code value.
203 */
204 @Override
205 public int hashCode() {
206 int result = 1;
207 result = (31 * result) + super.hashCode();
208 result = (31 * result) + (myContinueOnError ? 1 : 3);
209 result = (31 * result) + myDocuments.hashCode();
210 return result;
211 }
212
213 /**
214 * Returns true if the insert should continue with other documents if one of
215 * the document inserts encounters an error.
216 *
217 * @return True if the insert should continue with other documents if one of
218 * the document inserts encounters an error.
219 */
220 public boolean isContinueOnError() {
221 return myContinueOnError;
222 }
223
224 /**
225 * {@inheritDoc}
226 * <p>
227 * Overridden to return the size of the {@link Insert}.
228 * </p>
229 */
230 @Override
231 public int size() {
232
233 int size = HEADER_SIZE + 6; // See below.
234 // size += 4; // flags
235 size += StringEncoder.utf8Size(myDatabaseName);
236 // size += 1; // StringEncoder.utf8Size(".");
237 size += StringEncoder.utf8Size(myCollectionName);
238 // size += 1; // \0 on the CString.
239 for (final Document document : myDocuments) {
240 size += document.size();
241 }
242
243 return size;
244 }
245
246 /**
247 * {@inheritDoc}
248 * <p>
249 * Overridden to output the documents and insert flags.
250 * </p>
251 */
252 @Override
253 public String toString() {
254 return "Insert [myContinueOnError=" + myContinueOnError
255 + ", myDocuments=" + myDocuments + "]";
256 }
257
258 /**
259 * {@inheritDoc}
260 * <p>
261 * Overridden to ensure the inserted documents are not too large in
262 * aggregate.
263 * </p>
264 */
265 @Override
266 public void validateSize(final int maxDocumentSize)
267 throws DocumentToLargeException {
268 if (myDocumentsSize < 0) {
269 long size = 0;
270 for (final Document doc : myDocuments) {
271 size += doc.size();
272 }
273
274 myDocumentsSize = (int) size;
275 }
276
277 if (maxDocumentSize < myDocumentsSize) {
278 throw new DocumentToLargeException(myDocumentsSize,
279 maxDocumentSize, myDocuments.get(0));
280 }
281 }
282
283 /**
284 * {@inheritDoc}
285 * <p>
286 * Overridden to write the insert message.
287 * </p>
288 *
289 * @see Message#write(int, BsonOutputStream)
290 */
291 @Override
292 public void write(final int messageId, final BsonOutputStream out)
293 throws IOException {
294 final int flags = computeFlags();
295
296 int size = HEADER_SIZE;
297 size += 4; // flags
298 size += out.sizeOfCString(myDatabaseName, ".", myCollectionName);
299 for (final Document document : myDocuments) {
300 size += document.size();
301 }
302
303 writeHeader(out, messageId, 0, Operation.INSERT, size);
304 out.writeInt(flags);
305 out.writeCString(myDatabaseName, ".", myCollectionName);
306 for (final Document document : myDocuments) {
307 out.writeDocument(document);
308 }
309 }
310
311 /**
312 * {@inheritDoc}
313 * <p>
314 * Overridden to write the insert message.
315 * </p>
316 *
317 * @see Message#write(int, BsonOutputStream)
318 */
319 @Override
320 public void write(final int messageId, final BufferingBsonOutputStream out)
321 throws IOException {
322 final int flags = computeFlags();
323
324 final long start = writeHeader(out, messageId, 0, Operation.INSERT);
325 out.writeInt(flags);
326 out.writeCString(myDatabaseName, ".", myCollectionName);
327 for (final Document document : myDocuments) {
328 out.writeDocument(document);
329 }
330 finishHeader(out, start);
331
332 out.flushBuffer();
333 }
334
335 /**
336 * Computes the message flags bit field.
337 *
338 * @return The message flags bit field.
339 */
340 private int computeFlags() {
341 int flags = 0;
342 if (myContinueOnError) {
343 flags += CONTINUE_ON_ERROR_BIT;
344 }
345 return flags;
346 }
347
348 }