View Javadoc
1   /**
2    * This Source Code Form is subject to the terms of the Mozilla Public
3    * License, v. 2.0. If a copy of the MPL was not distributed with this
4    * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5    *
6    * If it is not possible or desirable to put the notice in a particular
7    * file, then You may include the notice in a location (such as a LICENSE
8    * file in a relevant directory) where a recipient would be likely to look
9    * for such a notice.
10   *
11   * 
12   */
13  /*  ---------------------------------------------------------------------------
14   *  U.S. Government, Department of the Army
15   *  Army Materiel Command
16   *  Research Development Engineering Command
17   *  Communications Electronics Research Development and Engineering Center
18   *  ---------------------------------------------------------------------------
19   */
20  package org.miloss.fgsms.agentcore;
21  
22  import java.io.*;
23  import java.net.URL;
24  import java.util.*;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import javax.xml.bind.JAXB;
27  import javax.xml.bind.JAXBContext;
28  import javax.xml.bind.JAXBElement;
29  import javax.xml.bind.Marshaller;
30  import javax.xml.bind.Unmarshaller;
31  import javax.xml.datatype.DatatypeFactory;
32  import javax.xml.datatype.Duration;
33  import javax.xml.stream.XMLInputFactory;
34  import javax.xml.stream.XMLStreamReader;
35  import javax.xml.ws.BindingProvider;
36  import org.miloss.fgsms.common.IpAddressUtility;
37  import org.miloss.fgsms.common.Constants.AuthMode;
38  import org.miloss.fgsms.common.Utility;
39  import org.miloss.fgsms.services.interfaces.common.Header;
40  import org.miloss.fgsms.services.interfaces.common.PolicyType;
41  import org.miloss.fgsms.services.interfaces.common.SecurityWrapper;
42  import org.miloss.fgsms.services.interfaces.datacollector.*;
43  import org.miloss.fgsms.services.interfaces.policyconfiguration.*;
44  import org.apache.log4j.Level;
45  import org.miloss.fgsms.common.Logger;
46  ;
47  import org.miloss.fgsms.common.Constants;
48  import org.miloss.fgsms.plugins.agents.IEndpointDiscovery;
49  import us.gov.ic.ism.v2.ClassificationType;
50  
51  /**
52   * fgsms's Java Data Pusher
53   *
54   * Asynchronously handles the caching, loading, storing and sending of all
55   * record data for transactional web services.
56   *
57   * @author AO
58   * @since 3.0
59   */
60  
61  
62  public class DataPusher implements Runnable {
63  
64      static boolean DEBUG = false;
65      private static ConcurrentLinkedQueue outboundQueue;
66      private static HashMap policyCache;
67      private static final Logger log = Logger.getLogger(org.miloss.fgsms.common.Constants.LoggerName);
68      private static ConfigLoader cfg = null;
69      private static boolean ErrorState = false;
70  
71      // private static boolean configured = false;
72      public DataPusher(HashMap cache, ConcurrentLinkedQueue queue) {
73          Initializer(cache, queue);
74      }
75  
76      private synchronized void Initializer(HashMap cache, ConcurrentLinkedQueue queue) {
77          policyCache = cache;
78          outboundQueue = queue;
79      }
80  
81      /**
82       * used only for the Persistent storage agent
83       */
84      protected DataPusher() {
85          policyCache = new HashMap();
86          outboundQueue = new ConcurrentLinkedQueue();
87      }
88  
89      /**
90       * loads all configuration data, setups up the remote service proxies for
91       * PCS and DCS
92       */
93      protected static synchronized void Init() {
94          if (cfg == null) {
95              try {
96                  //TODO, if the ACS service is finally implemented, uncomment this 
97                  //cfg = ConfigLoader.DoDynamicConfig();
98                  if (cfg == null) {
99                      cfg = MessageProcessor.getSingletonObject().getConfig();
100                 }
101                 if (cfg == null) {
102                     throw new ConfigurationException("Config unavailable");
103                 }
104                 //   configured=true;
105             } catch (Exception ex) {
106                 cfg = null;
107                 ErrorState = true;
108                 LastErrorMessage = "could not initialize the configuration from fgsms.AgentCore.jar";
109                 log.log(Level.FATAL, "could not initialize the configuration from fgsms.AgentCore.jar", ex);
110                 return;
111             }
112 
113             BindingProvider bp = (BindingProvider) cfg.dcsport;
114             Map<String, Object> context = bp.getRequestContext();
115 
116             if (cfg.mode_ == AuthMode.UsernamePassword) {
117                 context.put(BindingProvider.USERNAME_PROPERTY, cfg.username);
118                 context.put(BindingProvider.PASSWORD_PROPERTY, Utility.DE(cfg.password));
119             }
120             if (cfg.mode_ == AuthMode.PKI) {
121                 //for PKI, we need a client certificate for authenticating to the FGSMS server
122                 //and it wasn't already specified
123                 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStorePassword"))) {
124                     if (cfg.getJavaxkeystore() != null) {
125                         //the context setup only works on JbossWS, as far as I can tell
126                         context.put("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
127                         context.put("javax.net.ssl.keyStore", Utility.DE(cfg.getJavaxkeystore()));
128                         try {
129                             System.setProperty("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
130                             System.setProperty("javax.net.ssl.keyStore", (cfg.getJavaxkeystore()));
131                         } catch (Exception ex) {
132                             log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
133                         }
134                     }
135                 }
136             }
137 
138             //trust store. Fox Mulder always said to trust no one, so we won't use the default
139             //JDK trust store and only trust the certifcate issuers that we want to trust
140             //however if someone specified an alternate mechanism from the command line, don't
141             //do anything
142             if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStore")) && Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStorePassword"))) {
143                 if (cfg.getJavaxtruststore() != null) {
144                     //the context setup only works on JbossWS, as far as I can tell
145                     context.put("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
146                     context.put("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
147                     try {
148                         System.setProperty("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
149                         System.setProperty("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
150                     } catch (Exception ex) {
151                         log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
152                     }
153                 }
154             }
155 
156             ApacheCxfSslHelper.doCXF(cfg.dcsport, cfg);
157 
158             bp = (BindingProvider) cfg.pcsport;
159             context = bp.getRequestContext();
160 
161             if (cfg.mode_ == AuthMode.UsernamePassword) {
162                 context.put(BindingProvider.USERNAME_PROPERTY, cfg.username);
163                 context.put(BindingProvider.PASSWORD_PROPERTY, Utility.DE(cfg.password));
164             }
165             if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStorePassword"))) {
166                 if (cfg.getJavaxtruststore() != null) {
167                     context.put("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
168                     context.put("javax.net.ssl.trustStore", Utility.DE(cfg.getJavaxtruststore()));
169                     try {
170                         System.setProperty("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
171                         System.setProperty("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
172                     } catch (Exception ex) {
173                         log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
174                     }
175                 }
176             }
177 
178             if (cfg.mode_ == AuthMode.PKI) {
179 
180                 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStorePassword"))) {
181                     if (cfg.getJavaxkeystore() != null) {
182                         context.put("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
183                         context.put("javax.net.ssl.keyStore", Utility.DE(cfg.getJavaxkeystore()));
184                         try {
185                             System.setProperty("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
186                             System.setProperty("javax.net.ssl.keyStore", cfg.getJavaxkeystore());
187                         } catch (Exception ex) {
188                             log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
189                         }
190                     }
191                 }
192             }
193 
194             ApacheCxfSslHelper.doCXF(cfg.pcsport, cfg);
195 
196         }
197 
198     }
199 
200     /**
201      * caches IP to DNS mappings to speed up performance this may block if an ip
202      * address is not resolvable or there is some kind of DNS related problem
203      *
204      * @param URL
205      * @param isclient
206      * @return
207      */
208     protected static String IpWrapAndCacher(String URL, boolean isclient) {
209 
210         if (MessageProcessor.getSingletonObject().getURLaddressMap().containsKey(URL)) {
211             return (String) MessageProcessor.getSingletonObject().getURLaddressMap().get(URL);
212         }
213         String newurl = IpAddressUtility.modifyURL(URL, isclient);
214         MessageProcessor.getSingletonObject().getURLaddressMap().put(URL, newurl);
215         return newurl;
216     }
217 
218     /*
219      * protected enum Algorithm {
220      *
221      * FAILOVER, ROUNDROBIN }
222      *
223      * protected enum UnavailableBehavior {
224      *
225      * /**
226      * PURGE, if I can't send it, throw it out
227      *
228      * PURGE, /** HOLD, if I can't send it, keep it in memory until I can or
229      * until I get destroyed by the garbage collector
230      *
231      * HOLD, /** HOLDPERSIST, if I can't send it, store it to disk, then send
232      * when it's available
233      *
234      * HOLDPERSIST }
235      */
236     /**
237      * Loads the policy from the fgsms PCS service
238      *
239      * @param url
240      * @param style
241      * @return null if it cannot reach the PCS or access was denied.
242      * @isserver isclient, carried over from modify url
243      */
244     private static ServicePolicyResponseMsg FetchPolicy(String url, ConfigLoader.Algorithm style, boolean isclient) {
245         boolean ok = false;
246         int urlcount = 0;
247         int retrycount = 0;
248         BindingProvider bp = (BindingProvider) cfg.pcsport;
249         Map<String, Object> context = bp.getRequestContext();
250 
251         switch (style) {
252             case FAILOVER:
253                 retrycount = 0;
254                 urlcount = 0;
255                 ok = false;
256                 while (!ok && (retrycount < cfg.PCSRetryCount)) {
257 
258                     urlcount = 0;
259                     while (!ok && urlcount < cfg.PCS_URLS.size()) {
260                         try { // Call Web Service Operation
261 
262                             context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.PCS_URLS.get(urlcount));
263 
264                             //send it
265                             ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
266                             request.setClassification(cfg.classlevel);
267                             request.setURI(url);
268                             request.setPolicytype(PolicyType.TRANSACTIONAL);
269                             if (!isclient) {
270                                 request.setMachine(Utility.getHostName());
271 
272                             } else {
273                                 request.setMachine(new URL(url).getHost());
274                             }
275                             ServicePolicyResponseMsg result = cfg.pcsport.getServicePolicy(request);
276                             // result.getPolicy();
277 
278                             ok = true;
279                             //log.log(Level.INFO, "fgsms successfully retrieved policy for " + request.getURI());
280                             return result;
281 
282                         } catch (Exception ex) {
283                             log.log(Level.WARN, "fgsms unable to fetch policy from PCS at " + cfg.PCS_URLS.get(urlcount) + " will retry " + (cfg.PCSRetryCount - retrycount) + " times." + ex.getMessage());
284                         }
285                         urlcount++;
286                     }
287                     retrycount++;
288                 }
289                 if (!ok) {
290                     log.log(Level.FATAL, "fgsms unable to fetch policy from any of the PCS[" + cfg.PCS_URLS.size() + "] URLs, retry count exceeded. Falling back to default policy");
291                     MessageProcessor.getSingletonObject().setLastErrorMessage("All PCS endpoints are either unreachable or are responding in error");
292                 }
293 
294                 break;
295             case ROUNDROBIN:
296                 retrycount = 0;
297                 urlcount = 0;
298                 ok = false;
299                 while (!ok && (retrycount < cfg.PCSRetryCount)) {
300 
301                     urlcount = 0;
302                     while (!ok && urlcount < cfg.PCS_URLS.size()) {
303                         try { // Call Web Service Operation
304 
305                             context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.PCS_URLS.get(urlcount));
306                             //send it
307                             ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
308                             request.setClassification(cfg.classlevel);
309                             request.setURI(url);
310                             request.setPolicytype(PolicyType.TRANSACTIONAL);
311                             if (!isclient) {
312                                 request.setMachine(Utility.getHostName());
313                             } else {
314                                 request.setMachine(new URL(url).getHost());
315                             }
316                             ServicePolicyResponseMsg result = cfg.pcsport.getServicePolicy(request);
317                             result.getPolicy();
318                             ok = true;
319                             //log.log(Level.INFO, "fgsms successfully retrieved policy for " + request.getURI());
320                             return result;
321 
322                             //    dcsport.addMoreData(req.getReq());
323                         } catch (Exception ex) {
324                             // BindingProvider bp = (BindingProvider) dcsport;
325                             //  Map<String, Object> context = bp.getRequestContext();
326                             //  String oldAddress = (String) context.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
327                             log.log(Level.WARN, "fgsms unable to fetch policy from PCS at " + cfg.PCS_URLS.get(urlcount) + " will retry " + (cfg.PCSRetryCount - retrycount) + " times." + ex.getMessage());
328                         }
329                         urlcount++;
330                     }
331                     retrycount++;
332                 }
333                 if (!ok) {
334                     log.log(Level.FATAL, "fgsms unable to fetch policy from any of the PCS[" + cfg.PCS_URLS.size() + "] URLs, retry count exceeded. Falling back to default policy");
335                     MessageProcessor.getSingletonObject().setLastErrorMessage("All PCS endpoints are either unreachable or are responding in error");
336                 }
337 
338                 break;
339         }
340         return null;
341     }
342 
343     /**
344      * Entry point for loading a policy, if one cannot be fetched from the PCS,
345      * the default policy is returned
346      *
347      * @param url
348      * @return
349      */
350     public static PolicyHelper LoadPolicy(String url, boolean isclient) {
351         // boolean error = false;
352         //check the policy cache first
353 
354         //TODO insert logic for ServiceUnavailable behavior
355         //suggest adding anothe property which designates a folder from which to store data until the mother ship is available
356         //be sure to encrypt the file before 
357         //also an entry point for 100% offline usage, store all data locally working off of a templated policy
358         PolicyHelper p = (PolicyHelper) policyCache.get(url);
359         if (p == null) {
360             try { // it's not in the cache, try to get it....Call Web Service Operation
361                 log.log(Level.INFO, "fgsms requesting policy for service " + url);
362                 //ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
363                 // request.setURI(url);
364                 p = new PolicyHelper();
365                 ServicePolicyResponseMsg result = FetchPolicy(url, cfg.PCSalgo, isclient);
366                 if (result == null) {
367                     log.log(Level.WARN, "fgsms PCS is unavailable or returned an error, reverting to agent default policy");
368                     result = LoadDefaultPolicy(url);
369                     p.lastUpdate = 0;
370 
371                 } else {
372                     p.lastUpdate = Calendar.getInstance().getTimeInMillis();
373                 }
374                 //     if (!(p.policy instanceof TransactionalWebServicePolicy)) {
375                 //         log.log(Level.WARN, "fgsms PCS unexpectedly returned a policy for something other than transactional services. This is unexpected and should be reported to the developer. Type is " +result.getPolicy().getPolicyType());
376                 //     }
377                 p.policy = (TransactionalWebServicePolicy) result.getPolicy();
378 
379                 cfg.classlevel = result.getClassification();
380                 policyCache.put(url, p);
381                 //if (p.policy.getPolicyType() != PolicyType.TRANSACTIONAL) {
382 //                    log.log(Level.WARN, "fgsms PCS unexpectedly returned a policy for something other than transactional services. This is unexpected and should be reported to the developer");
383                 //            }
384                 log.log(Level.DEBUG, "fgsms obtained policy for service " + url + " and cached it.");
385                 return p;
386             } catch (Exception ex) {
387                 log.log(Level.ERROR, "fgsms Error retrieving policy from PCS for service at " + url + " because it's either down or busy (check config). This transaction will be discarded.", ex);
388                 //       error = true;
389             }
390         } else {
391             //it's in the cache, check to see if it's expired
392             long expirationtime = p.policy.getPolicyRefreshRate().getTimeInMillis(Calendar.getInstance());
393             if (p.lastUpdate + expirationtime < System.currentTimeMillis()) {
394                 try { // it's expired, get a new copy....Call Web Service Operation
395                     log.log(Level.INFO, "fgsms retrieved cached policy, but it has expired. Retrieving latest policy for service " + url);
396                     policyCache.remove(url);
397 
398                     //ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
399                     //request.setURI(url);
400                     ServicePolicyResponseMsg result = FetchPolicy(url, cfg.PCSalgo, isclient);
401 
402                     // ServicePolicyResponseMsg result = pcsport.getServicePolicy(request);
403                     p = new PolicyHelper();
404                     p.lastUpdate = Calendar.getInstance().getTimeInMillis();
405                     //if (!(p.policy instanceof TransactionalWebServicePolicy)) {
406 //                        log.log(Level.WARN, "fgsms PCS unexpectedly returned a policy for something other than transactional services. This is unexpected and should be reported to the developer");
407                     //                }
408                     p.policy = (TransactionalWebServicePolicy) result.getPolicy();
409                     //      if (p.policy.getPolicyType() != PolicyType.TRANSACTIONAL) {
410                     //           log.log(Level.WARN, "fgsms PCS unexpectedly returned a policy for something other than transactional services. This is unexpected and should be reported to the developer");
411                     //       }
412                     cfg.classlevel = result.getClassification();
413                     policyCache.put(url, p);
414                     log.log(Level.INFO, "fgsms obtained policy for service " + url + " and cached it.");
415                     return p;
416                 } catch (Exception ex) {
417                     log.log(Level.WARN, "fgsms Error refreshing policy from PCS for service at " + url + " using expired policy", ex);
418                     return p;
419 
420                 }
421             } else {
422                 log.log(Level.DEBUG, "fgsms retrieved cached policy " + url + ". It expires in " + (System.currentTimeMillis() - (p.lastUpdate + expirationtime)) + "ms");
423                 return p;
424             }
425         }
426         // else i can't get a policy from the pcs because it's either down or busy, we can't return a synthesized one else the database will get out of synch
427         return null;
428     }
429 
430     /**
431      * Loads the default policy from disk defaultpolicy.xml
432      *
433      * @param url
434      * @return
435      */
436     public static ServicePolicyResponseMsg LoadDefaultPolicy(String url) {
437 
438         try {
439             InputStream in = null;
440             ClassLoader loader = Thread.currentThread().getContextClassLoader();
441             if (loader == null) {
442                 loader = ClassLoader.getSystemClassLoader();
443             }
444 
445             // Returns null on lookup failures:
446             in = loader.getResourceAsStream("org/miloss/fgsms/agentcore/defaultpolicy.xml");
447             String pol = ReadAllText(in);
448             in.close();
449 
450             JAXBContext jc = Utility.getSerializationContext();
451 
452             Unmarshaller u = jc.createUnmarshaller();
453 
454             ByteArrayInputStream bss = new ByteArrayInputStream(pol.getBytes(Constants.CHARSET));
455             //1 = reader
456             //2 = writer
457             XMLInputFactory xf = XMLInputFactory.newInstance();
458             XMLStreamReader r = xf.createXMLStreamReader(bss);
459 
460             JAXBElement<TransactionalWebServicePolicy> foo = (JAXBElement<TransactionalWebServicePolicy>) u.unmarshal(r, TransactionalWebServicePolicy.class);
461             if (foo
462                     == null || foo.getValue()
463                     == null) {
464                 log.log(Level.WARN, "ServicePolicy is unexpectedly null or empty");
465                 return null;
466             }
467             ServicePolicyResponseMsg ret = new ServicePolicyResponseMsg();
468 
469             ret.setPolicy(foo.getValue());
470             ret.setClassification(cfg.classlevel);
471 
472             ret.getPolicy().setURL(url);
473             return ret;
474         } catch (Exception ex) {
475             log.log(Level.ERROR, "error loading default policy from disk", ex);
476         }
477         return null;
478     }
479 
480     /**
481      * prepares a message for transmission
482      *
483      * @param current
484      * @param p
485      * @return
486      */
487     private static AddDataRequestMsg PrepMessage(MessageCorrelator current, PolicyHelper p) {
488         if (current == null) {
489             return null;
490         }
491         AddDataRequestMsg req = new AddDataRequestMsg();
492         req.setClassification(cfg.classlevel);
493 
494         //stuff that's recorded always
495         req.setRequestURI(p.policy.getURL());
496         req.setURI(current.URL);
497         req.setAction(current.soapAction);
498         req.setServiceHost(MessageProcessor.getSingletonObject().getHostName());
499         req.setRequestSize(current.reqsize);
500         req.setResponseSize(current.ressize);
501         req.setMessage("Queue size:" + outboundQueue.size() + " PolicyCache:" + policyCache.size() + " MsgMap:" + current.currentMapsize + " CPUs:" + Runtime.getRuntime().availableProcessors());
502         req.setAgentType(current.agent_class_name);
503         req.setRequestURI(current.originalurl);
504         req.setRelatedTransactionID(current.RelatedMsgId);
505         req.setTransactionThreadID(current.TransactionThreadId);
506         req.setTransactionID(current.MessageID);
507         try {
508             DatatypeFactory f = DatatypeFactory.newInstance();
509             GregorianCalendar gcal = new GregorianCalendar();
510             gcal.setTimeInMillis(current.RecievedAt);
511             req.setRecordedat((gcal));
512         } catch (Exception ex) {
513             log.log(Level.ERROR, "unexpected error caught when preparing a message.", ex);
514         }
515         long diff = Math.abs(current.CompletedAt - current.RecievedAt);
516         int x = (int) diff;
517         req.setResponseTime(x);
518         // current.IsFault = true;
519         req.setSuccess(!current.IsFault);
520 
521         //   log.log(Level.ERROR, current.MessageID + "Success = " + !current.IsFault);
522         //stuff that's recorded sometimes
523         if (current.RequestMessage != null) {
524             if (p.policy.isRecordRequestMessage() || (p.policy.isRecordFaultsOnly() && current.IsFault)) {
525 
526                 if (p.policy.getRecordedMessageCap() >= current.RequestMessage.length()) {
527 
528                     req.setXmlRequest(current.RequestMessage);
529 
530                 } else {
531                     req.setXmlRequest(current.RequestMessage.substring(0, p.policy.getRecordedMessageCap()));
532                 }
533             } else {
534                 if (DEBUG) {
535                     log.info("skipping request payload " + p.policy.isRecordRequestMessage() + " " + p.policy.isRecordFaultsOnly() + " " + current.IsFault);
536                 }
537             }
538         }
539         if (current.ResponseMessage != null) {
540             if (p.policy.isRecordResponseMessage() || (p.policy.isRecordFaultsOnly() && current.IsFault)) {
541                 if (p.policy.getRecordedMessageCap() >= current.ResponseMessage.length()) {
542                     req.setXmlResponse(current.ResponseMessage);
543 
544                 } else {
545                     req.setXmlResponse(current.ResponseMessage.substring(0, p.policy.getRecordedMessageCap()));
546                 }
547             } else {
548                 if (DEBUG) {
549                     log.info("skipping response payload " + p.policy.isRecordRequestMessage() + " " + p.policy.isRecordFaultsOnly() + " " + current.IsFault);
550                 }
551             }
552         }
553 
554         // log.log(Level.INFO, "fgsms publish loop#2. Thread id: + " + Thread.currentThread().getName() + "Time" + (System.currentTimeMillis() - start));
555         //fun with serialized types
556         //   List<String> s = new LinkedList<String>();
557         //   ArrayOfString s = new ArrayOfString();
558         ArrayList<String> s2 = MessageProcessor.getSingletonObject().getUserIdentities(p.policy, current);
559         //s2.add(current.ipaddress);
560         // s.setString((List<String>)(s2));
561         req.getIdentity().addAll(s2);
562         // req.setIdentity((List<String>)s2);
563 
564         if (current.Headers != null && !current.Headers.isEmpty() && p.policy.isRecordHeaders()) {
565             Iterator e = current.Headers.keySet().iterator();
566             while (e.hasNext()) {
567                 org.miloss.fgsms.services.interfaces.common.Header h = new org.miloss.fgsms.services.interfaces.common.Header();
568                 String s = (String) e.next();
569                 h.setName(s);
570                 try {
571                     String value = (String) current.Headers.get(s);
572                     h.getValue().add(value);
573                     req.getHeadersRequest().add(h);
574                 } catch (Exception ex) {
575                 }
576                 try {
577                     List<String> value = (List<String>) current.Headers.get(s);
578                     for (int k = 0; k < value.size(); k++) {
579                         h.getValue().add(value.get(k));
580                     }
581                     req.getHeadersRequest().add(h);
582 
583                 } catch (Exception ex) {
584                 }
585             }
586             if (current.Header_Response != null && !current.Header_Response.isEmpty()) {
587                 e = current.Header_Response.keySet().iterator();
588                 while (e.hasNext()) {
589                     org.miloss.fgsms.services.interfaces.common.Header h = new org.miloss.fgsms.services.interfaces.common.Header();
590                     String s = (String) e.next();
591                     h.setName(s);
592                     try {
593                         String value = (String) current.Header_Response.get(s);
594                         h.getValue().add(value);
595                         req.getHeadersRequest().add(h);
596                     } catch (Exception ex) {
597                     }
598                     try {
599                         List<String> value = (List<String>) current.Header_Response.get(s);
600                         for (int k = 0; k < value.size(); k++) {
601                             h.getValue().add(value.get(k));
602                         }
603                         req.getHeadersRequest().add(h);
604 
605                     } catch (Exception ex) {
606                     }
607                 }
608             }
609         }
610 
611         return req;
612     }
613     private static String LastErrorMessage = "";
614 
615     /**
616      * Run's until the outbound queue is empty, sends at most 40 message or 1 MB
617      * of data (if message logging is on) at a time.
618      */
619     @Override
620     public void run() {
621 
622         MessageProcessor.getSingletonObject().removeDeadMessage();
623 
624         Init();
625         if (ErrorState) {
626             log.log(Level.FATAL, "fgsms Data Pusher is in an error state. Recorded data cannot be sent and will be discarded. Check the configuration file and log for reason. Last error message was " + LastErrorMessage);
627             outboundQueue.clear();
628             return;
629         }
630 
631         DoDiscovery();
632 
633         AddDataRequestMsg PreppedMessage = null;
634         boolean enabled = true;
635         while (!outboundQueue.isEmpty()) {
636             //long start = System.currentTimeMillis();
637             log.log(Level.DEBUG, "fgsms entering publish loop " + outboundQueue.size() + " items to publish.");
638             AddMoreData req = new AddMoreData();
639             int count = 0;
640             int totalbody = 0;
641             while (count < 40 && !outboundQueue.isEmpty() && enabled && totalbody < 1024000) {
642                 //pop
643                 MessageCorrelator current = null;
644                 PreppedMessage = null;
645                 try {
646                     Object j = outboundQueue.remove();
647                     if (j instanceof MessageCorrelator) {
648                         current = (MessageCorrelator) j;
649                     }
650                     if (j instanceof AddDataRequestMsg) {
651                         PreppedMessage = (AddDataRequestMsg) j;
652                     }
653                 } catch (Exception e) {//just swallow it and continue, another thread must have picked up the last item.
654                     log.log(Level.DEBUG, "fgsms publish loop, error removing item from the queue, another thread must have grabbed the last item. Queue size is currently "
655                             + outboundQueue.size());
656                 }
657 
658                 boolean isclient = false;
659                 if (current != null) {
660                     isclient = current.agent_class_name != null && (current.agent_class_name.contains("client") || current.agent_class_name.contains("Client"));
661                     if (isclient) {
662                         current.URL = IpWrapAndCacher(current.URL, true);
663                         //current.URL = IpAddressUtility.modifyURL(current.URL, true);
664                     } else {
665                         current.URL = IpWrapAndCacher(current.URL, false);
666                         //current.URL = IpAddressUtility.modifyURL(current.URL, false);
667                     }
668 
669                     //Applying the ignore list once more
670                     if (MessageProcessor.getSingletonObject().getIgnoreList().contains(current.URL.toLowerCase())) {
671                         current = null;
672                         continue;
673                     }
674                 }
675                 if (PreppedMessage != null) {
676                     //I've already gotten the policy and prepped this message, we just to build the counters up from here
677                     if (!org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlRequest())) {
678                         totalbody += PreppedMessage.getXmlRequest().length();
679                     }
680                     if (!org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlResponse())) {
681                         totalbody += PreppedMessage.getXmlResponse().length();
682                     }
683                     req.getReq().add(PreppedMessage);
684                     count++;
685                 }
686                 if (current != null) {
687                     PolicyHelper p = LoadPolicy(current.URL, isclient);
688                     if (p == null || p.policy == null) // outboundQueue.add(current);
689                     {
690                         //?  PCS is not available, stick the current item back in the queue? wont happen anymore
691                         log.log(Level.WARN, "Unable to obtain policy for URL " + current.URL + ", transaction data will be lost.");
692                         continue;
693                     } else {
694 
695                         if (p.policy.isAgentsEnabled()) {
696                             if (current != null) {
697                                 PreppedMessage = PrepMessage(current, p);
698                             }
699                             if (PreppedMessage != null && !org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlRequest())) {
700                                 totalbody += PreppedMessage.getXmlRequest().length();
701                             }
702                             if (PreppedMessage != null && !org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlResponse())) {
703                                 totalbody += PreppedMessage.getXmlResponse().length();
704                             }
705                             req.getReq().add(PreppedMessage);
706                             count++;
707                         } else {
708                             enabled = false;
709                             log.log(Level.WARN, "fgsms PCS reports that all agents are disabled!");
710                             MessageProcessor.getSingletonObject().setLastErrorMessage("Agents centrally disabled");
711                             //TODO, what if we are on persist and hold?
712                             outboundQueue.clear();
713                         }
714                     }
715                 }
716 
717             }
718 
719             if (enabled) {
720 
721                 BindingProvider bp = (BindingProvider) cfg.dcsport;
722                 Map<String, Object> context = bp.getRequestContext();
723 
724                 int retrycount = 0;
725                 int urlcount = 0;
726                 boolean ok = false;
727                 if (!req.getReq().isEmpty()) {
728                     switch (cfg.DCSalgo) {
729                         case ROUNDROBIN:
730                             retrycount = 0;
731                             urlcount = 0;
732                             ok = false;
733                             while (!ok && (retrycount < cfg.DCSRetryCount)) {
734 
735                                 urlcount = 0;
736                                 while (!ok && urlcount < cfg.DCS_URLS.size()) {
737                                     try { // Call Web Service Operation
738 
739                                         //                                context.remove(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
740                                         context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.DCS_URLS.get(urlcount));
741 
742                                         if (DEBUG) {
743                                             JAXB.marshal(req, System.out);
744                                         }
745                                         //send it
746                                         AddDataResponseMsg addMoreData = cfg.dcsport.addMoreData(req.getReq());
747                                         if (addMoreData != null && addMoreData.getStatus() != null && addMoreData.getStatus() == DataResponseStatus.SUCCESS) {
748                                             ok = true;
749                                             log.log(Level.DEBUG, "fgsms successfully sent transaction data for " + req.getReq().size() + " transactions. Items still to process: " + outboundQueue.size());
750                                             MessageProcessor.getSingletonObject().incMessagesProcessed(req.getReq().size());
751                                         } else {
752                                             ok = false;
753                                             log.log(Level.WARN, "fgsms failed to sent transaction data for " + req.getReq().size() + " transactions. 1 or more items couldn't be saved. Items still to process: " + outboundQueue.size());
754                                         }
755                                     } catch (Exception ex) {
756                                         // BindingProvider bp = (BindingProvider) dcsport;
757                                         //  Map<String, Object> context = bp.getRequestContext();
758                                         //  String oldAddress = (String) context.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
759                                         log.log(Level.WARN, "fgsms Error sending performance data to DCS for service at " + cfg.DCS_URLS.get(urlcount) + " will retry " + (cfg.DCSRetryCount - retrycount) + " times.", ex);
760                                     }
761                                     urlcount++;
762                                 }
763                                 retrycount++;
764                             }
765                             if (!ok) {
766                                 log.log(Level.ERROR, "fgsms unable to send performance data to DCS, retry count exceeded.");
767                                 MessageProcessor.getSingletonObject().setLastErrorMessage("retry count exceeded");
768                                 if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLD) {
769                                     for (int i = 0; i < req.getReq().size(); i++) {
770                                         outboundQueue.add(req.getReq());
771                                     }
772                                 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
773                                     for (int i = 0; i < req.getReq().size(); i++) {
774                                         StorePersist(req);
775                                     }
776                                 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.PURGE) {
777                                     req.getReq().clear();
778                                     req = null;
779                                 } else {
780                                     throw new IllegalArgumentException("agent unavailable behavior");
781                                 }
782 
783                             }
784 
785                             break;
786                         case FAILOVER:
787 
788                             retrycount = 0;
789                             urlcount = 0;
790                             ok = false;
791                             while (!ok && (urlcount < cfg.DCS_URLS.size())) {
792 
793                                 while (!ok && retrycount < cfg.DCSRetryCount) {
794                                     try { // Call Web Service Operation
795 
796                                         //                            context.remove(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
797                                         context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.DCS_URLS.get(urlcount));
798 
799                                         if (DEBUG) {
800                                             JAXB.marshal(req, System.out);
801                                         }
802                                         //send it
803                                         AddDataResponseMsg addMoreData = cfg.dcsport.addMoreData(req.getReq());
804                                         if (addMoreData != null && addMoreData.getStatus() != null && addMoreData.getStatus() == DataResponseStatus.SUCCESS) {
805                                             ok = true;
806                                             log.log(Level.DEBUG, "fgsms successfully sent transaction data for " + req.getReq().size() + " transactions. Items still to process: " + outboundQueue.size());
807                                             MessageProcessor.getSingletonObject().incMessagesProcessed(req.getReq().size());
808                                         } else {
809                                             ok = false;
810                                             log.log(Level.WARN, "fgsms failed to sent transaction data for " + req.getReq().size() + " transactions. 1 or more items couldn't be saved. Items still to process: " + outboundQueue.size());
811                                         }
812                                     } catch (Exception ex) {
813                                         log.log(Level.WARN, "fgsms Error sending performance data to DCS for service at " + cfg.DCS_URLS.get(urlcount) + " will retry " + (cfg.DCSRetryCount - retrycount) + " times.", ex);
814                                     }
815                                     retrycount++;
816                                 }
817                                 urlcount++;
818                             }
819                             if (!ok) {
820                                 log.log(Level.ERROR, "fgsms unable to send performance data to DCS, retry count exceeded.");
821                                 MessageProcessor.getSingletonObject().setLastErrorMessage("retry count exceeded");
822 
823                                 if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLD) {
824                                     for (int i = 0; i < req.getReq().size(); i++) {
825                                         outboundQueue.add(req.getReq());
826                                     }
827                                 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
828                                     for (int i = 0; i < req.getReq().size(); i++) {
829                                         StorePersist(req);
830                                     }
831                                 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.PURGE) {
832                                     req.getReq().clear();
833                                     req = null;
834                                 } else {
835                                     throw new IllegalArgumentException("agent unavailable behavior");
836                                 }
837 
838                             }
839                             break;
840                     }
841                 }
842             }
843             //CheckPersistStore();
844         }
845         log.log(Level.DEBUG, "fgsms Data Pusher thread is terminating, no data available to send.");
846     }
847 
848     /**
849      * checks the local file system for files that were not able to be sent if
850      * present,
851      */
852     protected static AddMoreData CheckPersistStore() {
853         Init();
854         //  needs to be thread safe 
855         if (Utility.stringIsNullOrEmpty(cfg.offlinestorage)
856                 || cfg.behavior != ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
857             return null;
858         }
859         File f = new File(cfg.offlinestorage);
860         if (f.exists() && f.isDirectory()) {
861             File[] list = f.listFiles();
862             if (list != null) {
863                 try {
864                     JAXBContext jc = JAXBContext.newInstance(ArrayOfSLA.class, SLA.class,
865                             SLAAction.class, SLARuleGeneric.class, AndOrNot.class,
866                             AddDataRequestMsg.class, AddMoreData.class, AddData.class, String.class,
867                             Duration.class, Long.class, SecurityWrapper.class,
868                             ClassificationType.class, List.class, Header.class);
869                     Unmarshaller u
870                             = jc.createUnmarshaller();
871                     for (int i = 0; i < list.length; i++) {
872                         try {
873                             String s
874                                     = ReadAllText(f.getPath() + File.separator + f.getName());
875                             s = Utility.DE(s);
876 
877                             ByteArrayInputStream bss = new ByteArrayInputStream(s.getBytes(Constants.CHARSET));
878                             XMLStreamReader r
879                                     = XMLInputFactory.newFactory().createXMLStreamReader(bss);
880                             JAXBElement<AddMoreData> foo = (JAXBElement<AddMoreData>) u.unmarshal(r,
881                                     AddMoreData.class);
882                             if (foo
883                                     == null || foo.getValue()
884                                     == null) {
885                                 log.log(Level.WARN, "Add request is unexpectedly null or empty when     reading it in from disk                        ");
886                             }
887                             boolean delete = f.delete();
888                             if (!delete) {
889                                 log.log(Level.ERROR, "Unable to delete file " + f.getPath() + File.separator + f.getName() + " this may cause unintented consequences, even infinite looping. Ensure that"
890                                         + "this process has delete access to the folder");
891                             }
892                             if (foo != null) {
893                                 return foo.getValue();
894                             } else {
895                                 return null;
896                             }
897                         } catch (Exception ex) {
898                             log.log(Level.WARN, "error caught reading performance data from disk",
899                                     ex);
900                         }
901                     }
902                 } catch (Exception ex) {
903                     log.log(Level.WARN, "error caught reading performance data from disk",
904                             ex);
905                 }
906             }
907         }
908 
909         return null;
910     }
911 
912     protected static void EnsureFolderExists(String folder) {
913         File f = null;
914         try {
915             f = new File(folder);
916             if (f.exists()) {
917                 return;
918             }
919         } catch (Exception ex) {
920         }
921         try {
922             new File(folder).mkdirs();
923         } catch (Exception ex) {
924             log.log(Level.WARN, "Cannot ensure that the folder " + folder + " exists");
925         }
926     }
927 
928     /**
929      * serializes the request to a string, encrypts it, then stores it to disk
930      */
931     protected static void StorePersist(AddMoreData req) {
932         if (req == null || req.getReq() == null || req.getReq().isEmpty()) {
933             return;
934         }
935         Init();
936         for (int i = 0; i < req.getReq().size(); i++) {
937             JAXBContext jc = null;
938             try {
939                 jc = JAXBContext.newInstance(AddDataRequestMsg.class,
940                         AddDataRequestMsg.class, AddMoreData.class, AddData.class,
941                         String.class, Duration.class, Long.class, SecurityWrapper.class,
942                         ClassificationType.class, List.class, org.miloss.fgsms.services.interfaces.common.Header.class);
943                 Marshaller m = jc.createMarshaller();
944                 StringWriter sw = new StringWriter();
945 
946                 m.marshal(
947                         (req.getReq().get(i)), sw);
948                 String s = sw.toString();
949                 s = Utility.EN(s);
950 
951                 WriteAllText(cfg.offlinestorage + File.separator + UUID.randomUUID().toString(), s);
952             } catch (Exception ex) {
953                 log.log(Level.WARN, "Unable to marshall or store to disk service performance record", ex);
954             }
955         }
956     }
957 
958     protected static String ReadAllText(InputStream in) {
959         try {
960             InputStreamReader sr = new InputStreamReader(in, Constants.CHARSET);
961             StringBuilder fileData = new StringBuilder(1000);
962 
963             BufferedReader reader = new BufferedReader(sr);
964 
965             char[] buf = new char[1024];
966             int numRead = 0;
967             while ((numRead = reader.read(buf)) != -1) {
968                 String readData = String.valueOf(buf, 0, numRead);
969                 fileData.append(readData);
970                 buf = new char[1024];
971             }
972             reader.close();
973             sr.close();
974             return fileData.toString();
975         } catch (Exception ex) {
976         }
977         return "";
978     }
979 
980     private static String ReadAllText(String pathandfile) {
981         try {
982 
983             StringBuilder fileData = new StringBuilder(1000);
984             BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(pathandfile), Constants.CHARSET));
985             char[] buf = new char[1024];
986             int numRead = 0;
987             while ((numRead = reader.read(buf)) != -1) {
988                 String readData = String.valueOf(buf, 0, numRead);
989                 fileData.append(readData);
990                 buf = new char[1024];
991             }
992             reader.close();
993             return fileData.toString();
994         } /*
995          *
996          * InputStream stream; int size = 1024; byte chars[] = new byte[size];
997          * int k = stream.read(chars); String str = ""; while (k > 0) {
998          *
999          * for (int i = 0; i < k; i++) { str += (char) chars[i]; } k =
1000          * stream.read(chars); } // log.log(Level.INFO, name + "ReadAllText,
1001          * Read " + str.length() + " bytes.");
1002          */ catch (Exception e) {
1003             log.log(Level.ERROR, "ReadAllText, ", e);
1004             return "";
1005         }
1006     }
1007 
1008     private static void WriteAllText(String filename, String text) {
1009         try {
1010             File f = new File(filename);
1011 
1012             log.log(Level.INFO, "WriteAllText Current Dir = " + f.getName() + f.getAbsolutePath());
1013             BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(filename), Constants.CHARSET));
1014             out.write(text);
1015             out.close();
1016         } catch (Exception e) {
1017             log.log(Level.INFO, filename + " WriteAllText, ", e);
1018         }
1019     }
1020     List<IEndpointDiscovery> endpointproviders = null;
1021 
1022     private void DoDiscovery() {
1023         if (cfg == null || cfg.prop == null) {
1024             throw new NullPointerException("fgsms properties file is not available.");
1025         }
1026         if (endpointproviders == null) {
1027             endpointproviders = LoadEndpointProviders(cfg);
1028         }
1029         List<String> endpointspcs = new ArrayList<String>();
1030         List<String> endpointsdcs = new ArrayList<String>();
1031         boolean ran = false;
1032         for (int i = 0; i < endpointproviders.size(); i++) {
1033             try {
1034                 if ((System.currentTimeMillis() - cfg.discoveryInterval) > endpointproviders.get(i).GetLastLookup()) {
1035                     if (endpointproviders.get(i).IsEnabled()) {
1036                         endpointspcs.addAll(endpointproviders.get(i).GetPCSURLs());
1037                         endpointsdcs.addAll(endpointproviders.get(i).GetDCSURLs());
1038                         ran = true;
1039                         endpointproviders.get(i).SetLastLookup(System.currentTimeMillis());
1040                     }
1041                 }
1042             } catch (Exception ex) {
1043                 log.fatal("The discovery provery " + endpointproviders.getClass().getCanonicalName() + " is faulty and threw an exception", ex);
1044             }
1045 
1046         }
1047         if (ran) {
1048             for (int i = 0; i < endpointspcs.size(); i++) {
1049                 if (!cfg.PCS_URLS.contains(endpointspcs.get(i))) {
1050                     cfg.PCS_URLS.add(endpointspcs.get(i));
1051                 }
1052             }
1053             for (int i = 0; i < endpointspcs.size(); i++) {
1054                 if (!cfg.DCS_URLS.contains(endpointsdcs.get(i))) {
1055                     cfg.DCS_URLS.add(endpointsdcs.get(i));
1056                 }
1057             }
1058         }
1059 
1060     }
1061 
1062     public static List<IEndpointDiscovery> LoadEndpointProviders(ConfigLoader cl) {
1063         if (cl == null || cl.prop == null) {
1064             throw new NullPointerException("fgsms properties file is not available.");
1065         }
1066         List<String> discovery_providers = cl.getDiscovery_providers();
1067         List<IEndpointDiscovery> eps = new ArrayList<IEndpointDiscovery>();
1068         for (int i = 0; i < discovery_providers.size(); i++) {
1069             try {
1070                 Class t = Class.forName(discovery_providers.get(i));
1071                 IEndpointDiscovery newInstance = (IEndpointDiscovery) t.newInstance();
1072                 newInstance.LoadConfig(cl.prop);
1073                 eps.add(newInstance);
1074             } catch (Exception ex) {
1075                 log.warn("Unable to load endpoint provider " + discovery_providers.get(i), ex);
1076             }
1077         }
1078         return eps;
1079     }
1080 }