1 /*
2 * #%L
3 * ReplyHandler.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import com.allanbank.mongodb.client.message.Reply;
27
28 /**
29 * ReplyHandler provides the capability to properly handle the replies to a
30 * callback.
31 *
32 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
33 */
34 public class ReplyHandler implements Runnable {
35
36 /** The socket that we are receiving for. */
37 private static final ThreadLocal<Receiver> ourReceiver = new ThreadLocal<Receiver>();
38
39 /**
40 * Raise an error on the callback, if any. Will execute the request on a
41 * background thread if provided.
42 *
43 * @param exception
44 * The thrown exception.
45 * @param replyCallback
46 * The callback for the reply to the message.
47 * @param executor
48 * The executor to use for the back-grounding the reply handling.
49 */
50 public static void raiseError(final Throwable exception,
51 final ReplyCallback replyCallback, final Executor executor) {
52 if (replyCallback != null) {
53 if (executor != null) {
54 try {
55 executor.execute(new ReplyHandler(replyCallback, exception));
56 }
57 catch (final RejectedExecutionException rej) {
58 // Run on this thread.
59 replyCallback.exception(exception);
60 }
61 }
62 else {
63 replyCallback.exception(exception);
64 }
65 }
66 }
67
68 /**
69 * Updates to set the reply for the callback, if any.
70 *
71 * @param receiver
72 * The socket receiving the message.
73 * @param reply
74 * The reply.
75 * @param replyCallback
76 * The callback for the reply to the message.
77 * @param executor
78 * The executor to use for the back-grounding the reply handling.
79 */
80 public static void reply(final Receiver receiver, final Reply reply,
81 final ReplyCallback replyCallback, final Executor executor) {
82 if (replyCallback != null) {
83 // We know the FutureCallback will not block or take long to process
84 // so just use this thread in that case.
85 final boolean lightWeight = replyCallback.isLightWeight();
86 if (!lightWeight && (executor != null)) {
87 try {
88 executor.execute(new ReplyHandler(replyCallback, reply));
89 }
90 catch (final RejectedExecutionException rej) {
91 // Run on this thread.
92 run(receiver, reply, replyCallback);
93 }
94 }
95 else {
96 run(receiver, reply, replyCallback);
97 }
98 }
99 }
100
101 /**
102 * If there is a pending reply tries to process that reply.
103 */
104 public static void tryReceive() {
105 final Receiver receiver = ourReceiver.get();
106
107 if (receiver != null) {
108 receiver.tryReceive();
109 }
110 }
111
112 /**
113 * Runs the callback on the current thread.
114 *
115 * @param receiver
116 * The receiver to be run.
117 * @param reply
118 * The reply to the provide to the callback.
119 * @param replyCallback
120 * The reply callback.
121 */
122 private static void run(final Receiver receiver, final Reply reply,
123 final ReplyCallback replyCallback) {
124 final Receiver before = ourReceiver.get();
125 try {
126 ourReceiver.set(receiver);
127 replyCallback.callback(reply);
128 }
129 finally {
130 ourReceiver.set(before);
131 }
132 }
133
134 /** The exception raised from processing the message. */
135 private final Throwable myError;
136
137 /** The reply to the message. */
138 private final Reply myReply;
139
140 /** The callback for the reply to the message. */
141 private final ReplyCallback myReplyCallback;
142
143 /**
144 * Creates a new ReplyHandler.
145 *
146 * @param replyCallback
147 * The callback for the message.
148 * @param reply
149 * The reply.
150 */
151 public ReplyHandler(final ReplyCallback replyCallback, final Reply reply) {
152 super();
153 myReplyCallback = replyCallback;
154 myReply = reply;
155 myError = null;
156 }
157
158 /**
159 * Creates a new ReplyHandler.
160 *
161 * @param replyCallback
162 * The callback for the message.
163 * @param exception
164 * The thrown exception.
165 */
166 public ReplyHandler(final ReplyCallback replyCallback,
167 final Throwable exception) {
168 super();
169 myReplyCallback = replyCallback;
170 myError = exception;
171 myReply = null;
172 }
173
174 /**
175 * {@inheritDoc}
176 * <p>
177 * Overridden to process the callback response.
178 * </p>
179 */
180 @Override
181 public void run() {
182 if (myReply != null) {
183 reply(null, myReply, myReplyCallback, null);
184 }
185 else if (myError != null) {
186 raiseError(myError, myReplyCallback, null);
187 }
188 }
189 }