1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.connection.sharded;
22
23 import java.io.IOException;
24 import java.util.Collections;
25 import java.util.List;
26
27 import com.allanbank.mongodb.MongoClientConfiguration;
28 import com.allanbank.mongodb.MongoDbException;
29 import com.allanbank.mongodb.ReadPreference;
30 import com.allanbank.mongodb.client.Message;
31 import com.allanbank.mongodb.client.connection.Connection;
32 import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection;
33 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
34 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
35 import com.allanbank.mongodb.client.state.Cluster;
36 import com.allanbank.mongodb.client.state.Server;
37 import com.allanbank.mongodb.client.state.ServerSelector;
38 import com.allanbank.mongodb.util.log.Log;
39 import com.allanbank.mongodb.util.log.LogFactory;
40
41
42
43
44
45
46
47
48
49 public class ShardedConnection extends AbstractProxyMultipleConnection<Server> {
50
51
52 private static final Log LOG = LogFactory.getLog(ShardedConnection.class);
53
54
55 private final ServerSelector mySelector;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public ShardedConnection(final Connection proxiedConnection,
74 final Server server, final Cluster cluster,
75 final ServerSelector selector,
76 final ProxiedConnectionFactory factory,
77 final MongoClientConfiguration config) {
78 super(proxiedConnection, server, cluster, factory, config);
79
80 mySelector = selector;
81 }
82
83
84
85
86
87
88
89 @Override
90 public String getServerName() {
91 if (myMainKey != null) {
92 return myMainKey.getCanonicalName();
93 }
94 return "UNKNOWN";
95 }
96
97
98
99
100
101
102
103 @Override
104 protected Connection connect(final Server server) {
105 Connection conn = null;
106 try {
107 conn = myFactory.connect(server, myConfig);
108
109 conn = cacheConnection(server, conn);
110 }
111 catch (final IOException e) {
112 LOG.info(e, "Could not connect to the server '{}': {}",
113 server.getCanonicalName(), e.getMessage());
114 }
115 return conn;
116 }
117
118
119
120
121
122
123
124 @Override
125 protected Connection connection(final Server server) {
126 LOG.debug("Lookup connection for server: {}", server.getCanonicalName());
127 return super.connection(server);
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 @Override
145 protected List<Server> findPotentialKeys(final Message message1,
146 final Message message2) throws MongoDbException {
147 List<Server> servers = resolveServerReadPreference(message1, message2);
148
149 if (servers.isEmpty()) {
150
151
152
153 if (myMainKey == null) {
154
155 final ConnectionInfo<Server> newConnInfo = reconnectMain();
156 if (newConnInfo != null) {
157 updateMain(newConnInfo);
158 servers = resolveServerReadPreference(message1, message2);
159 }
160 }
161
162 if (servers.isEmpty()) {
163 throw createReconnectFailure(message1, message2);
164 }
165 }
166
167 return servers;
168 }
169
170
171
172
173
174
175
176 @Override
177 protected String getConnectionType() {
178 return "Sharded";
179 }
180
181
182
183
184
185
186
187 @Override
188 protected ConnectionInfo<Server> reconnectMain() {
189 for (final Server server : mySelector.pickServers()) {
190 try {
191 final Connection conn = myFactory.connect(server, myConfig);
192
193 return new ConnectionInfo<Server>(conn, server);
194 }
195 catch (final IOException ioe) {
196
197 LOG.debug(ioe, "Could not connect to '{}': {}",
198 server.getCanonicalName(), ioe.getMessage());
199 }
200 }
201 return null;
202 }
203
204
205
206
207
208
209
210
211
212
213
214 private List<Server> resolveServerReadPreference(final Message message1,
215 final Message message2) {
216
217 List<Server> servers = Collections.emptyList();
218
219 final Server main = myMainKey;
220 if (main != null) {
221 servers = Collections.singletonList(main);
222 }
223
224 if (message1 != null) {
225 ReadPreference pref = message1.getReadPreference();
226 if (pref.getServer() != null) {
227 servers = Collections.singletonList(myCluster.get(pref
228 .getServer()));
229 }
230 else if (message2 != null) {
231 pref = message2.getReadPreference();
232 if (pref.getServer() != null) {
233 servers = Collections.singletonList(myCluster.get(pref
234 .getServer()));
235 }
236 }
237 }
238 return servers;
239 }
240 }