1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.connection.socket;
21
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.IOException;
25 import java.lang.ref.Reference;
26 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31
32 import com.allanbank.mongodb.MongoClientConfiguration;
33 import com.allanbank.mongodb.Version;
34 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
35 import com.allanbank.mongodb.bson.io.StringDecoderCache;
36 import com.allanbank.mongodb.bson.io.StringEncoderCache;
37 import com.allanbank.mongodb.client.ClusterStats;
38 import com.allanbank.mongodb.client.ClusterType;
39 import com.allanbank.mongodb.client.connection.Connection;
40 import com.allanbank.mongodb.client.connection.ConnectionFactory;
41 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
42 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
43 import com.allanbank.mongodb.client.message.IsMaster;
44 import com.allanbank.mongodb.client.state.Cluster;
45 import com.allanbank.mongodb.client.state.LatencyServerSelector;
46 import com.allanbank.mongodb.client.state.Server;
47 import com.allanbank.mongodb.client.state.ServerSelector;
48 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
49 import com.allanbank.mongodb.client.state.SimpleReconnectStrategy;
50 import com.allanbank.mongodb.util.log.Log;
51 import com.allanbank.mongodb.util.log.LogFactory;
52
53
54
55
56
57
58
59
60 public class SocketConnectionFactory implements ProxiedConnectionFactory {
61
62
63 private static final Log LOG = LogFactory
64 .getLog(SocketConnectionFactory.class);
65
66
67
68
69
70
71
72 private ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
73
74
75 private final Cluster myCluster;
76
77
78 private final MongoClientConfiguration myConfig;
79
80
81 private final ConfigurationListener myConfigListener;
82
83
84 private final StringDecoderCache myDecoderCache;
85
86
87 private final StringEncoderCache myEncoderCache;
88
89
90 private final ServerSelector myServerSelector;
91
92
93
94
95
96
97
98 public SocketConnectionFactory(final MongoClientConfiguration config) {
99 super();
100 myConfig = config;
101 myCluster = new Cluster(config, ClusterType.STAND_ALONE);
102 myServerSelector = new LatencyServerSelector(myCluster, true);
103 myBuffers = new ThreadLocal<Reference<BufferingBsonOutputStream>>();
104
105 myConfigListener = new ConfigurationListener();
106 myConfig.addPropertyChangeListener(myConfigListener);
107
108 myDecoderCache = new StringDecoderCache();
109 myDecoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
110 myDecoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
111
112 myEncoderCache = new StringEncoderCache();
113 myEncoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
114 myEncoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
115 }
116
117
118
119
120
121
122
123 @Override
124 public void close() {
125 myBuffers = null;
126 myConfig.removePropertyChangeListener(myConfigListener);
127
128
129 myDecoderCache.setMaxCacheEntries(0);
130 myDecoderCache.setMaxCacheLength(0);
131 }
132
133
134
135
136
137
138
139
140
141 @Override
142 public Connection connect() throws IOException {
143 final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>(
144 myConfig.getServerAddresses());
145
146
147 IOException last = null;
148 Collections.shuffle(servers);
149 for (final InetSocketAddress address : servers) {
150 try {
151 final Server server = myCluster.add(address);
152 final Connection conn = connect(server, myConfig);
153
154
155 final ServerUpdateCallback cb = new ServerUpdateCallback(server);
156 conn.send(new IsMaster(), cb);
157
158 if (Version.UNKNOWN.equals(server.getVersion())) {
159
160 try {
161 cb.get();
162 }
163 catch (final ExecutionException e) {
164
165 LOG.debug(e, "Could not execute an 'ismaster' command.");
166 }
167 catch (final InterruptedException e) {
168
169 LOG.debug(e, "Could not execute an 'ismaster' command.");
170 }
171 }
172
173 return conn;
174 }
175 catch (final IOException error) {
176 last = error;
177 }
178 }
179
180 if (last != null) {
181 throw last;
182 }
183 throw new IOException("Could not connect to any server: " + servers);
184 }
185
186
187
188
189
190
191
192
193
194
195
196
197 @Override
198 public Connection connect(final Server server,
199 final MongoClientConfiguration config) throws IOException {
200
201 final AbstractSocketConnection connection;
202
203 switch (myConfig.getConnectionModel()) {
204 case SENDER_RECEIVER_THREAD: {
205 connection = new TwoThreadSocketConnection(server, myConfig,
206 myEncoderCache, myDecoderCache);
207 break;
208 }
209 default: {
210 connection = new SocketConnection(server, myConfig, myEncoderCache,
211 myDecoderCache, myBuffers);
212 break;
213 }
214 }
215
216
217 connection.start();
218
219 return connection;
220 }
221
222
223
224
225
226
227 public Cluster getCluster() {
228 return myCluster;
229 }
230
231
232
233
234
235
236
237 @Override
238 public ClusterStats getClusterStats() {
239 return myCluster;
240 }
241
242
243
244
245
246
247
248 @Override
249 public ClusterType getClusterType() {
250 return ClusterType.STAND_ALONE;
251 }
252
253
254
255
256
257
258
259 @Override
260 public ReconnectStrategy getReconnectStrategy() {
261 final SimpleReconnectStrategy strategy = new SimpleReconnectStrategy();
262 strategy.setConfig(myConfig);
263 strategy.setConnectionFactory(this);
264 strategy.setSelector(myServerSelector);
265 strategy.setState(myCluster);
266
267 return strategy;
268 }
269
270
271
272
273
274
275
276 protected void configurationChanged(final PropertyChangeEvent evt) {
277 final String name = evt.getPropertyName();
278 final Object value = evt.getNewValue();
279 if ("maxCachedStringEntries".equals(name) && (value instanceof Number)) {
280 myDecoderCache.setMaxCacheEntries(((Number) value).intValue());
281 myEncoderCache.setMaxCacheEntries(((Number) value).intValue());
282 }
283 else if ("maxCachedStringLength".equals(name)
284 && (value instanceof Number)) {
285 myDecoderCache.setMaxCacheLength(((Number) value).intValue());
286 myEncoderCache.setMaxCacheLength(((Number) value).intValue());
287 }
288 }
289
290
291
292
293
294
295
296
297
298
299 protected final class ConfigurationListener implements
300 PropertyChangeListener {
301 @Override
302 public void propertyChange(final PropertyChangeEvent evt) {
303 configurationChanged(evt);
304 }
305 }
306 }