1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.IdentityHashMap;
26 import java.util.List;
27 import java.util.Map;
28
29 import com.allanbank.mongodb.Callback;
30 import com.allanbank.mongodb.bson.Document;
31 import com.allanbank.mongodb.bson.builder.BuilderFactory;
32 import com.allanbank.mongodb.builder.BatchedWrite;
33 import com.allanbank.mongodb.builder.write.WriteOperation;
34 import com.allanbank.mongodb.client.message.Reply;
35 import com.allanbank.mongodb.error.BatchedWriteException;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class BatchedInsertCountingCallback implements Callback<Reply> {
53
54
55 private int myCount = 0;
56
57
58 private Map<WriteOperation, Throwable> myErrors;
59
60
61 private final int myExpectedCount;
62
63
64
65
66
67 private int myFailureCount = 0;
68
69
70 private final Callback<Reply> myForwardCallback;
71
72
73 private BatchedWrite myLastWrite;
74
75
76 private List<WriteOperation> mySkipped;
77
78
79
80
81
82
83
84
85
86
87 public BatchedInsertCountingCallback(final Callback<Reply> forwardCallback,
88 final int expectedCount) {
89 myForwardCallback = forwardCallback;
90 myExpectedCount = expectedCount;
91 myCount = 0;
92 }
93
94
95
96
97
98
99
100
101 @Override
102 public void callback(final Reply result) {
103 boolean publish;
104 synchronized (this) {
105 myCount += 1;
106 publish = (myCount == myExpectedCount);
107 }
108
109 if (publish) {
110 publish();
111 }
112 }
113
114
115
116
117
118
119
120 @Override
121 public void exception(final Throwable thrown) {
122 boolean publish;
123 synchronized (this) {
124 myFailureCount += 1;
125 myCount += 1;
126 publish = (myCount == myExpectedCount);
127
128 if (mySkipped == null) {
129 mySkipped = new ArrayList<WriteOperation>();
130 myErrors = new IdentityHashMap<WriteOperation, Throwable>();
131 }
132
133 if (thrown instanceof BatchedWriteException) {
134 final BatchedWriteException errors = (BatchedWriteException) thrown;
135
136 myLastWrite = errors.getWrite();
137 mySkipped.addAll(errors.getSkipped());
138 myErrors.putAll(errors.getErrors());
139 }
140 }
141
142 if (publish) {
143 publish();
144 }
145 }
146
147
148
149
150 private void publish() {
151 Reply reply = null;
152 BatchedWriteException error = null;
153 synchronized (this) {
154 if (myFailureCount == 0) {
155 final Document doc = BuilderFactory.start().add("ok", 1)
156 .add("n", myExpectedCount).build();
157 reply = new Reply(0, 0, 0, Collections.singletonList(doc),
158 false, false, false, false);
159 }
160 else {
161 error = new BatchedWriteException(myLastWrite,
162 (myExpectedCount - myFailureCount), mySkipped, myErrors);
163 }
164 }
165
166
167 if (reply != null) {
168 myForwardCallback.callback(reply);
169 }
170 else {
171 myForwardCallback.exception(error);
172 }
173 }
174
175 }