1 /*
2 * #%L
3 * ShardedConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.connection.sharded;
22
23 import java.io.IOException;
24 import java.util.Collections;
25 import java.util.List;
26
27 import com.allanbank.mongodb.MongoClientConfiguration;
28 import com.allanbank.mongodb.MongoDbException;
29 import com.allanbank.mongodb.ReadPreference;
30 import com.allanbank.mongodb.client.Message;
31 import com.allanbank.mongodb.client.connection.Connection;
32 import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection;
33 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
34 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
35 import com.allanbank.mongodb.client.state.Cluster;
36 import com.allanbank.mongodb.client.state.Server;
37 import com.allanbank.mongodb.client.state.ServerSelector;
38 import com.allanbank.mongodb.util.log.Log;
39 import com.allanbank.mongodb.util.log.LogFactory;
40
41 /**
42 * Provides a {@link Connection} implementation for connecting to a sharded
43 * environment via mongos servers.
44 *
45 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
46 * mutated in incompatible ways between any two releases of the driver.
47 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
48 */
49 public class ShardedConnection extends AbstractProxyMultipleConnection<Server> {
50
51 /** The logger for the {@link ShardedConnection}. */
52 private static final Log LOG = LogFactory.getLog(ShardedConnection.class);
53
54 /** The selector for the server when we need to reconnect. */
55 private final ServerSelector mySelector;
56
57 /**
58 * Creates a new {@link ShardedConnection}.
59 *
60 * @param proxiedConnection
61 * The connection being proxied.
62 * @param server
63 * The primary server this connection is connected to.
64 * @param cluster
65 * The state of the cluster for finding secondary connections.
66 * @param selector
67 * The selector for servers when we need to reconnect.
68 * @param factory
69 * The connection factory for opening secondary connections.
70 * @param config
71 * The MongoDB client configuration.
72 */
73 public ShardedConnection(final Connection proxiedConnection,
74 final Server server, final Cluster cluster,
75 final ServerSelector selector,
76 final ProxiedConnectionFactory factory,
77 final MongoClientConfiguration config) {
78 super(proxiedConnection, server, cluster, factory, config);
79
80 mySelector = selector;
81 }
82
83 /**
84 * {@inheritDoc}
85 * <p>
86 * Overridden to return the canonical name of the primary.
87 * </p>
88 */
89 @Override
90 public String getServerName() {
91 if (myMainKey != null) {
92 return myMainKey.getCanonicalName();
93 }
94 return "UNKNOWN";
95 }
96
97 /**
98 * {@inheritDoc}
99 * <p>
100 * Overridden to create a connection to the server.
101 * </p>
102 */
103 @Override
104 protected Connection connect(final Server server) {
105 Connection conn = null;
106 try {
107 conn = myFactory.connect(server, myConfig);
108
109 conn = cacheConnection(server, conn);
110 }
111 catch (final IOException e) {
112 LOG.info(e, "Could not connect to the server '{}': {}",
113 server.getCanonicalName(), e.getMessage());
114 }
115 return conn;
116 }
117
118 /**
119 * {@inheritDoc}
120 * <p>
121 * Overridden for testing access.
122 * </p>
123 */
124 @Override
125 protected Connection connection(final Server server) {
126 LOG.debug("Lookup connection for server: {}", server.getCanonicalName());
127 return super.connection(server);
128 }
129
130 /**
131 * Locates the set of servers that can be used to send the specified
132 * messages. This method will attempt to connect to the primary server if
133 * there is not a current connection to the primary.
134 *
135 * @param message1
136 * The first message to send.
137 * @param message2
138 * The second message to send. May be <code>null</code>.
139 * @return The servers that can be used.
140 * @throws MongoDbException
141 * On a failure to locate a server that all messages can be sent
142 * to.
143 */
144 @Override
145 protected List<Server> findPotentialKeys(final Message message1,
146 final Message message2) throws MongoDbException {
147 List<Server> servers = resolveServerReadPreference(message1, message2);
148
149 if (servers.isEmpty()) {
150 // If we get here and a reconnect is in progress then
151 // block for the reconnect. Once the reconnect is complete, try
152 // again.
153 if (myMainKey == null) {
154 // Wait for a reconnect.
155 final ConnectionInfo<Server> newConnInfo = reconnectMain();
156 if (newConnInfo != null) {
157 updateMain(newConnInfo);
158 servers = resolveServerReadPreference(message1, message2);
159 }
160 }
161
162 if (servers.isEmpty()) {
163 throw createReconnectFailure(message1, message2);
164 }
165 }
166
167 return servers;
168 }
169
170 /**
171 * {@inheritDoc}
172 * <p>
173 * Overridden to return the string {@code Sharded}.
174 * </p>
175 */
176 @Override
177 protected String getConnectionType() {
178 return "Sharded";
179 }
180
181 /**
182 * {@inheritDoc}
183 * <p>
184 * Overridden creates a connection back to the primary server.
185 * </p>
186 */
187 @Override
188 protected ConnectionInfo<Server> reconnectMain() {
189 for (final Server server : mySelector.pickServers()) {
190 try {
191 final Connection conn = myFactory.connect(server, myConfig);
192
193 return new ConnectionInfo<Server>(conn, server);
194 }
195 catch (final IOException ioe) {
196 // Ignored. Will return null.
197 LOG.debug(ioe, "Could not connect to '{}': {}",
198 server.getCanonicalName(), ioe.getMessage());
199 }
200 }
201 return null;
202 }
203
204 /**
205 * Locates the set of servers that can be used to send the specified
206 * messages.
207 *
208 * @param message1
209 * The first message to send.
210 * @param message2
211 * The second message to send. May be <code>null</code>.
212 * @return The servers that can be used.
213 */
214 private List<Server> resolveServerReadPreference(final Message message1,
215 final Message message2) {
216
217 List<Server> servers = Collections.emptyList();
218
219 final Server main = myMainKey;
220 if (main != null) {
221 servers = Collections.singletonList(main);
222 }
223
224 if (message1 != null) {
225 ReadPreference pref = message1.getReadPreference();
226 if (pref.getServer() != null) {
227 servers = Collections.singletonList(myCluster.get(pref
228 .getServer()));
229 }
230 else if (message2 != null) {
231 pref = message2.getReadPreference();
232 if (pref.getServer() != null) {
233 servers = Collections.singletonList(myCluster.get(pref
234 .getServer()));
235 }
236 }
237 }
238 return servers;
239 }
240 }