Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
BatchedInsertCountingCallback |
|
| 2.5;2.5 |
1 | /* | |
2 | * #%L | |
3 | * BatchedInsertCountingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | ||
21 | package com.allanbank.mongodb.client.callback; | |
22 | ||
23 | import java.util.ArrayList; | |
24 | import java.util.Collections; | |
25 | import java.util.IdentityHashMap; | |
26 | import java.util.List; | |
27 | import java.util.Map; | |
28 | ||
29 | import com.allanbank.mongodb.Callback; | |
30 | import com.allanbank.mongodb.bson.Document; | |
31 | import com.allanbank.mongodb.bson.builder.BuilderFactory; | |
32 | import com.allanbank.mongodb.builder.BatchedWrite; | |
33 | import com.allanbank.mongodb.builder.write.WriteOperation; | |
34 | import com.allanbank.mongodb.client.message.Reply; | |
35 | import com.allanbank.mongodb.error.BatchedWriteException; | |
36 | ||
37 | /** | |
38 | * BatchedInsertCountingCallback is designed to work with the | |
39 | * {@link BatchedWriteCallback}. This callback can be used as the callback for a | |
40 | * series of individual writes and will coalesce the results into a single | |
41 | * result based on an expected number of callbacks. | |
42 | * <p> | |
43 | * The class does not track the input {@code n} value and instead always returns | |
44 | * an N value based on the expected count. That limits the utility of this class | |
45 | * to inserts. | |
46 | * </p> | |
47 | * | |
48 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
49 | * mutated in incompatible ways between any two releases of the driver. | |
50 | * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved | |
51 | */ | |
52 | 7 | public class BatchedInsertCountingCallback implements Callback<Reply> {
53 | ||
54 | /** The number of callbacks (successes and failures) received so far. */ | |
55 | 7 | private int myCount = 0;
56 | ||
57 | /** The failed operations mapped to their errors; created lazily on the first failure. */ | |
58 | private Map<WriteOperation, Throwable> myErrors; | |
59 | ||
60 | /** The expected number of callbacks. */ | |
61 | private final int myExpectedCount; | |
62 | ||
63 | /** | |
64 | * The number of failure ({@link #exception(Throwable)}) callbacks | |
65 | * received so far. | |
66 | */ | |
67 | 7 | private int myFailureCount = 0;
68 | ||
69 | /** The callback to notify with the final results once we receive them all. */ | |
70 | private final Callback<Reply> myForwardCallback; | |
71 | ||
72 | /** The write reported by the most recent {@link BatchedWriteException} received. */ | |
73 | private BatchedWrite myLastWrite; | |
74 | ||
75 | /** The skipped operations; created lazily on the first failure. */ | |
76 | private List<WriteOperation> mySkipped; | |
77 | ||
78 | /** | |
79 | * Creates a new BatchedInsertCountingCallback. | |
80 | * | |
81 | * @param forwardCallback | |
82 | * The callback to notify with the final results once we receive | |
83 | * them all. | |
84 | * @param expectedCount | |
85 | * The expected number of callbacks. | |
86 | */ | |
87 | public BatchedInsertCountingCallback(final Callback<Reply> forwardCallback, | |
88 | 7 | final int expectedCount) {
89 | 7 | myForwardCallback = forwardCallback;
90 | 7 | myExpectedCount = expectedCount;
91 | 7 | myCount = 0;
92 | 7 | }
93 | ||
94 | /** | |
95 | * {@inheritDoc} | |
96 | * <p> | |
97 | * Overridden to increment the count and, once the expected count is reached, | |
98 | * forward the final results. The {@code result} itself is not inspected. | |
99 | * </p> | |
100 | */ | |
101 | @Override | |
102 | public void callback(final Reply result) { | |
103 | boolean publish; | |
104 | 7 | synchronized (this) {
105 | 7 | myCount += 1;
106 | 7 | publish = (myCount == myExpectedCount);
107 | 7 | }
108 | ||
109 | 7 | if (publish) {
110 | 7 | publish();
111 | } | |
112 | 7 | }
113 | ||
114 | /** | |
115 | * {@inheritDoc} | |
116 | * <p> | |
117 | * Overridden to count the failure and, for a {@link BatchedWriteException}, record its write, skipped operations and errors. | |
118 | * </p> | |
119 | */ | |
120 | @Override | |
121 | public void exception(final Throwable thrown) { | |
122 | boolean publish; | |
123 | 0 | synchronized (this) {
124 | 0 | myFailureCount += 1;
125 | 0 | myCount += 1;
126 | 0 | publish = (myCount == myExpectedCount);
127 | ||
128 | 0 | if (mySkipped == null) {
129 | 0 | mySkipped = new ArrayList<WriteOperation>();
130 | 0 | myErrors = new IdentityHashMap<WriteOperation, Throwable>();
131 | } | |
132 | ||
133 | 0 | if (thrown instanceof BatchedWriteException) {
134 | 0 | final BatchedWriteException errors = (BatchedWriteException) thrown;
135 | ||
136 | 0 | myLastWrite = errors.getWrite();
137 | 0 | mySkipped.addAll(errors.getSkipped());
138 | 0 | myErrors.putAll(errors.getErrors());
139 | } | |
140 | 0 | }
141 | ||
142 | 0 | if (publish) {
143 | 0 | publish();
144 | } | |
145 | 0 | }
146 | ||
147 | /** | |
148 | * Publishes the final results to {@link #myForwardCallback}: a reply with {@code ok=1} and {@code n} set to the expected count when nothing failed, otherwise a {@link BatchedWriteException}. | |
149 | */ | |
150 | private void publish() { | |
151 | 7 | Reply reply = null;
152 | 7 | BatchedWriteException error = null;
153 | 7 | synchronized (this) {
154 | 7 | if (myFailureCount == 0) {
155 | 7 | final Document doc = BuilderFactory.start().add("ok", 1)
156 | .add("n", myExpectedCount).build(); | |
157 | 7 | reply = new Reply(0, 0, 0, Collections.singletonList(doc),
158 | false, false, false, false); | |
159 | 7 | }
160 | else { | |
161 | 0 | error = new BatchedWriteException(myLastWrite,
162 | (myExpectedCount - myFailureCount), mySkipped, myErrors); | |
163 | } | |
164 | 7 | }
165 | ||
166 | // Notify outside the lock so the forward callback never runs while this object's monitor is held. | |
167 | 7 | if (reply != null) {
168 | 7 | myForwardCallback.callback(reply);
169 | } | |
170 | else { | |
171 | 0 | myForwardCallback.exception(error);
172 | } | |
173 | 7 | }
174 | ||
175 | }