1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.miloss.fgsms.wsn.broker;
23
24 import java.net.URL;
25
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.UUID;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import javax.xml.bind.JAXBElement;
34 import javax.xml.ws.BindingProvider;
35
36 import org.apache.log4j.*;
37 import org.oasis_open.docs.wsn.b_2.*;
38 import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
39 import org.oasis_open.docs.wsn.brw_2.*;
40 import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
41 import org.miloss.fgsms.wsn.WSNUtility;
42
43
44
45
46
47
48
49
50 public class SingletonBroker {
51
52 private SingletonBroker() {
53
54 }
55
56 public static SingletonBroker getInstance() {
57 return SingletonBrokerHolder.INSTANCE;
58 }
59 final static Logger log = Logger.getLogger("WS-NotificationBroker");
60
61 @Override
62 public Object clone() throws CloneNotSupportedException {
63 throw new CloneNotSupportedException();
64 }
65 private static long messagesin = 0;
66 private static long messagesout = 0;
67
68 public static long getMessagesin() {
69 return messagesin;
70 }
71
72 public static long getMessagesout() {
73 return messagesout;
74 }
75
76 private static class SingletonBrokerHolder {
77
78 private static final SingletonBroker INSTANCE = new SingletonBroker();
79 }
80 private static Map currentSubscriptions = new HashMap();
81 private static Map currentMailboxes = new HashMap();
82
83
84
85 public static final String MAILBOX_URL_PREFIX = "urn:wsn:mailbox";
86
87
88 static String GetTopicList(Notify n) {
89 String s = new String();
90 if (n == null) {
91 return s;
92 }
93 for (int i = 0; i < n.getNotificationMessage().size(); i++) {
94 if (n.getNotificationMessage().get(i).getTopic() != null) {
95 if (n.getNotificationMessage().get(i).getTopic().getContent() != null) {
96 for (int k = 0; k < n.getNotificationMessage().get(i).getTopic().getContent().size(); k++) {
97 s = s + " " + n.getNotificationMessage().get(i).getTopic().getContent().get(k).toString();
98 }
99 }
100 }
101 }
102 return s.trim();
103 }
104
105 protected static void Dispatch(Notify notify) {
106 messagesin++;
107
108
109 List<SubscriptionInfo> clients = GetMatchingSubscribers(notify.getNotificationMessage());
110 log.log(Level.INFO, GetTopicList(notify));
111 log.log(Level.INFO, "Dispatching WSN-Notify, msg count:" + notify.getNotificationMessage().size() + " to " + clients.size() + " subscribers");
112 NotificationBroker port = null;
113 BindingProvider bp = null;
114 Map<String, Object> ctx = null;
115
116
117 NotificationService client = new NotificationService();
118 port = client.getNotificationPort();
119 bp = (BindingProvider) port;
120 ctx = bp.getRequestContext();
121
122
123 for (int i = 0; i < clients.size(); i++) {
124 boolean ok = false;
125
126 if (!clients.get(i).callback.startsWith(MAILBOX_URL_PREFIX)) {
127 ctx.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, clients.get(i).callback);
128 for (int k = 0; k < 2; k++) {
129 try {
130 port.notify(notify);
131 int code = (Integer) bp.getResponseContext().get("javax.xml.ws.http.response.code");
132 if (code >= 200 && code < 300) {
133 ok = true;
134 messagesout++;
135 break;
136 }
137 } catch (Exception ex) {
138 log.log(Level.WARN, "could not deliver wsn notify to " + clients.get(i).callback + " " + ex.getMessage());
139 log.log(Level.DEBUG, "could not deliver wsn notify to " + clients.get(i).callback, ex);
140
141 }
142 if (!ok) {
143 log.log(Level.WARN, "dropping subscription " + clients.get(i).subid + " retry count exceeded");
144 currentSubscriptions.remove(clients.get(i).subid);
145 }
146 }
147
148 } else {
149
150 Mailbox box = (Mailbox) currentMailboxes.get(clients.get(i).callback);
151 if (box != null) {
152 box.messages.add(notify);
153 }
154
155 }
156
157 }
158 log.log(Level.INFO, "Stats " + messagesin + " in, " + messagesout + " out");
159 }
160
161
162
163
164
165
166
167
168 private static List<SubscriptionInfo> GetMatchingSubscribers(List<NotificationMessageHolderType> notificationMessage) {
169 ArrayList<SubscriptionInfo> list = new ArrayList<SubscriptionInfo>();
170 if (notificationMessage.isEmpty()) {
171 return list;
172 }
173 Map subs = new HashMap(currentSubscriptions);
174 Iterator it = subs.keySet().iterator();
175 while (it.hasNext()) {
176 SubscriptionInfo si = (SubscriptionInfo) subs.get(it.next());
177
178 for (int i = 0; i < notificationMessage.size(); i++) {
179 if (SubscriptionContains(si, notificationMessage.get(i).getTopic())) {
180 list.add(si);
181 }
182
183 }
184 }
185 return list;
186 }
187
188 private static boolean SubscriptionContains(SubscriptionInfo si, TopicExpressionType topic) {
189 for (int i = 0; i < si.topics.size(); i++) {
190 List<String> topics = WSNUtility.topicExpressionToList(topic);
191 for (int k = 0; k < topics.size(); k++) {
192 if (si.topics.get(i).toLowerCase().startsWith(topics.get(k).toLowerCase())) {
193 return true;
194 }
195 }
196 }
197 return false;
198 }
199
200 protected static String AddSubscription(Subscribe subscribeRequest) {
201 String s = UUID.randomUUID().toString();
202 SubscriptionInfo info = new SubscriptionInfo();
203 info.data = subscribeRequest;
204 info.callback = WSNUtility.getWSAAddress(subscribeRequest.getConsumerReference());
205 info.subid = s;
206 info.topics = new ArrayList<String>();
207 for (int i = 0; i < subscribeRequest.getFilter().getAny().size(); i++) {
208 if ((subscribeRequest.getFilter().getAny().get(i) instanceof JAXBElement)) {
209 JAXBElement<TopicExpressionType> te = (JAXBElement<TopicExpressionType>) subscribeRequest.getFilter().getAny().get(i);
210 info.topics = WSNUtility.topicExpressionToList(te.getValue());
211 }
212 }
213 currentSubscriptions.put(s, info);
214 log.log(Level.INFO, "adding subscription " + info.subid + " topics " + WSNUtility.listStringtoString(info.topics) + " to address " + info.callback);
215
216 return s;
217 }
218
219 protected static void RemoveSubscription(List<Object> any) {
220 for (int i = 0; i < any.size(); i++) {
221 log.log(Level.INFO, "attempting to remove subscription " + any.get(i).toString());
222 if (any.get(i) instanceof String) {
223 if(RemoveSubscription((String) any.get(i)))
224 log.log(Level.INFO, "successfully removed " + any.get(i).toString());
225 else log.log(Level.WARN, "unsuccessful removal " + any.get(i).toString() + " it probably didn't exist");
226 }
227 }
228 }
229
230 protected static boolean RemoveSubscription(String subscriptionId) {
231 Object remove = currentSubscriptions.remove(subscriptionId);
232 if (remove == null) {
233 return false;
234 }
235 return true;
236 }
237
238 protected static String CreateMailBox(CreatePullPoint createPullPointRequest, String username) {
239 if (createPullPointRequest == null) {
240 return null;
241 }
242 Mailbox box = new Mailbox();
243 box.messages = new ConcurrentLinkedQueue<Notify>();
244 box.username = username;
245 box.id = MAILBOX_URL_PREFIX + UUID.randomUUID().toString();
246 currentMailboxes.put(box.id, box);
247 return box.id;
248 }
249
250 protected static void DestroyMailBox(List<Object> any) {
251 for (int i = 0; i < any.size(); i++) {
252 if (any.get(i) instanceof String) {
253 Object remove = currentMailboxes.remove((String) any.get(i));
254 if (remove == null)
255 ;
256 remove = null;
257 }
258 }
259 }
260
261 protected static GetMessagesResponse GetMessages(GetMessages messagesRequest) throws ResourceUnknownFault, UnableToGetMessagesFault {
262 if (messagesRequest == null) {
263 throw new UnableToGetMessagesFault("null request", new UnableToGetMessagesFaultType());
264 }
265 if (messagesRequest.getAny().isEmpty()) {
266 throw new UnableToGetMessagesFault("at least one pull point mailbox must be specified", new UnableToGetMessagesFaultType());
267 }
268 GetMessagesResponse res = new GetMessagesResponse();
269 int count = 0;
270 long maxmessages = 100;
271 if (messagesRequest.getMaximumNumber() != null) {
272 maxmessages = messagesRequest.getMaximumNumber().longValue();
273 }
274 if (maxmessages <= 0) {
275 maxmessages = 10;
276 }
277 if (maxmessages > 1000) {
278 maxmessages = 1000;
279 }
280
281 for (int i = 0; i < messagesRequest.getAny().size(); i++) {
282 Mailbox box = (Mailbox) currentMailboxes.get((String) messagesRequest.getAny().get(i));
283 if (box == null) {
284 throw new ResourceUnknownFault("unknown pull point", new ResourceUnknownFaultType());
285 }
286 while (count < maxmessages && !box.messages.isEmpty()) {
287 Notify poll = box.messages.poll();
288 if (poll != null) {
289 res.getNotificationMessage().addAll(poll.getNotificationMessage());
290 count++;
291 }
292 }
293 }
294 return res;
295 }
296
297 protected static void PauseSubscription(PauseSubscription pauseSubscriptionRequest) throws PauseFailedFault, ResourceUnknownFault {
298
299 for (int i = 0; i < pauseSubscriptionRequest.getAny().size(); i++) {
300 if (pauseSubscriptionRequest.getAny().get(i) instanceof String) {
301 SubscriptionInfo get = (SubscriptionInfo) currentSubscriptions.get((String) pauseSubscriptionRequest.getAny().get(i));
302 if (get == null) {
303 throw new ResourceUnknownFault("subscription does not exist", new ResourceUnknownFaultType());
304 }
305 get.paused = true;
306 log.log(Level.INFO, "Subscription id " + get.subid + " to callback " + get.callback + " paused");
307 }
308 }
309 }
310
311 protected static void ResumeSubscription(ResumeSubscription resumeSubscriptionRequest) throws ResumeFailedFault, ResourceUnknownFault {
312 for (int i = 0; i < resumeSubscriptionRequest.getAny().size(); i++) {
313 if (resumeSubscriptionRequest.getAny().get(i) instanceof String) {
314 SubscriptionInfo get = (SubscriptionInfo) currentSubscriptions.get((String) resumeSubscriptionRequest.getAny().get(i));
315 if (get == null) {
316 throw new ResourceUnknownFault("subscription does not exist", new ResourceUnknownFaultType());
317 }
318 get.paused = false;
319 log.log(Level.INFO, "Subscription id " + get.subid + " to callback " + get.callback + " resumed");
320 }
321 }
322 }
323 }