1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
package com.allanbank.mongodb.client.connection.sharded; |
22 | |
|
23 | |
import java.io.IOException; |
24 | |
import java.util.Collections; |
25 | |
import java.util.List; |
26 | |
|
27 | |
import com.allanbank.mongodb.MongoClientConfiguration; |
28 | |
import com.allanbank.mongodb.MongoDbException; |
29 | |
import com.allanbank.mongodb.ReadPreference; |
30 | |
import com.allanbank.mongodb.client.Message; |
31 | |
import com.allanbank.mongodb.client.connection.Connection; |
32 | |
import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection; |
33 | |
import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo; |
34 | |
import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory; |
35 | |
import com.allanbank.mongodb.client.state.Cluster; |
36 | |
import com.allanbank.mongodb.client.state.Server; |
37 | |
import com.allanbank.mongodb.client.state.ServerSelector; |
38 | |
import com.allanbank.mongodb.util.log.Log; |
39 | |
import com.allanbank.mongodb.util.log.LogFactory; |
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | 0 | public class ShardedConnection extends AbstractProxyMultipleConnection<Server> { |
50 | |
|
51 | |
|
52 | 1 | private static final Log LOG = LogFactory.getLog(ShardedConnection.class); |
53 | |
|
54 | |
|
55 | |
private final ServerSelector mySelector; |
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | |
|
68 | |
|
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
public ShardedConnection(final Connection proxiedConnection, |
74 | |
final Server server, final Cluster cluster, |
75 | |
final ServerSelector selector, |
76 | |
final ProxiedConnectionFactory factory, |
77 | |
final MongoClientConfiguration config) { |
78 | 20 | super(proxiedConnection, server, cluster, factory, config); |
79 | |
|
80 | 20 | mySelector = selector; |
81 | 20 | } |
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
|
89 | |
@Override |
90 | |
public String getServerName() { |
91 | 1 | if (myMainKey != null) { |
92 | 1 | return myMainKey.getCanonicalName(); |
93 | |
} |
94 | 0 | return "UNKNOWN"; |
95 | |
} |
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
103 | |
@Override |
104 | |
protected Connection connect(final Server server) { |
105 | 2 | Connection conn = null; |
106 | |
try { |
107 | 2 | conn = myFactory.connect(server, myConfig); |
108 | |
|
109 | 2 | conn = cacheConnection(server, conn); |
110 | |
} |
111 | 0 | catch (final IOException e) { |
112 | 0 | LOG.info(e, "Could not connect to the server '{}': {}", |
113 | |
server.getCanonicalName(), e.getMessage()); |
114 | 2 | } |
115 | 2 | return conn; |
116 | |
} |
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
@Override |
125 | |
protected Connection connection(final Server server) { |
126 | 1 | LOG.debug("Lookup connection for server: {}", server.getCanonicalName()); |
127 | 1 | return super.connection(server); |
128 | |
} |
129 | |
|
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | |
|
144 | |
@Override |
145 | |
protected List<Server> findPotentialKeys(final Message message1, |
146 | |
final Message message2) throws MongoDbException { |
147 | 4 | List<Server> servers = resolveServerReadPreference(message1, message2); |
148 | |
|
149 | 4 | if (servers.isEmpty()) { |
150 | |
|
151 | |
|
152 | |
|
153 | 0 | if (myMainKey == null) { |
154 | |
|
155 | 0 | final ConnectionInfo<Server> newConnInfo = reconnectMain(); |
156 | 0 | if (newConnInfo != null) { |
157 | 0 | updateMain(newConnInfo); |
158 | 0 | servers = resolveServerReadPreference(message1, message2); |
159 | |
} |
160 | |
} |
161 | |
|
162 | 0 | if (servers.isEmpty()) { |
163 | 0 | throw createReconnectFailure(message1, message2); |
164 | |
} |
165 | |
} |
166 | |
|
167 | 4 | return servers; |
168 | |
} |
169 | |
|
170 | |
|
171 | |
|
172 | |
|
173 | |
|
174 | |
|
175 | |
|
176 | |
@Override |
177 | |
protected String getConnectionType() { |
178 | 3 | return "Sharded"; |
179 | |
} |
180 | |
|
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
|
186 | |
|
187 | |
@Override |
188 | |
protected ConnectionInfo<Server> reconnectMain() { |
189 | 2 | for (final Server server : mySelector.pickServers()) { |
190 | |
try { |
191 | 3 | final Connection conn = myFactory.connect(server, myConfig); |
192 | |
|
193 | 2 | return new ConnectionInfo<Server>(conn, server); |
194 | |
} |
195 | 1 | catch (final IOException ioe) { |
196 | |
|
197 | 1 | LOG.debug(ioe, "Could not connect to '{}': {}", |
198 | |
server.getCanonicalName(), ioe.getMessage()); |
199 | |
} |
200 | 1 | } |
201 | 0 | return null; |
202 | |
} |
203 | |
|
204 | |
|
205 | |
|
206 | |
|
207 | |
|
208 | |
|
209 | |
|
210 | |
|
211 | |
|
212 | |
|
213 | |
|
214 | |
private List<Server> resolveServerReadPreference(final Message message1, |
215 | |
final Message message2) { |
216 | |
|
217 | 4 | List<Server> servers = Collections.emptyList(); |
218 | |
|
219 | 4 | final Server main = myMainKey; |
220 | 4 | if (main != null) { |
221 | 4 | servers = Collections.singletonList(main); |
222 | |
} |
223 | |
|
224 | 4 | if (message1 != null) { |
225 | 4 | ReadPreference pref = message1.getReadPreference(); |
226 | 4 | if (pref.getServer() != null) { |
227 | 1 | servers = Collections.singletonList(myCluster.get(pref |
228 | |
.getServer())); |
229 | |
} |
230 | 3 | else if (message2 != null) { |
231 | 2 | pref = message2.getReadPreference(); |
232 | 2 | if (pref.getServer() != null) { |
233 | 1 | servers = Collections.singletonList(myCluster.get(pref |
234 | |
.getServer())); |
235 | |
} |
236 | |
} |
237 | |
} |
238 | 4 | return servers; |
239 | |
} |
240 | |
} |