1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
package com.allanbank.mongodb.client.connection.rs; |
22 | |
|
23 | |
import java.io.IOException; |
24 | |
import java.net.InetSocketAddress; |
25 | |
import java.util.Collections; |
26 | |
import java.util.List; |
27 | |
import java.util.concurrent.ExecutionException; |
28 | |
import java.util.logging.Level; |
29 | |
|
30 | |
import com.allanbank.mongodb.MongoClientConfiguration; |
31 | |
import com.allanbank.mongodb.MongoDbException; |
32 | |
import com.allanbank.mongodb.bson.Document; |
33 | |
import com.allanbank.mongodb.bson.element.StringElement; |
34 | |
import com.allanbank.mongodb.client.ClusterStats; |
35 | |
import com.allanbank.mongodb.client.ClusterType; |
36 | |
import com.allanbank.mongodb.client.callback.FutureReplyCallback; |
37 | |
import com.allanbank.mongodb.client.connection.Connection; |
38 | |
import com.allanbank.mongodb.client.connection.ConnectionFactory; |
39 | |
import com.allanbank.mongodb.client.connection.ReconnectStrategy; |
40 | |
import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory; |
41 | |
import com.allanbank.mongodb.client.message.IsMaster; |
42 | |
import com.allanbank.mongodb.client.message.Reply; |
43 | |
import com.allanbank.mongodb.client.state.Cluster; |
44 | |
import com.allanbank.mongodb.client.state.ClusterPinger; |
45 | |
import com.allanbank.mongodb.client.state.LatencyServerSelector; |
46 | |
import com.allanbank.mongodb.client.state.Server; |
47 | |
import com.allanbank.mongodb.client.state.ServerUpdateCallback; |
48 | |
import com.allanbank.mongodb.util.IOUtils; |
49 | |
import com.allanbank.mongodb.util.log.Log; |
50 | |
import com.allanbank.mongodb.util.log.LogFactory; |
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
public class ReplicaSetConnectionFactory implements ConnectionFactory { |
60 | |
|
61 | |
|
62 | 1 | protected static final Log LOG = LogFactory |
63 | |
.getLog(ReplicaSetConnectionFactory.class); |
64 | |
|
65 | |
|
66 | |
protected final ProxiedConnectionFactory myConnectionFactory; |
67 | |
|
68 | |
|
69 | |
private final Cluster myCluster; |
70 | |
|
71 | |
|
72 | |
private final MongoClientConfiguration myConfig; |
73 | |
|
74 | |
|
75 | |
private final ClusterPinger myPinger; |
76 | |
|
77 | |
|
78 | |
private final ReplicaSetReconnectStrategy myStrategy; |
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
public ReplicaSetConnectionFactory(final ProxiedConnectionFactory factory, |
89 | 23 | final MongoClientConfiguration config) { |
90 | 23 | myConnectionFactory = factory; |
91 | 23 | myConfig = config; |
92 | 23 | myCluster = new Cluster(config, ClusterType.REPLICA_SET); |
93 | 23 | myPinger = new ClusterPinger(myCluster, factory, config); |
94 | |
|
95 | 23 | myStrategy = new ReplicaSetReconnectStrategy(); |
96 | 23 | myStrategy.setConfig(myConfig); |
97 | 23 | myStrategy.setConnectionFactory(myConnectionFactory); |
98 | 23 | myStrategy.setState(myCluster); |
99 | 23 | myStrategy.setSelector(new LatencyServerSelector(myCluster, false)); |
100 | |
|
101 | |
|
102 | 23 | bootstrap(); |
103 | 23 | } |
104 | |
|
105 | |
|
106 | |
|
107 | |
|
108 | |
public void bootstrap() { |
109 | |
|
110 | 23 | locatePrimary(); |
111 | |
|
112 | |
|
113 | |
|
114 | 23 | myPinger.initialSweep(myCluster); |
115 | 23 | myPinger.start(); |
116 | 23 | } |
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
|
125 | |
@Override |
126 | |
public void close() { |
127 | 24 | IOUtils.close(myPinger); |
128 | 24 | IOUtils.close(myConnectionFactory); |
129 | 24 | } |
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
@Override |
137 | |
public Connection connect() throws IOException { |
138 | |
|
139 | |
|
140 | 9 | List<Server> writableServers = myCluster.getWritableServers(); |
141 | 44 | for (int i = 0; i < 10; ++i) { |
142 | 41 | servers: for (final Server primary : writableServers) { |
143 | 12 | Connection primaryConn = null; |
144 | |
try { |
145 | 12 | primaryConn = myConnectionFactory |
146 | |
.connect(primary, myConfig); |
147 | |
|
148 | 11 | if (isWritable(primary, primaryConn)) { |
149 | |
|
150 | 6 | final ReplicaSetConnection rsConnection = new ReplicaSetConnection( |
151 | |
primaryConn, primary, myCluster, |
152 | |
myConnectionFactory, myConfig, myStrategy); |
153 | |
|
154 | 6 | primaryConn = null; |
155 | |
|
156 | 6 | return rsConnection; |
157 | |
} |
158 | |
|
159 | |
break servers; |
160 | |
} |
161 | 1 | catch (final IOException e) { |
162 | 1 | LOG.debug(e, "Error connecting to presumptive primary: {}", |
163 | |
e.getMessage()); |
164 | |
} |
165 | |
finally { |
166 | 7 | IOUtils.close(primaryConn); |
167 | 1 | } |
168 | 1 | } |
169 | |
|
170 | |
|
171 | 35 | writableServers = locatePrimary(); |
172 | |
} |
173 | |
|
174 | |
|
175 | |
|
176 | |
|
177 | 3 | return new ReplicaSetConnection(null, null, myCluster, |
178 | |
myConnectionFactory, myConfig, myStrategy); |
179 | |
} |
180 | |
|
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
|
186 | |
|
187 | |
@Override |
188 | |
public ClusterStats getClusterStats() { |
189 | 0 | return myCluster; |
190 | |
} |
191 | |
|
192 | |
|
193 | |
|
194 | |
|
195 | |
|
196 | |
|
197 | |
|
198 | |
@Override |
199 | |
public ClusterType getClusterType() { |
200 | 2 | return ClusterType.REPLICA_SET; |
201 | |
} |
202 | |
|
203 | |
|
204 | |
|
205 | |
|
206 | |
|
207 | |
|
208 | |
|
209 | |
@Override |
210 | |
public ReconnectStrategy getReconnectStrategy() { |
211 | 6 | return myStrategy; |
212 | |
} |
213 | |
|
214 | |
|
215 | |
|
216 | |
|
217 | |
|
218 | |
|
219 | |
protected Cluster getCluster() { |
220 | 18 | return myCluster; |
221 | |
} |
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
|
228 | |
|
229 | |
|
230 | |
|
231 | |
|
232 | |
|
233 | |
protected boolean isWritable(final Server server, |
234 | |
final Connection connection) { |
235 | |
|
236 | |
try { |
237 | 11 | final ServerUpdateCallback replyCallback = new ServerUpdateCallback( |
238 | |
server); |
239 | 11 | connection.send(new IsMaster(), replyCallback); |
240 | |
|
241 | 11 | final Reply reply = replyCallback.get(); |
242 | 9 | final List<Document> results = reply.getResults(); |
243 | 9 | if (!results.isEmpty()) { |
244 | 8 | final Document doc = results.get(0); |
245 | |
|
246 | |
|
247 | 8 | final StringElement primaryName = doc.get(StringElement.class, |
248 | |
"primary"); |
249 | 8 | if (primaryName != null) { |
250 | 7 | return (primaryName.getValue().equals(connection |
251 | |
.getServerName())); |
252 | |
} |
253 | |
} |
254 | |
} |
255 | 1 | catch (final InterruptedException e) { |
256 | |
|
257 | 1 | LOG.debug(e, "Failure testing if a connection is writable: {}", |
258 | |
e.getMessage()); |
259 | |
} |
260 | 1 | catch (final ExecutionException e) { |
261 | |
|
262 | 1 | LOG.debug(e, "Failure testing if a connection is writable: {}", |
263 | |
e.getMessage()); |
264 | 3 | } |
265 | 4 | return false; |
266 | |
} |
267 | |
|
268 | |
|
269 | |
|
270 | |
|
271 | |
|
272 | |
|
273 | |
protected List<Server> locatePrimary() { |
274 | 58 | for (final InetSocketAddress addr : myConfig.getServerAddresses()) { |
275 | 46 | Connection conn = null; |
276 | 46 | final FutureReplyCallback future = new FutureReplyCallback(); |
277 | |
try { |
278 | 46 | final Server server = myCluster.add(addr); |
279 | |
|
280 | 46 | conn = myConnectionFactory.connect(server, myConfig); |
281 | |
|
282 | 24 | conn.send(new IsMaster(), future); |
283 | |
|
284 | 22 | final Reply reply = future.get(); |
285 | 20 | final List<Document> results = reply.getResults(); |
286 | 20 | if (!results.isEmpty()) { |
287 | 19 | final Document doc = results.get(0); |
288 | |
|
289 | |
|
290 | |
|
291 | 19 | if (myConfig.isAutoDiscoverServers()) { |
292 | |
|
293 | 17 | final List<StringElement> hosts = doc.find( |
294 | |
StringElement.class, "hosts", ".*"); |
295 | 17 | for (final StringElement host : hosts) { |
296 | 29 | myCluster.add(host.getValue()); |
297 | 29 | } |
298 | |
} |
299 | |
|
300 | |
|
301 | 19 | final StringElement primary = doc.findFirst( |
302 | |
StringElement.class, "primary"); |
303 | 19 | if (primary != null) { |
304 | 17 | return Collections.singletonList(myCluster.add(primary |
305 | |
.getValue())); |
306 | |
} |
307 | |
} |
308 | |
} |
309 | 22 | catch (final IOException ioe) { |
310 | 22 | LOG.warn(ioe, "I/O error during replica-set bootstrap to {}.", |
311 | |
addr); |
312 | |
} |
313 | 2 | catch (final MongoDbException me) { |
314 | 2 | LOG.warn(me, |
315 | |
"MongoDB error during replica-set bootstrap to {}.", |
316 | |
addr); |
317 | |
} |
318 | 2 | catch (final InterruptedException e) { |
319 | 2 | LOG.warn(e, "Interrupted during replica-set bootstrap to {}.", |
320 | |
addr); |
321 | |
} |
322 | 0 | catch (final ExecutionException e) { |
323 | 0 | LOG.warn(e, "Error during replica-set bootstrap to {}.", addr); |
324 | |
} |
325 | |
finally { |
326 | 43 | IOUtils.close(conn, Level.WARNING, |
327 | |
"I/O error shutting down replica-set bootstrap connection to " |
328 | |
+ addr + "."); |
329 | 29 | } |
330 | 29 | } |
331 | 41 | return Collections.emptyList(); |
332 | |
} |
333 | |
} |