1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.IdentityHashMap;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29
30 import com.allanbank.mongodb.Callback;
31 import com.allanbank.mongodb.Durability;
32 import com.allanbank.mongodb.builder.BatchedWrite;
33 import com.allanbank.mongodb.builder.BatchedWriteMode;
34 import com.allanbank.mongodb.builder.write.DeleteOperation;
35 import com.allanbank.mongodb.builder.write.InsertOperation;
36 import com.allanbank.mongodb.builder.write.UpdateOperation;
37 import com.allanbank.mongodb.builder.write.WriteOperation;
38 import com.allanbank.mongodb.client.AbstractMongoOperations;
39 import com.allanbank.mongodb.error.BatchedWriteException;
40
41
42
43
44
45
46
47
48
49
50 public class BatchedNativeWriteCallback extends ReplyLongCallback {
51
52
53 private final AbstractMongoOperations myCollection;
54
55
56 private final Map<WriteOperation, Throwable> myFailedOperations;
57
58
59 private int myFinished;
60
61
62 private long myN = 0;
63
64
65 private final List<WriteOperation> myOperations;
66
67
68 private final List<WriteOperation> myPendingOperations;
69
70
71 private final BatchedWrite myWrite;
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public BatchedNativeWriteCallback(final Callback<Long> results,
86 final BatchedWrite write, final AbstractMongoOperations collection,
87 final List<WriteOperation> operations) {
88 super(results);
89
90 myWrite = write;
91 myCollection = collection;
92 myOperations = Collections
93 .unmodifiableList(new ArrayList<WriteOperation>(operations));
94
95 myPendingOperations = new LinkedList<WriteOperation>(myOperations);
96
97 myFinished = 0;
98 myN = 0;
99
100 myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
101 }
102
103
104
105
106 public void send() {
107
108 List<WriteOperation> toSendOperations;
109 Durability durability = null;
110
111 synchronized (this) {
112 List<WriteOperation> toSend = myPendingOperations;
113 if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
114 toSend = myPendingOperations.subList(0, 1);
115 }
116
117 durability = myWrite.getDurability();
118 if ((durability == null) || (durability == Durability.NONE)) {
119 durability = Durability.ACK;
120 }
121
122
123
124 toSendOperations = new ArrayList<WriteOperation>(toSend);
125 toSend.clear();
126 }
127
128
129
130
131 for (final WriteOperation operation : toSendOperations) {
132 switch (operation.getType()) {
133 case INSERT: {
134 final InsertOperation insert = (InsertOperation) operation;
135 myCollection.insertAsync(new NativeCallback<Integer>(insert),
136 true, durability, insert.getDocument());
137 break;
138 }
139 case UPDATE: {
140 final UpdateOperation update = (UpdateOperation) operation;
141 myCollection.updateAsync(new NativeCallback<Long>(operation),
142 update.getQuery(), update.getUpdate(),
143 update.isMultiUpdate(), update.isUpsert(), durability);
144 break;
145 }
146 case DELETE: {
147 final DeleteOperation delete = (DeleteOperation) operation;
148 myCollection.deleteAsync(new NativeCallback<Long>(operation),
149 delete.getQuery(), delete.isSingleDelete(), durability);
150 break;
151 }
152 }
153 }
154 }
155
156
157
158
159
160
161
162
163
164 protected synchronized void callback(final WriteOperation operation,
165 final long result) {
166 myN += result;
167 myFinished += 1;
168
169 if (!myPendingOperations.isEmpty()) {
170 send();
171 }
172 else if (myFinished == myOperations.size()) {
173 publishResults();
174 }
175 }
176
177
178
179
180
181
182
183
184
185
186 protected synchronized void exception(final WriteOperation operation,
187 final Throwable thrown) {
188 myFinished += 1;
189 myFailedOperations.put(operation, thrown);
190
191 if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
192 publishResults();
193 }
194
195
196 else if (myFinished == myOperations.size()) {
197 publishResults();
198 }
199
200 }
201
202
203
204
205 private void publishResults() {
206 if (myFailedOperations.isEmpty()) {
207 myForwardCallback.callback(Long.valueOf(myN));
208 }
209 else {
210 myForwardCallback.exception(new BatchedWriteException(myWrite, myN,
211 myPendingOperations, myFailedOperations));
212 }
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 class NativeCallback<T extends Number> implements Callback<T> {
231
232
233 private final WriteOperation myOperation;
234
235
236
237
238
239
240
241 public NativeCallback(final WriteOperation operation) {
242 myOperation = operation;
243 }
244
245
246
247
248
249
250
251 @Override
252 public void callback(final T result) {
253 BatchedNativeWriteCallback.this.callback(myOperation,
254 result.longValue());
255 }
256
257
258
259
260
261
262
263 @Override
264 public void exception(final Throwable thrown) {
265 BatchedNativeWriteCallback.this.exception(myOperation, thrown);
266 }
267 }
268 }