View Javadoc
1   /**
2    * This Source Code Form is subject to the terms of the Mozilla Public
3    * License, v. 2.0. If a copy of the MPL was not distributed with this
4    * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5    *
6    * If it is not possible or desirable to put the notice in a particular
7    * file, then You may include the notice in a location (such as a LICENSE
8    * file in a relevant directory) where a recipient would be likely to look
9    * for such a notice.
10  
11   * 
12   */
13   
14  /*  ---------------------------------------------------------------------------
15   *  U.S. Government, Department of the Army
16   *  Army Materiel Command
17   *  Research Development Engineering Command
18   *  Communications Electronics Research Development and Engineering Center
19   *  ---------------------------------------------------------------------------
20   */
21  
22  package org.miloss.fgsms.wsn.broker;
23  
24  import java.net.URL;
25  
26  import java.util.ArrayList;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import javax.xml.bind.JAXBElement;
34  import javax.xml.ws.BindingProvider;
35  
36  import org.apache.log4j.*;
37  import org.oasis_open.docs.wsn.b_2.*;
38  import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
39  import org.oasis_open.docs.wsn.brw_2.*;
40  import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
41  import org.miloss.fgsms.wsn.WSNUtility;
42  
43  /**
44   * This is a singleton message broker for the OASIS specification
45   * WS-Notification No authorization is peformed in this class, authorization
46   * should be performed by its callers
47   *
48   * @author AO
49   */
50  public class SingletonBroker {
51  
52      private SingletonBroker() {
53          
54      }
55  
56      public static SingletonBroker getInstance() {
57          return SingletonBrokerHolder.INSTANCE;
58      }
59      final static Logger log = Logger.getLogger("WS-NotificationBroker");
60  
61      @Override
62      public Object clone() throws CloneNotSupportedException {
63          throw new CloneNotSupportedException();
64      }
65      private static long messagesin = 0;
66      private static long messagesout = 0;
67  
68      public static long getMessagesin() {
69          return messagesin;
70      }
71  
72      public static long getMessagesout() {
73          return messagesout;
74      }
75  
76      private static class SingletonBrokerHolder {
77  
78          private static final SingletonBroker INSTANCE = new SingletonBroker();
79      }
80      private static Map currentSubscriptions = new HashMap();
81      private static Map currentMailboxes = new HashMap();
82      /**
83       * urn:wsn:mailbox
84       */
85      public static final String MAILBOX_URL_PREFIX = "urn:wsn:mailbox";
86      
87  
88      static String GetTopicList(Notify n) {
89          String s = new String();
90          if (n == null) {
91              return s;
92          }
93          for (int i = 0; i < n.getNotificationMessage().size(); i++) {
94              if (n.getNotificationMessage().get(i).getTopic() != null) {
95                  if (n.getNotificationMessage().get(i).getTopic().getContent() != null) {
96                      for (int k = 0; k < n.getNotificationMessage().get(i).getTopic().getContent().size(); k++) {
97                          s = s + " " + n.getNotificationMessage().get(i).getTopic().getContent().get(k).toString();
98                      }
99                  }
100             }
101         }
102         return s.trim();
103     }
104 
105     protected static void Dispatch(Notify notify) {
106         messagesin++;
107         //TODO run this in another thread
108         //get list of subscriber endpoints that are subscribed to this topic
109         List<SubscriptionInfo> clients = GetMatchingSubscribers(notify.getNotificationMessage());
110         log.log(Level.INFO, GetTopicList(notify));
111         log.log(Level.INFO, "Dispatching WSN-Notify, msg count:" + notify.getNotificationMessage().size() + " to " + clients.size() + " subscribers");
112         NotificationBroker port = null;
113         BindingProvider bp = null;
114         Map<String, Object> ctx = null;
115 
116         
117             NotificationService client = new  NotificationService();//wsdllocation,  org.miloss.fgsms.wsn.client.NotificationService.qname);
118             port = client.getNotificationPort();
119             bp = (BindingProvider) port;
120             ctx = bp.getRequestContext();
121         
122 
123         for (int i = 0; i < clients.size(); i++) {
124             boolean ok = false;
125             //dispatch to web service subscribers
126             if (!clients.get(i).callback.startsWith(MAILBOX_URL_PREFIX)) {
127                 ctx.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, clients.get(i).callback);
128                 for (int k = 0; k < 2; k++) {
129                     try {
130                         port.notify(notify);
131                         int code = (Integer) bp.getResponseContext().get("javax.xml.ws.http.response.code");
132                         if (code >= 200 && code < 300) {
133                             ok = true;
134                             messagesout++;
135                             break;
136                         }
137                     } catch (Exception ex) {
138                         log.log(Level.WARN, "could not deliver wsn notify to " + clients.get(i).callback + " " + ex.getMessage());
139                         log.log(Level.DEBUG, "could not deliver wsn notify to " + clients.get(i).callback, ex);
140 
141                     }
142                     if (!ok) {
143                         log.log(Level.WARN, "dropping subscription " + clients.get(i).subid + " retry count exceeded");
144                         currentSubscriptions.remove(clients.get(i).subid);
145                     }
146                 }
147 
148             } else {
149                 //dispatch to mailboxes
150                 Mailbox box = (Mailbox) currentMailboxes.get(clients.get(i).callback);
151                 if (box != null) {
152                     box.messages.add(notify);
153                 }
154                 //else                    log.log(Level.WARN, "cannot deliver message")
155             }
156 
157         }
158         log.log(Level.INFO, "Stats " + messagesin + " in, " + messagesout + " out");
159     }
160 
161     /*
162      * key subscription id key topicname
163      *
164      * topic.category.taxonomy.xyz.etc anyone with topics matching topic or
165      * topic.category or topic.category.taxonomy if notify.topic.startswith
166      * (subscription.topic) //match
167      */
168     private static List<SubscriptionInfo> GetMatchingSubscribers(List<NotificationMessageHolderType> notificationMessage) {
169         ArrayList<SubscriptionInfo> list = new ArrayList<SubscriptionInfo>();
170         if (notificationMessage.isEmpty()) {
171             return list;
172         }
173         Map subs = new HashMap(currentSubscriptions);
174         Iterator it = subs.keySet().iterator();
175         while (it.hasNext()) { //for each subscription
176             SubscriptionInfo si = (SubscriptionInfo) subs.get(it.next());
177             //if one of the inbound notify's topic matching a subscription's topic, add to the list
178             for (int i = 0; i < notificationMessage.size(); i++) {
179                 if (SubscriptionContains(si, notificationMessage.get(i).getTopic())) {
180                     list.add(si);
181                 }
182 
183             }
184         }
185         return list;
186     }
187 
188     private static boolean SubscriptionContains(SubscriptionInfo si, TopicExpressionType topic) {
189         for (int i = 0; i < si.topics.size(); i++) {
190             List<String> topics = WSNUtility.topicExpressionToList(topic);
191             for (int k = 0; k < topics.size(); k++) {
192                 if (si.topics.get(i).toLowerCase().startsWith(topics.get(k).toLowerCase())) {
193                     return true;
194                 }
195             }
196         }
197         return false;
198     }
199 
200     protected static String AddSubscription(Subscribe subscribeRequest) {
201         String s = UUID.randomUUID().toString();
202         SubscriptionInfo info = new SubscriptionInfo();
203         info.data = subscribeRequest;
204         info.callback = WSNUtility.getWSAAddress(subscribeRequest.getConsumerReference());
205         info.subid = s;
206         info.topics = new ArrayList<String>();
207         for (int i = 0; i < subscribeRequest.getFilter().getAny().size(); i++) {
208             if ((subscribeRequest.getFilter().getAny().get(i) instanceof JAXBElement)) {
209                 JAXBElement<TopicExpressionType> te = (JAXBElement<TopicExpressionType>) subscribeRequest.getFilter().getAny().get(i);
210                 info.topics = WSNUtility.topicExpressionToList(te.getValue());
211             }
212         }
213         currentSubscriptions.put(s, info);
214         log.log(Level.INFO, "adding subscription " + info.subid + " topics " + WSNUtility.listStringtoString(info.topics) + " to address " + info.callback);
215 
216         return s;
217     }
218 
219     protected static void RemoveSubscription(List<Object> any) {
220         for (int i = 0; i < any.size(); i++) {
221             log.log(Level.INFO, "attempting to remove subscription " + any.get(i).toString());
222             if (any.get(i) instanceof String) {
223                 if(RemoveSubscription((String) any.get(i)))
224                     log.log(Level.INFO, "successfully removed " + any.get(i).toString());
225                 else log.log(Level.WARN, "unsuccessful removal " + any.get(i).toString() + " it probably didn't exist");
226             }
227         }
228     }
229 
230     protected static boolean RemoveSubscription(String subscriptionId) {
231         Object remove = currentSubscriptions.remove(subscriptionId);
232         if (remove == null) {
233             return false;
234         }
235         return true;
236     }
237 
238     protected static String CreateMailBox(CreatePullPoint createPullPointRequest, String username) {
239         if (createPullPointRequest == null) {
240             return null;
241         }
242         Mailbox box = new Mailbox();
243         box.messages = new ConcurrentLinkedQueue<Notify>();
244         box.username = username;
245         box.id = MAILBOX_URL_PREFIX + UUID.randomUUID().toString();
246         currentMailboxes.put(box.id, box);
247         return box.id;
248     }
249 
250     protected static void DestroyMailBox(List<Object> any) {
251         for (int i = 0; i < any.size(); i++) {
252             if (any.get(i) instanceof String) {
253                 Object remove = currentMailboxes.remove((String) any.get(i));
254                 if (remove == null)
255                     ; //maybe throw resource unknown fault?
256                 remove = null;
257             }
258         }
259     }
260 
261     protected static GetMessagesResponse GetMessages(GetMessages messagesRequest) throws ResourceUnknownFault, UnableToGetMessagesFault {
262         if (messagesRequest == null) {
263             throw new UnableToGetMessagesFault("null request", new UnableToGetMessagesFaultType());
264         }
265         if (messagesRequest.getAny().isEmpty()) {
266             throw new UnableToGetMessagesFault("at least one pull point mailbox must be specified", new UnableToGetMessagesFaultType());
267         }
268         GetMessagesResponse res = new GetMessagesResponse();
269         int count = 0;
270         long maxmessages = 100;
271         if (messagesRequest.getMaximumNumber() != null) {
272             maxmessages = messagesRequest.getMaximumNumber().longValue();
273         }
274         if (maxmessages <= 0) {
275             maxmessages = 10;
276         }
277         if (maxmessages > 1000) {
278             maxmessages = 1000;
279         }
280 
281         for (int i = 0; i < messagesRequest.getAny().size(); i++) {
282             Mailbox box = (Mailbox) currentMailboxes.get((String) messagesRequest.getAny().get(i));
283             if (box == null) {
284                 throw new ResourceUnknownFault("unknown pull point", new ResourceUnknownFaultType());
285             }
286             while (count < maxmessages && !box.messages.isEmpty()) {
287                 Notify poll = box.messages.poll();
288                 if (poll != null) {
289                     res.getNotificationMessage().addAll(poll.getNotificationMessage());
290                     count++;
291                 }
292             }
293         }
294         return res;
295     }
296 
297     protected static void PauseSubscription(PauseSubscription pauseSubscriptionRequest) throws PauseFailedFault, ResourceUnknownFault {
298 
299         for (int i = 0; i < pauseSubscriptionRequest.getAny().size(); i++) {
300             if (pauseSubscriptionRequest.getAny().get(i) instanceof String) {
301                 SubscriptionInfo get = (SubscriptionInfo) currentSubscriptions.get((String) pauseSubscriptionRequest.getAny().get(i));
302                 if (get == null) {
303                     throw new ResourceUnknownFault("subscription does not exist", new ResourceUnknownFaultType());
304                 }
305                 get.paused = true;
306                 log.log(Level.INFO, "Subscription id " + get.subid + " to callback " + get.callback + " paused");
307             }
308         }
309     }
310 
311     protected static void ResumeSubscription(ResumeSubscription resumeSubscriptionRequest) throws ResumeFailedFault, ResourceUnknownFault {
312         for (int i = 0; i < resumeSubscriptionRequest.getAny().size(); i++) {
313             if (resumeSubscriptionRequest.getAny().get(i) instanceof String) {
314                 SubscriptionInfo get = (SubscriptionInfo) currentSubscriptions.get((String) resumeSubscriptionRequest.getAny().get(i));
315                 if (get == null) {
316                     throw new ResourceUnknownFault("subscription does not exist", new ResourceUnknownFaultType());
317                 }
318                 get.paused = false;
319                 log.log(Level.INFO, "Subscription id " + get.subid + " to callback " + get.callback + " resumed");
320             }
321         }
322     }
323 }