1 /*
2 * #%L
3 * BatchedInsertCountingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.IdentityHashMap;
26 import java.util.List;
27 import java.util.Map;
28
29 import com.allanbank.mongodb.Callback;
30 import com.allanbank.mongodb.bson.Document;
31 import com.allanbank.mongodb.bson.builder.BuilderFactory;
32 import com.allanbank.mongodb.builder.BatchedWrite;
33 import com.allanbank.mongodb.builder.write.WriteOperation;
34 import com.allanbank.mongodb.client.message.Reply;
35 import com.allanbank.mongodb.error.BatchedWriteException;
36
37 /**
38 * BatchedInsertCountingCallback is designed to work with the
39 * {@link BatchedWriteCallback}. This callback can be used as the callback for a
40 * series of individual writes and will coalesce the results into a single
41 * result based on a an expected number of callbacks.
42 * <p>
43 * The class does not track the input {@code n} value and instead always returns
44 * an N value based on the expected count. That limits the utility of this class
45 * to inserts.
46 * </p>
47 *
48 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
49 * mutated in incompatible ways between any two releases of the driver.
50 * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
51 */
52 public class BatchedInsertCountingCallback implements Callback<Reply> {
53
54 /** The count of the number of callbacks received so far. */
55 private int myCount = 0;
56
57 /** The failed operations. */
58 private Map<WriteOperation, Throwable> myErrors;
59
60 /** The expected number of callbacks. */
61 private final int myExpectedCount;
62
63 /**
64 * The count of the number of failure ({@link #exception(Throwable)})
65 * callbacks received so far.
66 */
67 private int myFailureCount = 0;
68
69 /** The callback to notify with the final results once we receive them all. */
70 private final Callback<Reply> myForwardCallback;
71
72 /** The last batched write that failed. */
73 private BatchedWrite myLastWrite;
74
75 /** The skipped operations. */
76 private List<WriteOperation> mySkipped;
77
78 /**
79 * Creates a new CountingCallback.
80 *
81 * @param forwardCallback
82 * The callback to notify with the final results once we receive
83 * them all.
84 * @param expectedCount
85 * The expected number of callbacks.
86 */
87 public BatchedInsertCountingCallback(final Callback<Reply> forwardCallback,
88 final int expectedCount) {
89 myForwardCallback = forwardCallback;
90 myExpectedCount = expectedCount;
91 myCount = 0;
92 }
93
94 /**
95 * {@inheritDoc}
96 * <p>
97 * Overridden to increment the count and when the max is reached forward the
98 * final results.
99 * </p>
100 */
101 @Override
102 public void callback(final Reply result) {
103 boolean publish;
104 synchronized (this) {
105 myCount += 1;
106 publish = (myCount == myExpectedCount);
107 }
108
109 if (publish) {
110 publish();
111 }
112 }
113
114 /**
115 * {@inheritDoc}
116 * <p>
117 * Overridden to record the exception details.
118 * </p>
119 */
120 @Override
121 public void exception(final Throwable thrown) {
122 boolean publish;
123 synchronized (this) {
124 myFailureCount += 1;
125 myCount += 1;
126 publish = (myCount == myExpectedCount);
127
128 if (mySkipped == null) {
129 mySkipped = new ArrayList<WriteOperation>();
130 myErrors = new IdentityHashMap<WriteOperation, Throwable>();
131 }
132
133 if (thrown instanceof BatchedWriteException) {
134 final BatchedWriteException errors = (BatchedWriteException) thrown;
135
136 myLastWrite = errors.getWrite();
137 mySkipped.addAll(errors.getSkipped());
138 myErrors.putAll(errors.getErrors());
139 }
140 }
141
142 if (publish) {
143 publish();
144 }
145 }
146
147 /**
148 * Publishes the final results to {@link #myForwardCallback}.
149 */
150 private void publish() {
151 Reply reply = null;
152 BatchedWriteException error = null;
153 synchronized (this) {
154 if (myFailureCount == 0) {
155 final Document doc = BuilderFactory.start().add("ok", 1)
156 .add("n", myExpectedCount).build();
157 reply = new Reply(0, 0, 0, Collections.singletonList(doc),
158 false, false, false, false);
159 }
160 else {
161 error = new BatchedWriteException(myLastWrite,
162 (myExpectedCount - myFailureCount), mySkipped, myErrors);
163 }
164 }
165
166 // Reply outside the lock.
167 if (reply != null) {
168 myForwardCallback.callback(reply);
169 }
170 else {
171 myForwardCallback.exception(error);
172 }
173 }
174
175 }