1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.connection.rs;
22
23 import java.io.IOException;
24 import java.net.InetSocketAddress;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.ExecutionException;
28 import java.util.logging.Level;
29
30 import com.allanbank.mongodb.MongoClientConfiguration;
31 import com.allanbank.mongodb.MongoDbException;
32 import com.allanbank.mongodb.bson.Document;
33 import com.allanbank.mongodb.bson.element.StringElement;
34 import com.allanbank.mongodb.client.ClusterStats;
35 import com.allanbank.mongodb.client.ClusterType;
36 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
37 import com.allanbank.mongodb.client.connection.Connection;
38 import com.allanbank.mongodb.client.connection.ConnectionFactory;
39 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
40 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
41 import com.allanbank.mongodb.client.message.IsMaster;
42 import com.allanbank.mongodb.client.message.Reply;
43 import com.allanbank.mongodb.client.state.Cluster;
44 import com.allanbank.mongodb.client.state.ClusterPinger;
45 import com.allanbank.mongodb.client.state.LatencyServerSelector;
46 import com.allanbank.mongodb.client.state.Server;
47 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
48 import com.allanbank.mongodb.util.IOUtils;
49 import com.allanbank.mongodb.util.log.Log;
50 import com.allanbank.mongodb.util.log.LogFactory;
51
52
53
54
55
56
57
58
59 public class ReplicaSetConnectionFactory implements ConnectionFactory {
60
61
62 protected static final Log LOG = LogFactory
63 .getLog(ReplicaSetConnectionFactory.class);
64
65
66 protected final ProxiedConnectionFactory myConnectionFactory;
67
68
69 private final Cluster myCluster;
70
71
72 private final MongoClientConfiguration myConfig;
73
74
75 private final ClusterPinger myPinger;
76
77
78 private final ReplicaSetReconnectStrategy myStrategy;
79
80
81
82
83
84
85
86
87
88 public ReplicaSetConnectionFactory(final ProxiedConnectionFactory factory,
89 final MongoClientConfiguration config) {
90 myConnectionFactory = factory;
91 myConfig = config;
92 myCluster = new Cluster(config, ClusterType.REPLICA_SET);
93 myPinger = new ClusterPinger(myCluster, factory, config);
94
95 myStrategy = new ReplicaSetReconnectStrategy();
96 myStrategy.setConfig(myConfig);
97 myStrategy.setConnectionFactory(myConnectionFactory);
98 myStrategy.setState(myCluster);
99 myStrategy.setSelector(new LatencyServerSelector(myCluster, false));
100
101
102 bootstrap();
103 }
104
105
106
107
108 public void bootstrap() {
109
110 locatePrimary();
111
112
113
114 myPinger.initialSweep(myCluster);
115 myPinger.start();
116 }
117
118
119
120
121
122
123
124
125 @Override
126 public void close() {
127 IOUtils.close(myPinger);
128 IOUtils.close(myConnectionFactory);
129 }
130
131
132
133
134
135
136 @Override
137 public Connection connect() throws IOException {
138
139
140 List<Server> writableServers = myCluster.getWritableServers();
141 for (int i = 0; i < 10; ++i) {
142 servers: for (final Server primary : writableServers) {
143 Connection primaryConn = null;
144 try {
145 primaryConn = myConnectionFactory
146 .connect(primary, myConfig);
147
148 if (isWritable(primary, primaryConn)) {
149
150 final ReplicaSetConnection rsConnection = new ReplicaSetConnection(
151 primaryConn, primary, myCluster,
152 myConnectionFactory, myConfig, myStrategy);
153
154 primaryConn = null;
155
156 return rsConnection;
157 }
158
159 break servers;
160 }
161 catch (final IOException e) {
162 LOG.debug(e, "Error connecting to presumptive primary: {}",
163 e.getMessage());
164 }
165 finally {
166 IOUtils.close(primaryConn);
167 }
168 }
169
170
171 writableServers = locatePrimary();
172 }
173
174
175
176
177 return new ReplicaSetConnection(null, null, myCluster,
178 myConnectionFactory, myConfig, myStrategy);
179 }
180
181
182
183
184
185
186
187 @Override
188 public ClusterStats getClusterStats() {
189 return myCluster;
190 }
191
192
193
194
195
196
197
198 @Override
199 public ClusterType getClusterType() {
200 return ClusterType.REPLICA_SET;
201 }
202
203
204
205
206
207
208
209 @Override
210 public ReconnectStrategy getReconnectStrategy() {
211 return myStrategy;
212 }
213
214
215
216
217
218
219 protected Cluster getCluster() {
220 return myCluster;
221 }
222
223
224
225
226
227
228
229
230
231
232
233 protected boolean isWritable(final Server server,
234 final Connection connection) {
235
236 try {
237 final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
238 server);
239 connection.send(new IsMaster(), replyCallback);
240
241 final Reply reply = replyCallback.get();
242 final List<Document> results = reply.getResults();
243 if (!results.isEmpty()) {
244 final Document doc = results.get(0);
245
246
247 final StringElement primaryName = doc.get(StringElement.class,
248 "primary");
249 if (primaryName != null) {
250 return (primaryName.getValue().equals(connection
251 .getServerName()));
252 }
253 }
254 }
255 catch (final InterruptedException e) {
256
257 LOG.debug(e, "Failure testing if a connection is writable: {}",
258 e.getMessage());
259 }
260 catch (final ExecutionException e) {
261
262 LOG.debug(e, "Failure testing if a connection is writable: {}",
263 e.getMessage());
264 }
265 return false;
266 }
267
268
269
270
271
272
273 protected List<Server> locatePrimary() {
274 for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
275 Connection conn = null;
276 final FutureReplyCallback future = new FutureReplyCallback();
277 try {
278 final Server server = myCluster.add(addr);
279
280 conn = myConnectionFactory.connect(server, myConfig);
281
282 conn.send(new IsMaster(), future);
283
284 final Reply reply = future.get();
285 final List<Document> results = reply.getResults();
286 if (!results.isEmpty()) {
287 final Document doc = results.get(0);
288
289
290
291 if (myConfig.isAutoDiscoverServers()) {
292
293 final List<StringElement> hosts = doc.find(
294 StringElement.class, "hosts", ".*");
295 for (final StringElement host : hosts) {
296 myCluster.add(host.getValue());
297 }
298 }
299
300
301 final StringElement primary = doc.findFirst(
302 StringElement.class, "primary");
303 if (primary != null) {
304 return Collections.singletonList(myCluster.add(primary
305 .getValue()));
306 }
307 }
308 }
309 catch (final IOException ioe) {
310 LOG.warn(ioe, "I/O error during replica-set bootstrap to {}.",
311 addr);
312 }
313 catch (final MongoDbException me) {
314 LOG.warn(me,
315 "MongoDB error during replica-set bootstrap to {}.",
316 addr);
317 }
318 catch (final InterruptedException e) {
319 LOG.warn(e, "Interrupted during replica-set bootstrap to {}.",
320 addr);
321 }
322 catch (final ExecutionException e) {
323 LOG.warn(e, "Error during replica-set bootstrap to {}.", addr);
324 }
325 finally {
326 IOUtils.close(conn, Level.WARNING,
327 "I/O error shutting down replica-set bootstrap connection to "
328 + addr + ".");
329 }
330 }
331 return Collections.emptyList();
332 }
333 }