1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.miloss.fgsms.agentcore;
21
22 import java.io.*;
23 import java.net.URL;
24 import java.util.*;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import javax.xml.bind.JAXB;
27 import javax.xml.bind.JAXBContext;
28 import javax.xml.bind.JAXBElement;
29 import javax.xml.bind.Marshaller;
30 import javax.xml.bind.Unmarshaller;
31 import javax.xml.datatype.DatatypeFactory;
32 import javax.xml.datatype.Duration;
33 import javax.xml.stream.XMLInputFactory;
34 import javax.xml.stream.XMLStreamReader;
35 import javax.xml.ws.BindingProvider;
36 import org.miloss.fgsms.common.IpAddressUtility;
37 import org.miloss.fgsms.common.Constants.AuthMode;
38 import org.miloss.fgsms.common.Utility;
39 import org.miloss.fgsms.services.interfaces.common.Header;
40 import org.miloss.fgsms.services.interfaces.common.PolicyType;
41 import org.miloss.fgsms.services.interfaces.common.SecurityWrapper;
42 import org.miloss.fgsms.services.interfaces.datacollector.*;
43 import org.miloss.fgsms.services.interfaces.policyconfiguration.*;
44 import org.apache.log4j.Level;
45 import org.miloss.fgsms.common.Logger;
46 ;
47 import org.miloss.fgsms.common.Constants;
48 import org.miloss.fgsms.plugins.agents.IEndpointDiscovery;
49 import us.gov.ic.ism.v2.ClassificationType;
50
51
52
53
54
55
56
57
58
59
60
61
62 public class DataPusher implements Runnable {
63
64 static boolean DEBUG = false;
65 private static ConcurrentLinkedQueue outboundQueue;
66 private static HashMap policyCache;
67 private static final Logger log = Logger.getLogger(org.miloss.fgsms.common.Constants.LoggerName);
68 private static ConfigLoader cfg = null;
69 private static boolean ErrorState = false;
70
71
/**
 * Creates a pusher that publishes from the supplied queue using the supplied policy cache.
 * Both references end up in static fields, so they replace whatever any earlier instance
 * installed.
 *
 * @param cache policy cache to use
 * @param queue outbound queue to drain
 */
public DataPusher(HashMap cache, ConcurrentLinkedQueue queue) {
    // Delegates so the two static assignments happen in one place.
    Initializer(cache, queue);
}
75
76 private synchronized void Initializer(HashMap cache, ConcurrentLinkedQueue queue) {
77 policyCache = cache;
78 outboundQueue = queue;
79 }
80
81
82
83
/**
 * Creates a pusher backed by a brand-new, empty queue and policy cache. Because both are
 * static fields, this discards whatever queue and cache were installed before.
 */
protected DataPusher() {
    outboundQueue = new ConcurrentLinkedQueue();
    policyCache = new HashMap();
}
88
89
90
91
92
93 protected static synchronized void Init() {
94 if (cfg == null) {
95 try {
96
97
98 if (cfg == null) {
99 cfg = MessageProcessor.getSingletonObject().getConfig();
100 }
101 if (cfg == null) {
102 throw new ConfigurationException("Config unavailable");
103 }
104
105 } catch (Exception ex) {
106 cfg = null;
107 ErrorState = true;
108 LastErrorMessage = "could not initialize the configuration from fgsms.AgentCore.jar";
109 log.log(Level.FATAL, "could not initialize the configuration from fgsms.AgentCore.jar", ex);
110 return;
111 }
112
113 BindingProvider bp = (BindingProvider) cfg.dcsport;
114 Map<String, Object> context = bp.getRequestContext();
115
116 if (cfg.mode_ == AuthMode.UsernamePassword) {
117 context.put(BindingProvider.USERNAME_PROPERTY, cfg.username);
118 context.put(BindingProvider.PASSWORD_PROPERTY, Utility.DE(cfg.password));
119 }
120 if (cfg.mode_ == AuthMode.PKI) {
121
122
123 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStorePassword"))) {
124 if (cfg.getJavaxkeystore() != null) {
125
126 context.put("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
127 context.put("javax.net.ssl.keyStore", Utility.DE(cfg.getJavaxkeystore()));
128 try {
129 System.setProperty("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
130 System.setProperty("javax.net.ssl.keyStore", (cfg.getJavaxkeystore()));
131 } catch (Exception ex) {
132 log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
133 }
134 }
135 }
136 }
137
138
139
140
141
142 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStore")) && Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStorePassword"))) {
143 if (cfg.getJavaxtruststore() != null) {
144
145 context.put("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
146 context.put("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
147 try {
148 System.setProperty("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
149 System.setProperty("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
150 } catch (Exception ex) {
151 log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
152 }
153 }
154 }
155
156 ApacheCxfSslHelper.doCXF(cfg.dcsport, cfg);
157
158 bp = (BindingProvider) cfg.pcsport;
159 context = bp.getRequestContext();
160
161 if (cfg.mode_ == AuthMode.UsernamePassword) {
162 context.put(BindingProvider.USERNAME_PROPERTY, cfg.username);
163 context.put(BindingProvider.PASSWORD_PROPERTY, Utility.DE(cfg.password));
164 }
165 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.trustStorePassword"))) {
166 if (cfg.getJavaxtruststore() != null) {
167 context.put("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
168 context.put("javax.net.ssl.trustStore", Utility.DE(cfg.getJavaxtruststore()));
169 try {
170 System.setProperty("javax.net.ssl.trustStorePassword", Utility.DE(cfg.getJavaxtruststorepass()));
171 System.setProperty("javax.net.ssl.trustStore", (cfg.getJavaxtruststore()));
172 } catch (Exception ex) {
173 log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
174 }
175 }
176 }
177
178 if (cfg.mode_ == AuthMode.PKI) {
179
180 if (Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStore")) || Utility.stringIsNullOrEmpty(System.getProperty("javax.net.ssl.keyStorePassword"))) {
181 if (cfg.getJavaxkeystore() != null) {
182 context.put("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
183 context.put("javax.net.ssl.keyStore", Utility.DE(cfg.getJavaxkeystore()));
184 try {
185 System.setProperty("javax.net.ssl.keyStorePassword", Utility.DE(cfg.getJavaxkeystorepass()));
186 System.setProperty("javax.net.ssl.keyStore", cfg.getJavaxkeystore());
187 } catch (Exception ex) {
188 log.log(Level.WARN, "error caught when referencing (get or set) System.properties for SSL communication. Check to ensure that this is enabled in your JAAS managemer", ex);
189 }
190 }
191 }
192 }
193
194 ApacheCxfSslHelper.doCXF(cfg.pcsport, cfg);
195
196 }
197
198 }
199
200
201
202
203
204
205
206
207
208 protected static String IpWrapAndCacher(String URL, boolean isclient) {
209
210 if (MessageProcessor.getSingletonObject().getURLaddressMap().containsKey(URL)) {
211 return (String) MessageProcessor.getSingletonObject().getURLaddressMap().get(URL);
212 }
213 String newurl = IpAddressUtility.modifyURL(URL, isclient);
214 MessageProcessor.getSingletonObject().getURLaddressMap().put(URL, newurl);
215 return newurl;
216 }
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244 private static ServicePolicyResponseMsg FetchPolicy(String url, ConfigLoader.Algorithm style, boolean isclient) {
245 boolean ok = false;
246 int urlcount = 0;
247 int retrycount = 0;
248 BindingProvider bp = (BindingProvider) cfg.pcsport;
249 Map<String, Object> context = bp.getRequestContext();
250
251 switch (style) {
252 case FAILOVER:
253 retrycount = 0;
254 urlcount = 0;
255 ok = false;
256 while (!ok && (retrycount < cfg.PCSRetryCount)) {
257
258 urlcount = 0;
259 while (!ok && urlcount < cfg.PCS_URLS.size()) {
260 try {
261
262 context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.PCS_URLS.get(urlcount));
263
264
265 ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
266 request.setClassification(cfg.classlevel);
267 request.setURI(url);
268 request.setPolicytype(PolicyType.TRANSACTIONAL);
269 if (!isclient) {
270 request.setMachine(Utility.getHostName());
271
272 } else {
273 request.setMachine(new URL(url).getHost());
274 }
275 ServicePolicyResponseMsg result = cfg.pcsport.getServicePolicy(request);
276
277
278 ok = true;
279
280 return result;
281
282 } catch (Exception ex) {
283 log.log(Level.WARN, "fgsms unable to fetch policy from PCS at " + cfg.PCS_URLS.get(urlcount) + " will retry " + (cfg.PCSRetryCount - retrycount) + " times." + ex.getMessage());
284 }
285 urlcount++;
286 }
287 retrycount++;
288 }
289 if (!ok) {
290 log.log(Level.FATAL, "fgsms unable to fetch policy from any of the PCS[" + cfg.PCS_URLS.size() + "] URLs, retry count exceeded. Falling back to default policy");
291 MessageProcessor.getSingletonObject().setLastErrorMessage("All PCS endpoints are either unreachable or are responding in error");
292 }
293
294 break;
295 case ROUNDROBIN:
296 retrycount = 0;
297 urlcount = 0;
298 ok = false;
299 while (!ok && (retrycount < cfg.PCSRetryCount)) {
300
301 urlcount = 0;
302 while (!ok && urlcount < cfg.PCS_URLS.size()) {
303 try {
304
305 context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.PCS_URLS.get(urlcount));
306
307 ServicePolicyRequestMsg request = new ServicePolicyRequestMsg();
308 request.setClassification(cfg.classlevel);
309 request.setURI(url);
310 request.setPolicytype(PolicyType.TRANSACTIONAL);
311 if (!isclient) {
312 request.setMachine(Utility.getHostName());
313 } else {
314 request.setMachine(new URL(url).getHost());
315 }
316 ServicePolicyResponseMsg result = cfg.pcsport.getServicePolicy(request);
317 result.getPolicy();
318 ok = true;
319
320 return result;
321
322
323 } catch (Exception ex) {
324
325
326
327 log.log(Level.WARN, "fgsms unable to fetch policy from PCS at " + cfg.PCS_URLS.get(urlcount) + " will retry " + (cfg.PCSRetryCount - retrycount) + " times." + ex.getMessage());
328 }
329 urlcount++;
330 }
331 retrycount++;
332 }
333 if (!ok) {
334 log.log(Level.FATAL, "fgsms unable to fetch policy from any of the PCS[" + cfg.PCS_URLS.size() + "] URLs, retry count exceeded. Falling back to default policy");
335 MessageProcessor.getSingletonObject().setLastErrorMessage("All PCS endpoints are either unreachable or are responding in error");
336 }
337
338 break;
339 }
340 return null;
341 }
342
343
344
345
346
347
348
349
350 public static PolicyHelper LoadPolicy(String url, boolean isclient) {
351
352
353
354
355
356
357
358 PolicyHelper p = (PolicyHelper) policyCache.get(url);
359 if (p == null) {
360 try {
361 log.log(Level.INFO, "fgsms requesting policy for service " + url);
362
363
364 p = new PolicyHelper();
365 ServicePolicyResponseMsg result = FetchPolicy(url, cfg.PCSalgo, isclient);
366 if (result == null) {
367 log.log(Level.WARN, "fgsms PCS is unavailable or returned an error, reverting to agent default policy");
368 result = LoadDefaultPolicy(url);
369 p.lastUpdate = 0;
370
371 } else {
372 p.lastUpdate = Calendar.getInstance().getTimeInMillis();
373 }
374
375
376
377 p.policy = (TransactionalWebServicePolicy) result.getPolicy();
378
379 cfg.classlevel = result.getClassification();
380 policyCache.put(url, p);
381
382
383
384 log.log(Level.DEBUG, "fgsms obtained policy for service " + url + " and cached it.");
385 return p;
386 } catch (Exception ex) {
387 log.log(Level.ERROR, "fgsms Error retrieving policy from PCS for service at " + url + " because it's either down or busy (check config). This transaction will be discarded.", ex);
388
389 }
390 } else {
391
392 long expirationtime = p.policy.getPolicyRefreshRate().getTimeInMillis(Calendar.getInstance());
393 if (p.lastUpdate + expirationtime < System.currentTimeMillis()) {
394 try {
395 log.log(Level.INFO, "fgsms retrieved cached policy, but it has expired. Retrieving latest policy for service " + url);
396 policyCache.remove(url);
397
398
399
400 ServicePolicyResponseMsg result = FetchPolicy(url, cfg.PCSalgo, isclient);
401
402
403 p = new PolicyHelper();
404 p.lastUpdate = Calendar.getInstance().getTimeInMillis();
405
406
407
408 p.policy = (TransactionalWebServicePolicy) result.getPolicy();
409
410
411
412 cfg.classlevel = result.getClassification();
413 policyCache.put(url, p);
414 log.log(Level.INFO, "fgsms obtained policy for service " + url + " and cached it.");
415 return p;
416 } catch (Exception ex) {
417 log.log(Level.WARN, "fgsms Error refreshing policy from PCS for service at " + url + " using expired policy", ex);
418 return p;
419
420 }
421 } else {
422 log.log(Level.DEBUG, "fgsms retrieved cached policy " + url + ". It expires in " + (System.currentTimeMillis() - (p.lastUpdate + expirationtime)) + "ms");
423 return p;
424 }
425 }
426
427 return null;
428 }
429
430
431
432
433
434
435
436 public static ServicePolicyResponseMsg LoadDefaultPolicy(String url) {
437
438 try {
439 InputStream in = null;
440 ClassLoader loader = Thread.currentThread().getContextClassLoader();
441 if (loader == null) {
442 loader = ClassLoader.getSystemClassLoader();
443 }
444
445
446 in = loader.getResourceAsStream("org/miloss/fgsms/agentcore/defaultpolicy.xml");
447 String pol = ReadAllText(in);
448 in.close();
449
450 JAXBContext jc = Utility.getSerializationContext();
451
452 Unmarshaller u = jc.createUnmarshaller();
453
454 ByteArrayInputStream bss = new ByteArrayInputStream(pol.getBytes(Constants.CHARSET));
455
456
457 XMLInputFactory xf = XMLInputFactory.newInstance();
458 XMLStreamReader r = xf.createXMLStreamReader(bss);
459
460 JAXBElement<TransactionalWebServicePolicy> foo = (JAXBElement<TransactionalWebServicePolicy>) u.unmarshal(r, TransactionalWebServicePolicy.class);
461 if (foo
462 == null || foo.getValue()
463 == null) {
464 log.log(Level.WARN, "ServicePolicy is unexpectedly null or empty");
465 return null;
466 }
467 ServicePolicyResponseMsg ret = new ServicePolicyResponseMsg();
468
469 ret.setPolicy(foo.getValue());
470 ret.setClassification(cfg.classlevel);
471
472 ret.getPolicy().setURL(url);
473 return ret;
474 } catch (Exception ex) {
475 log.log(Level.ERROR, "error loading default policy from disk", ex);
476 }
477 return null;
478 }
479
480
481
482
483
484
485
486
487 private static AddDataRequestMsg PrepMessage(MessageCorrelator current, PolicyHelper p) {
488 if (current == null) {
489 return null;
490 }
491 AddDataRequestMsg req = new AddDataRequestMsg();
492 req.setClassification(cfg.classlevel);
493
494
495 req.setRequestURI(p.policy.getURL());
496 req.setURI(current.URL);
497 req.setAction(current.soapAction);
498 req.setServiceHost(MessageProcessor.getSingletonObject().getHostName());
499 req.setRequestSize(current.reqsize);
500 req.setResponseSize(current.ressize);
501 req.setMessage("Queue size:" + outboundQueue.size() + " PolicyCache:" + policyCache.size() + " MsgMap:" + current.currentMapsize + " CPUs:" + Runtime.getRuntime().availableProcessors());
502 req.setAgentType(current.agent_class_name);
503 req.setRequestURI(current.originalurl);
504 req.setRelatedTransactionID(current.RelatedMsgId);
505 req.setTransactionThreadID(current.TransactionThreadId);
506 req.setTransactionID(current.MessageID);
507 try {
508 DatatypeFactory f = DatatypeFactory.newInstance();
509 GregorianCalendar gcal = new GregorianCalendar();
510 gcal.setTimeInMillis(current.RecievedAt);
511 req.setRecordedat((gcal));
512 } catch (Exception ex) {
513 log.log(Level.ERROR, "unexpected error caught when preparing a message.", ex);
514 }
515 long diff = Math.abs(current.CompletedAt - current.RecievedAt);
516 int x = (int) diff;
517 req.setResponseTime(x);
518
519 req.setSuccess(!current.IsFault);
520
521
522
523 if (current.RequestMessage != null) {
524 if (p.policy.isRecordRequestMessage() || (p.policy.isRecordFaultsOnly() && current.IsFault)) {
525
526 if (p.policy.getRecordedMessageCap() >= current.RequestMessage.length()) {
527
528 req.setXmlRequest(current.RequestMessage);
529
530 } else {
531 req.setXmlRequest(current.RequestMessage.substring(0, p.policy.getRecordedMessageCap()));
532 }
533 } else {
534 if (DEBUG) {
535 log.info("skipping request payload " + p.policy.isRecordRequestMessage() + " " + p.policy.isRecordFaultsOnly() + " " + current.IsFault);
536 }
537 }
538 }
539 if (current.ResponseMessage != null) {
540 if (p.policy.isRecordResponseMessage() || (p.policy.isRecordFaultsOnly() && current.IsFault)) {
541 if (p.policy.getRecordedMessageCap() >= current.ResponseMessage.length()) {
542 req.setXmlResponse(current.ResponseMessage);
543
544 } else {
545 req.setXmlResponse(current.ResponseMessage.substring(0, p.policy.getRecordedMessageCap()));
546 }
547 } else {
548 if (DEBUG) {
549 log.info("skipping response payload " + p.policy.isRecordRequestMessage() + " " + p.policy.isRecordFaultsOnly() + " " + current.IsFault);
550 }
551 }
552 }
553
554
555
556
557
558 ArrayList<String> s2 = MessageProcessor.getSingletonObject().getUserIdentities(p.policy, current);
559
560
561 req.getIdentity().addAll(s2);
562
563
564 if (current.Headers != null && !current.Headers.isEmpty() && p.policy.isRecordHeaders()) {
565 Iterator e = current.Headers.keySet().iterator();
566 while (e.hasNext()) {
567 org.miloss.fgsms.services.interfaces.common.Header h = new org.miloss.fgsms.services.interfaces.common.Header();
568 String s = (String) e.next();
569 h.setName(s);
570 try {
571 String value = (String) current.Headers.get(s);
572 h.getValue().add(value);
573 req.getHeadersRequest().add(h);
574 } catch (Exception ex) {
575 }
576 try {
577 List<String> value = (List<String>) current.Headers.get(s);
578 for (int k = 0; k < value.size(); k++) {
579 h.getValue().add(value.get(k));
580 }
581 req.getHeadersRequest().add(h);
582
583 } catch (Exception ex) {
584 }
585 }
586 if (current.Header_Response != null && !current.Header_Response.isEmpty()) {
587 e = current.Header_Response.keySet().iterator();
588 while (e.hasNext()) {
589 org.miloss.fgsms.services.interfaces.common.Header h = new org.miloss.fgsms.services.interfaces.common.Header();
590 String s = (String) e.next();
591 h.setName(s);
592 try {
593 String value = (String) current.Header_Response.get(s);
594 h.getValue().add(value);
595 req.getHeadersRequest().add(h);
596 } catch (Exception ex) {
597 }
598 try {
599 List<String> value = (List<String>) current.Header_Response.get(s);
600 for (int k = 0; k < value.size(); k++) {
601 h.getValue().add(value.get(k));
602 }
603 req.getHeadersRequest().add(h);
604
605 } catch (Exception ex) {
606 }
607 }
608 }
609 }
610
611 return req;
612 }
613 private static String LastErrorMessage = "";
614
615
616
617
618
619 @Override
620 public void run() {
621
622 MessageProcessor.getSingletonObject().removeDeadMessage();
623
624 Init();
625 if (ErrorState) {
626 log.log(Level.FATAL, "fgsms Data Pusher is in an error state. Recorded data cannot be sent and will be discarded. Check the configuration file and log for reason. Last error message was " + LastErrorMessage);
627 outboundQueue.clear();
628 return;
629 }
630
631 DoDiscovery();
632
633 AddDataRequestMsg PreppedMessage = null;
634 boolean enabled = true;
635 while (!outboundQueue.isEmpty()) {
636
637 log.log(Level.DEBUG, "fgsms entering publish loop " + outboundQueue.size() + " items to publish.");
638 AddMoreData req = new AddMoreData();
639 int count = 0;
640 int totalbody = 0;
641 while (count < 40 && !outboundQueue.isEmpty() && enabled && totalbody < 1024000) {
642
643 MessageCorrelator current = null;
644 PreppedMessage = null;
645 try {
646 Object j = outboundQueue.remove();
647 if (j instanceof MessageCorrelator) {
648 current = (MessageCorrelator) j;
649 }
650 if (j instanceof AddDataRequestMsg) {
651 PreppedMessage = (AddDataRequestMsg) j;
652 }
653 } catch (Exception e) {
654 log.log(Level.DEBUG, "fgsms publish loop, error removing item from the queue, another thread must have grabbed the last item. Queue size is currently "
655 + outboundQueue.size());
656 }
657
658 boolean isclient = false;
659 if (current != null) {
660 isclient = current.agent_class_name != null && (current.agent_class_name.contains("client") || current.agent_class_name.contains("Client"));
661 if (isclient) {
662 current.URL = IpWrapAndCacher(current.URL, true);
663
664 } else {
665 current.URL = IpWrapAndCacher(current.URL, false);
666
667 }
668
669
670 if (MessageProcessor.getSingletonObject().getIgnoreList().contains(current.URL.toLowerCase())) {
671 current = null;
672 continue;
673 }
674 }
675 if (PreppedMessage != null) {
676
677 if (!org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlRequest())) {
678 totalbody += PreppedMessage.getXmlRequest().length();
679 }
680 if (!org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlResponse())) {
681 totalbody += PreppedMessage.getXmlResponse().length();
682 }
683 req.getReq().add(PreppedMessage);
684 count++;
685 }
686 if (current != null) {
687 PolicyHelper p = LoadPolicy(current.URL, isclient);
688 if (p == null || p.policy == null)
689 {
690
691 log.log(Level.WARN, "Unable to obtain policy for URL " + current.URL + ", transaction data will be lost.");
692 continue;
693 } else {
694
695 if (p.policy.isAgentsEnabled()) {
696 if (current != null) {
697 PreppedMessage = PrepMessage(current, p);
698 }
699 if (PreppedMessage != null && !org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlRequest())) {
700 totalbody += PreppedMessage.getXmlRequest().length();
701 }
702 if (PreppedMessage != null && !org.miloss.fgsms.common.Utility.stringIsNullOrEmpty(PreppedMessage.getXmlResponse())) {
703 totalbody += PreppedMessage.getXmlResponse().length();
704 }
705 req.getReq().add(PreppedMessage);
706 count++;
707 } else {
708 enabled = false;
709 log.log(Level.WARN, "fgsms PCS reports that all agents are disabled!");
710 MessageProcessor.getSingletonObject().setLastErrorMessage("Agents centrally disabled");
711
712 outboundQueue.clear();
713 }
714 }
715 }
716
717 }
718
719 if (enabled) {
720
721 BindingProvider bp = (BindingProvider) cfg.dcsport;
722 Map<String, Object> context = bp.getRequestContext();
723
724 int retrycount = 0;
725 int urlcount = 0;
726 boolean ok = false;
727 if (!req.getReq().isEmpty()) {
728 switch (cfg.DCSalgo) {
729 case ROUNDROBIN:
730 retrycount = 0;
731 urlcount = 0;
732 ok = false;
733 while (!ok && (retrycount < cfg.DCSRetryCount)) {
734
735 urlcount = 0;
736 while (!ok && urlcount < cfg.DCS_URLS.size()) {
737 try {
738
739
740 context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.DCS_URLS.get(urlcount));
741
742 if (DEBUG) {
743 JAXB.marshal(req, System.out);
744 }
745
746 AddDataResponseMsg addMoreData = cfg.dcsport.addMoreData(req.getReq());
747 if (addMoreData != null && addMoreData.getStatus() != null && addMoreData.getStatus() == DataResponseStatus.SUCCESS) {
748 ok = true;
749 log.log(Level.DEBUG, "fgsms successfully sent transaction data for " + req.getReq().size() + " transactions. Items still to process: " + outboundQueue.size());
750 MessageProcessor.getSingletonObject().incMessagesProcessed(req.getReq().size());
751 } else {
752 ok = false;
753 log.log(Level.WARN, "fgsms failed to sent transaction data for " + req.getReq().size() + " transactions. 1 or more items couldn't be saved. Items still to process: " + outboundQueue.size());
754 }
755 } catch (Exception ex) {
756
757
758
759 log.log(Level.WARN, "fgsms Error sending performance data to DCS for service at " + cfg.DCS_URLS.get(urlcount) + " will retry " + (cfg.DCSRetryCount - retrycount) + " times.", ex);
760 }
761 urlcount++;
762 }
763 retrycount++;
764 }
765 if (!ok) {
766 log.log(Level.ERROR, "fgsms unable to send performance data to DCS, retry count exceeded.");
767 MessageProcessor.getSingletonObject().setLastErrorMessage("retry count exceeded");
768 if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLD) {
769 for (int i = 0; i < req.getReq().size(); i++) {
770 outboundQueue.add(req.getReq());
771 }
772 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
773 for (int i = 0; i < req.getReq().size(); i++) {
774 StorePersist(req);
775 }
776 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.PURGE) {
777 req.getReq().clear();
778 req = null;
779 } else {
780 throw new IllegalArgumentException("agent unavailable behavior");
781 }
782
783 }
784
785 break;
786 case FAILOVER:
787
788 retrycount = 0;
789 urlcount = 0;
790 ok = false;
791 while (!ok && (urlcount < cfg.DCS_URLS.size())) {
792
793 while (!ok && retrycount < cfg.DCSRetryCount) {
794 try {
795
796
797 context.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, cfg.DCS_URLS.get(urlcount));
798
799 if (DEBUG) {
800 JAXB.marshal(req, System.out);
801 }
802
803 AddDataResponseMsg addMoreData = cfg.dcsport.addMoreData(req.getReq());
804 if (addMoreData != null && addMoreData.getStatus() != null && addMoreData.getStatus() == DataResponseStatus.SUCCESS) {
805 ok = true;
806 log.log(Level.DEBUG, "fgsms successfully sent transaction data for " + req.getReq().size() + " transactions. Items still to process: " + outboundQueue.size());
807 MessageProcessor.getSingletonObject().incMessagesProcessed(req.getReq().size());
808 } else {
809 ok = false;
810 log.log(Level.WARN, "fgsms failed to sent transaction data for " + req.getReq().size() + " transactions. 1 or more items couldn't be saved. Items still to process: " + outboundQueue.size());
811 }
812 } catch (Exception ex) {
813 log.log(Level.WARN, "fgsms Error sending performance data to DCS for service at " + cfg.DCS_URLS.get(urlcount) + " will retry " + (cfg.DCSRetryCount - retrycount) + " times.", ex);
814 }
815 retrycount++;
816 }
817 urlcount++;
818 }
819 if (!ok) {
820 log.log(Level.ERROR, "fgsms unable to send performance data to DCS, retry count exceeded.");
821 MessageProcessor.getSingletonObject().setLastErrorMessage("retry count exceeded");
822
823 if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLD) {
824 for (int i = 0; i < req.getReq().size(); i++) {
825 outboundQueue.add(req.getReq());
826 }
827 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
828 for (int i = 0; i < req.getReq().size(); i++) {
829 StorePersist(req);
830 }
831 } else if (cfg.behavior == ConfigLoader.UnavailableBehavior.PURGE) {
832 req.getReq().clear();
833 req = null;
834 } else {
835 throw new IllegalArgumentException("agent unavailable behavior");
836 }
837
838 }
839 break;
840 }
841 }
842 }
843
844 }
845 log.log(Level.DEBUG, "fgsms Data Pusher thread is terminating, no data available to send.");
846 }
847
848
849
850
851
852 protected static AddMoreData CheckPersistStore() {
853 Init();
854
855 if (Utility.stringIsNullOrEmpty(cfg.offlinestorage)
856 || cfg.behavior != ConfigLoader.UnavailableBehavior.HOLDPERSIST) {
857 return null;
858 }
859 File f = new File(cfg.offlinestorage);
860 if (f.exists() && f.isDirectory()) {
861 File[] list = f.listFiles();
862 if (list != null) {
863 try {
864 JAXBContext jc = JAXBContext.newInstance(ArrayOfSLA.class, SLA.class,
865 SLAAction.class, SLARuleGeneric.class, AndOrNot.class,
866 AddDataRequestMsg.class, AddMoreData.class, AddData.class, String.class,
867 Duration.class, Long.class, SecurityWrapper.class,
868 ClassificationType.class, List.class, Header.class);
869 Unmarshaller u
870 = jc.createUnmarshaller();
871 for (int i = 0; i < list.length; i++) {
872 try {
873 String s
874 = ReadAllText(f.getPath() + File.separator + f.getName());
875 s = Utility.DE(s);
876
877 ByteArrayInputStream bss = new ByteArrayInputStream(s.getBytes(Constants.CHARSET));
878 XMLStreamReader r
879 = XMLInputFactory.newFactory().createXMLStreamReader(bss);
880 JAXBElement<AddMoreData> foo = (JAXBElement<AddMoreData>) u.unmarshal(r,
881 AddMoreData.class);
882 if (foo
883 == null || foo.getValue()
884 == null) {
885 log.log(Level.WARN, "Add request is unexpectedly null or empty when reading it in from disk ");
886 }
887 boolean delete = f.delete();
888 if (!delete) {
889 log.log(Level.ERROR, "Unable to delete file " + f.getPath() + File.separator + f.getName() + " this may cause unintented consequences, even infinite looping. Ensure that"
890 + "this process has delete access to the folder");
891 }
892 if (foo != null) {
893 return foo.getValue();
894 } else {
895 return null;
896 }
897 } catch (Exception ex) {
898 log.log(Level.WARN, "error caught reading performance data from disk",
899 ex);
900 }
901 }
902 } catch (Exception ex) {
903 log.log(Level.WARN, "error caught reading performance data from disk",
904 ex);
905 }
906 }
907 }
908
909 return null;
910 }
911
912 protected static void EnsureFolderExists(String folder) {
913 File f = null;
914 try {
915 f = new File(folder);
916 if (f.exists()) {
917 return;
918 }
919 } catch (Exception ex) {
920 }
921 try {
922 new File(folder).mkdirs();
923 } catch (Exception ex) {
924 log.log(Level.WARN, "Cannot ensure that the folder " + folder + " exists");
925 }
926 }
927
928
929
930
931 protected static void StorePersist(AddMoreData req) {
932 if (req == null || req.getReq() == null || req.getReq().isEmpty()) {
933 return;
934 }
935 Init();
936 for (int i = 0; i < req.getReq().size(); i++) {
937 JAXBContext jc = null;
938 try {
939 jc = JAXBContext.newInstance(AddDataRequestMsg.class,
940 AddDataRequestMsg.class, AddMoreData.class, AddData.class,
941 String.class, Duration.class, Long.class, SecurityWrapper.class,
942 ClassificationType.class, List.class, org.miloss.fgsms.services.interfaces.common.Header.class);
943 Marshaller m = jc.createMarshaller();
944 StringWriter sw = new StringWriter();
945
946 m.marshal(
947 (req.getReq().get(i)), sw);
948 String s = sw.toString();
949 s = Utility.EN(s);
950
951 WriteAllText(cfg.offlinestorage + File.separator + UUID.randomUUID().toString(), s);
952 } catch (Exception ex) {
953 log.log(Level.WARN, "Unable to marshall or store to disk service performance record", ex);
954 }
955 }
956 }
957
958 protected static String ReadAllText(InputStream in) {
959 try {
960 InputStreamReader sr = new InputStreamReader(in, Constants.CHARSET);
961 StringBuilder fileData = new StringBuilder(1000);
962
963 BufferedReader reader = new BufferedReader(sr);
964
965 char[] buf = new char[1024];
966 int numRead = 0;
967 while ((numRead = reader.read(buf)) != -1) {
968 String readData = String.valueOf(buf, 0, numRead);
969 fileData.append(readData);
970 buf = new char[1024];
971 }
972 reader.close();
973 sr.close();
974 return fileData.toString();
975 } catch (Exception ex) {
976 }
977 return "";
978 }
979
980 private static String ReadAllText(String pathandfile) {
981 try {
982
983 StringBuilder fileData = new StringBuilder(1000);
984 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(pathandfile), Constants.CHARSET));
985 char[] buf = new char[1024];
986 int numRead = 0;
987 while ((numRead = reader.read(buf)) != -1) {
988 String readData = String.valueOf(buf, 0, numRead);
989 fileData.append(readData);
990 buf = new char[1024];
991 }
992 reader.close();
993 return fileData.toString();
994 }
995
996
997
998
999
1000
1001
1002 catch (Exception e) {
1003 log.log(Level.ERROR, "ReadAllText, ", e);
1004 return "";
1005 }
1006 }
1007
1008 private static void WriteAllText(String filename, String text) {
1009 try {
1010 File f = new File(filename);
1011
1012 log.log(Level.INFO, "WriteAllText Current Dir = " + f.getName() + f.getAbsolutePath());
1013 BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(filename), Constants.CHARSET));
1014 out.write(text);
1015 out.close();
1016 } catch (Exception e) {
1017 log.log(Level.INFO, filename + " WriteAllText, ", e);
1018 }
1019 }
1020 List<IEndpointDiscovery> endpointproviders = null;
1021
1022 private void DoDiscovery() {
1023 if (cfg == null || cfg.prop == null) {
1024 throw new NullPointerException("fgsms properties file is not available.");
1025 }
1026 if (endpointproviders == null) {
1027 endpointproviders = LoadEndpointProviders(cfg);
1028 }
1029 List<String> endpointspcs = new ArrayList<String>();
1030 List<String> endpointsdcs = new ArrayList<String>();
1031 boolean ran = false;
1032 for (int i = 0; i < endpointproviders.size(); i++) {
1033 try {
1034 if ((System.currentTimeMillis() - cfg.discoveryInterval) > endpointproviders.get(i).GetLastLookup()) {
1035 if (endpointproviders.get(i).IsEnabled()) {
1036 endpointspcs.addAll(endpointproviders.get(i).GetPCSURLs());
1037 endpointsdcs.addAll(endpointproviders.get(i).GetDCSURLs());
1038 ran = true;
1039 endpointproviders.get(i).SetLastLookup(System.currentTimeMillis());
1040 }
1041 }
1042 } catch (Exception ex) {
1043 log.fatal("The discovery provery " + endpointproviders.getClass().getCanonicalName() + " is faulty and threw an exception", ex);
1044 }
1045
1046 }
1047 if (ran) {
1048 for (int i = 0; i < endpointspcs.size(); i++) {
1049 if (!cfg.PCS_URLS.contains(endpointspcs.get(i))) {
1050 cfg.PCS_URLS.add(endpointspcs.get(i));
1051 }
1052 }
1053 for (int i = 0; i < endpointspcs.size(); i++) {
1054 if (!cfg.DCS_URLS.contains(endpointsdcs.get(i))) {
1055 cfg.DCS_URLS.add(endpointsdcs.get(i));
1056 }
1057 }
1058 }
1059
1060 }
1061
1062 public static List<IEndpointDiscovery> LoadEndpointProviders(ConfigLoader cl) {
1063 if (cl == null || cl.prop == null) {
1064 throw new NullPointerException("fgsms properties file is not available.");
1065 }
1066 List<String> discovery_providers = cl.getDiscovery_providers();
1067 List<IEndpointDiscovery> eps = new ArrayList<IEndpointDiscovery>();
1068 for (int i = 0; i < discovery_providers.size(); i++) {
1069 try {
1070 Class t = Class.forName(discovery_providers.get(i));
1071 IEndpointDiscovery newInstance = (IEndpointDiscovery) t.newInstance();
1072 newInstance.LoadConfig(cl.prop);
1073 eps.add(newInstance);
1074 } catch (Exception ex) {
1075 log.warn("Unable to load endpoint provider " + discovery_providers.get(i), ex);
1076 }
1077 }
1078 return eps;
1079 }
1080 }