1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.miloss.fgsms.agents.smxviajmxagent;
22
23 import java.io.File;
24 import java.io.RandomAccessFile;
25 import java.nio.channels.FileChannel;
26 import java.nio.channels.FileLock;
27 import java.sql.Connection;
28 import java.sql.PreparedStatement;
29 import java.sql.SQLException;
30 import java.util.*;
31 import javax.management.AttributeList;
32 import javax.management.MBeanServerConnection;
33 import javax.management.ObjectName;
34 import javax.management.remote.JMXConnector;
35 import javax.management.remote.JMXConnectorFactory;
36 import javax.management.remote.JMXServiceURL;
37 import org.apache.log4j.Level;
38 import org.miloss.fgsms.common.Logger;;
39 import org.apache.log4j.PropertyConfigurator;
40 import org.miloss.fgsms.common.DBSettingsLoader;
41 import org.miloss.fgsms.common.DBUtils;
42 import org.miloss.fgsms.common.IpAddressUtility;
43 import org.miloss.fgsms.common.PropertyLoader;
44 import org.miloss.fgsms.common.Utility;
45 import org.miloss.fgsms.services.interfaces.common.PolicyType;
46 import org.miloss.fgsms.services.interfaces.datacollector.AddStatisticalDataRequestMsg;
47 import org.miloss.fgsms.services.interfaces.datacollector.BrokerData;
48 import org.miloss.fgsms.services.interfaces.policyconfiguration.*;
49 import org.miloss.fgsms.sla.AuxHelper;
50 import org.miloss.fgsms.sla.SLACommon;
51 import org.miloss.fgsms.sla.actions.SLAActionRestart;
52
53
54
55
56
57
58
59
60 public class FgsmsSMXviaJMXAgent {
61
62 static Logger log = Logger.getLogger("fgsms.SMXJMX");
63 private boolean running = true;
64
65 private boolean done = false;
66
67 public class RunWhenShuttingDown extends Thread {
68
69 public void run() {
70 System.out.println("Control-C caught. Shutting down...");
71 running = false;
72
73 while (!done) {
74 try {
75 Thread.sleep(1000);
76 } catch (InterruptedException ex) {
77 }
78 }
79 closeLock();
80 deleteFile();
81 if (urls != null) {
82 for (int i = 0; i < urls.length; i++) {
83 AuxHelper.TryUpdateStatus(false, IpAddressUtility.modifyURL(urls[i], true), "Agent Stopped", false, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
84 }
85 }
86 }
87 }
/** Lock file; assigned in {@code Main} ({@code smx.lck}) and removed by {@code deleteFile()}. */
private File file;
/** Channel opened on the lock file in {@code Main}; closed by {@code closeLock()}. */
private FileChannel channel;
/** Result of {@code channel.tryLock()} in {@code Main}; released by {@code closeLock()}. */
private FileLock lock;
91
92 private void closeLock() {
93 try {
94 lock.release();
95 } catch (Exception e) {
96 }
97 try {
98 channel.close();
99 } catch (Exception e) {
100 }
101 }
102
103 private void deleteFile() {
104 try {
105 file.delete();
106 } catch (Exception e) {
107 }
108 }
/**
 * JMX service URLs being monitored. Filled in by {@code Main} from the {@code JMXServiceURL}
 * property (values separated by {@code |}) and read by the shutdown hook.
 */
public static String[] urls = null;
110
111
112
113
114 public static void main(String[] args) {
115 new FgsmsSMXviaJMXAgent().Main(args);
116
117 }
118
119 private void Main(String[] args) {
120
121 try {
122 file = new File("smx.lck");
123 channel = new RandomAccessFile(file, "rw").getChannel();
124 lock = channel.tryLock();
125 } catch (Exception e) {
126
127 closeLock();
128 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file smx.lck needs to be deleted.");
129 return;
130 }
131 if (lock == null) {
132 closeLock();
133 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file smx.lck needs to be deleted.");
134 return;
135 }
136
137
138 Runtime.getRuntime().addShutdownHook(new RunWhenShuttingDown());
139 PropertyConfigurator.configure("log4j.properties");
140 long interval = 10000;
141 if (args.length == 1) {
142 try {
143 interval = Long.parseLong(args[0]);
144 if (interval < 10000) {
145 interval = 10000;
146 }
147 } catch (Exception ex) {
148 }
149 }
150 log.log(Level.INFO, "fgsms SMX JMX Agent startup...");
151 Properties prop = PropertyLoader.loadProperties("fgsms.smxviajmxagent/connection");
152
153 String temp = (String) prop.get("JMXServiceURL");
154 if (temp.contains("|")) {
155 urls = temp.split("\\|");
156 } else {
157 urls = new String[1];
158 urls[0] = temp;
159 }
160
161 if ((urls == null || urls.length == 0)) {
162 log.log(Level.FATAL, "no JMX url has been defined.");
163 return;
164 }
165
166 long lastranat = 0;
167 while (running) {
168 if (lastranat < System.currentTimeMillis() - interval) {
169 try {
170 lastranat = System.currentTimeMillis();
171
172 for (int aa = 0; aa < urls.length; aa++) {
173 String originalurl = urls[aa];
174 String url = IpAddressUtility.modifyURL(originalurl, true);
175 try {
176 Connection con = Utility.getConfigurationDB_NONPOOLED_Connection();
177 AuxHelper.CheckStatisticalPolicyAndCreate(url, con, false, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
178 con.close();
179 } catch (Exception ex) {
180 }
181 Fire(false, originalurl, url);
182 }
183 } catch (Exception ex) {
184 log.log(Level.ERROR, null, ex);
185 }
186 log.log(Level.INFO, "Sleeping " + interval + "ms until next interation....");
187 }
188 if (running) {
189 try {
190 Thread.sleep(1000);
191 } catch (InterruptedException ex) {
192 log.log(Level.ERROR, null, ex);
193 }
194 }
195 done = true;
196 }
197 }
198
199 @SuppressWarnings("unchecked")
200 public static void Fire(boolean pooled, final String url, final String modifiedurl) throws Exception {
201
202 ServicePolicy p = null;
203 if (pooled) {
204 p = SLACommon.LoadPolicyPooled(modifiedurl);
205 } else {
206 p = SLACommon.LoadPolicyNotPooled(modifiedurl);
207 }
208
209
210
211 JMXServiceURL u = new JMXServiceURL(url);
212 Map m = new HashMap();
213
214 boolean hascred = false;
215 boolean ok = true;
216 String[] data = DBSettingsLoader.GetCredentials(pooled, modifiedurl);
217 if (data != null && data.length >= 2) {
218 String[] cred = new String[]{data[0], Utility.DE(data[1])};
219 m.put(JMXConnector.CREDENTIALS, cred);
220 hascred = true;
221 }
222 String status = "OK";
223 JMXConnector c = null;
224 try {
225 if (hascred) {
226 c = JMXConnectorFactory.connect(u, m);
227 } else {
228 c = JMXConnectorFactory.connect(u);
229 }
230 AuxHelper.TryUpdateStatus(true, modifiedurl, status, pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName(), null);
231 } catch (Exception ex) {
232 status = ex.getMessage();
233 if (status.equalsIgnoreCase("Expected String[2], got null")) {
234 status = "No credentials were provided";
235 }
236 AuxHelper.TryUpdateStatus(false, modifiedurl, status, pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName(), null);
237
238 log.log(Level.WARN, "SMX Server offline " + url + " " + status);
239 ok = false;
240 }
241
242 if (ok && c!=null) {
243 log.log(Level.INFO, "SMX Server Online, updating ActiveMQ statistics" + url);
244 MBeanServerConnection mBeanServerConnection = c.getMBeanServerConnection();
245
246
247 Set<ObjectName> queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
248
249 Iterator<ObjectName> iterator = queryNames.iterator();
250 String[] queueattribs = new String[]{
251 "Name",
252 "ConsumerCount",
253 "DequeueCount",
254 "DispatchCount",
255 "EnqueueCount",
256 "ExpiredCount",
257 "ProducerCount",
258 "QueueSize"
259 };
260
261 java.sql.Connection perfcon = null;
262 PreparedStatement ps=null;
263
264 try {
265 if (pooled) {
266 perfcon = Utility.getPerformanceDBConnection();
267
268 } else {
269 perfcon = Utility.getPerformanceDB_NONPOOLED_Connection();
270
271 }
272
273
274
275 ps = perfcon.prepareStatement("delete from brokerrawdata where host=?");
276 ps.setString(1, modifiedurl);
277 ps.executeUpdate();
278 Map currenttopics = new HashMap();
279
280 while (iterator.hasNext()) {
281 ObjectName n = iterator.next();
282 if (n != null && n.getCanonicalName() != null) {
283 if (n.getCanonicalName().startsWith("org.apache.activemq:BrokerName")) {
284 {
285
286
287 AttributeList attributes = null;
288 try {
289 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
290 } catch (Exception ex2) {
291 try {
292 mBeanServerConnection = c.getMBeanServerConnection();
293 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
294 } catch (Exception ex3) {
295 }
296 }
297
298 log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
299
300 String bigname = n.getCanonicalName();
301 String ExchangeType = (bigname.contains("Type=Topic")) ? "Topic" : "Queue";
302
303
304
305
306
307 long ExpiredCount = 0;
308
309
310
311
312
313
314 long ConsumerCount = 0;
315 long RecievedMessageCount = 0;
316 long MessageCount = 0;
317 String name = "";
318 long QueueDepth = 0;
319 if (attributes != null) {
320 for (int i2 = 0; i2 < attributes.size(); i2++) {
321
322
323
324
325
326
327
328
329
330
331
332
333 if (attributes.get(i2).toString().toLowerCase().startsWith("enqueuecount = ")) {
334 RecievedMessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("enqueuecount = ", "").trim());
335 }
336 if (attributes.get(i2).toString().toLowerCase().startsWith("dispatchcount = ")) {
337 MessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("dispatchcount = ", "").trim());
338 }
339 if (attributes.get(i2).toString().toLowerCase().startsWith("queuesize = ")) {
340 QueueDepth = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("queuesize = ", "").trim());
341 }
342 if (attributes.get(i2).toString().toLowerCase().startsWith("consumercount = ")) {
343 ConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("consumercount = ", "").trim());
344 }
345 if (attributes.get(i2).toString().toLowerCase().startsWith("name = ")) {
346 name = attributes.get(i2).toString().split("=")[1].trim();
347 }
348 if (attributes.get(i2).toString().toLowerCase().startsWith("expiredcount = ")) {
349 ExpiredCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("expiredcount = ", "").trim());
350 }
351 }
352 }
353
354 if (!Utility.stringIsNullOrEmpty(name)) {
355 currenttopics.put(n.getCanonicalName(), QueueDepth);
356
357 UpdateData(modifiedurl, name, bigname, MessageCount, RecievedMessageCount, ConsumerCount, ConsumerCount,
358 QueueDepth, ExchangeType, 0, 0, 0, ExpiredCount, perfcon);
359 }
360 }
361 }
362 }
363 }
364
365
366
367 int components = 0;
368 String[] componentattribs = new String[]{"CurrentState", "Name", "MainType"};
369 queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
370 iterator = queryNames.iterator();
371 while (iterator.hasNext()) {
372
373 ObjectName n = iterator.next();
374 if (n != null && n.getCanonicalName() != null) {
375 {
376 AttributeList attributes = null;
377 try {
378 mBeanServerConnection = c.getMBeanServerConnection();
379 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), componentattribs);
380 } catch (Exception ex2) {
381 try {
382 mBeanServerConnection = c.getMBeanServerConnection();
383 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), componentattribs);
384 } catch (Exception ex3) {
385 }
386 }
387 if (attributes != null) {
388 String name = "";
389 String currentstate = "";
390 for (int i2 = 0; i2 < attributes.size(); i2++) {
391 String[] temp = attributes.get(i2).toString().split("=");
392 if (temp.length == 2) {
393 if (temp[0].trim().equalsIgnoreCase("name")) {
394 name = temp[1].trim();
395 }
396 if (temp[0].trim().equalsIgnoreCase("CurrentState")) {
397 currentstate = temp[1].trim();
398 }
399 }
400 }
401 if (!Utility.stringIsNullOrEmpty(name) && !Utility.stringIsNullOrEmpty(currentstate)) {
402 boolean running = currentstate.toLowerCase().contains("start") ? true : false;
403 AuxHelper.TryUpdateStatus(running, modifiedurl + "/" + name, currentstate, pooled, PolicyType.STATUS, AuxHelper.UNSPECIFIED, AuxHelper.UNSPECIFIED, modifiedurl);
404 if (!running) {
405 StatusServicePolicy spol = null;
406 if (pooled) {
407 spol = (StatusServicePolicy) SLACommon.LoadPolicyPooled(modifiedurl + "/" + name);
408 } else {
409 spol = (StatusServicePolicy) SLACommon.LoadPolicyNotPooled(modifiedurl + "/" + name);
410 }
411 if (containsRestartSLA(spol.getServiceLevelAggrements())) {
412
413 try {
414 Object invoke = mBeanServerConnection.invoke(n, "Start", null, null);
415 } catch (Exception ex) {
416 log.log(Level.ERROR, "the attemp to restart the service identified by " + modifiedurl + "/" + name, ex);
417 }
418 }
419 }
420 components++;
421 }
422 }
423 }
424 }
425 }
426
427
428
429
430
431 queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
432 iterator = queryNames.iterator();
433 AddStatisticalDataRequestMsg req = new AddStatisticalDataRequestMsg();
434 while (iterator.hasNext()) {
435 ObjectName n = iterator.next();
436 if (n != null && n.getCanonicalName() != null) {
437 if ((n.getDomain().equalsIgnoreCase("org.apache.camel")) && (n.getCanonicalName().contains("type=routes"))) {
438 {
439 log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
440
441 String bigname = n.getCanonicalName();
442 String ExchangeType = "Route";
443
444
445
446
447 long MessageCount = 0;
448 long MessageDropCount = 0;
449 long ExchangesTotal = 0;
450 String name = "";
451 long QueueDepth = 0;
452
453
454
455 try {
456 name = (String) mBeanServerConnection.getAttribute(n, "EndpointUri");
457 } catch (Exception ex) {
458 }
459 try {
460 MessageDropCount = (Long) mBeanServerConnection.getAttribute(n, "ExchangesFailed");
461 } catch (Exception ex) {
462 }
463 try {
464 MessageCount = (Long) mBeanServerConnection.getAttribute(n, "ExchangesCompleted");
465 } catch (Exception ex) {
466 }
467 try {
468 ExchangesTotal = (Long) mBeanServerConnection.getAttribute(n, "ExchangesTotal");
469 } catch (Exception ex) {
470 }
471
472 if (!Utility.stringIsNullOrEmpty(name)) {
473 currenttopics.put(n.getCanonicalName(), QueueDepth);
474 req.getData().add(UpdateData(modifiedurl, name, bigname, MessageCount, ExchangesTotal, 0, 0,
475 QueueDepth, ExchangeType, 0, 0, 0, MessageDropCount, perfcon));
476 }
477
478 }
479 }
480 }
481 }
482
483 log.log(Level.INFO, "SMX Server Online, updating status for " + components + " SMX components" + url);
484
485 req.setAgentType(AGENT);
486 req.setBrokerURI(modifiedurl);
487 req.setBrokerHostname(p.getMachineName());
488 req.setDomain(p.getDomainName());
489 req.setClassification(SLACommon.GetClassLevel(pooled));
490 req.setOperationalStatus(ok);
491 req.setOperationalStatusMessage(status);
492 SLACommon.ProcessStatisticalSLARules(req, pooled);
493 } catch (Exception ex) {
494 log.log(Level.ERROR, "Cannot connect to fgsms performance db, monitoring is not possible, exiting...");
495 return;
496 } finally {
497 DBUtils.safeClose(ps);
498 DBUtils.safeClose(perfcon);
499 }
500 } else {
501
502 }
503
504
505 }
/**
 * Agent type identifier written to the {@code agenttype} column and set on the statistical
 * data request. NOTE(review): the value mentions "amqp" although this class lives in the
 * smxviajmxagent package - confirm it is intentional.
 */
final static String AGENT = "org.miloss.fgsms.agents.amqp.jmx";
507
508 private static BrokerData UpdateData(String url, String name, String bigname,
509 long MessageCount, long RecievedMessageCount, long ConsumerCount, long ActiveConsumerCount,
510 long QueueDepth, String ExchangeType, long bytesin, long bytesout, long bytesdrop,
511 long MessageDropCount, Connection perf) {
512
513 BrokerData bd = new BrokerData();
514 bd.setActiveConsumers(QueueDepth);
515 bd.setQueueOrTopicCanonicalName(bigname);
516 bd.setQueueOrTopicName(name);
517 bd.setMessagesIn(RecievedMessageCount);
518 bd.setMessagesDropped(MessageDropCount);
519 bd.setDepth(QueueDepth);
520 bd.setActiveConsumers(ActiveConsumerCount);
521 bd.setBytesDropped(bytesdrop);
522 bd.setBytesIn(bytesin);
523 bd.setBytesOut(bytesout);
524 bd.setItemType(ExchangeType);
525 bd.setMessagesOut(MessageCount);
526 bd.setTotalConsumers(ConsumerCount);
527 if (!Utility.stringIsNullOrEmpty(name)) {
528 PreparedStatement comm=null;
529 try {
530 comm = perf.prepareStatement("Delete from brokerrawdata where host=? and namecol=? and canonicalname=?; "
531 + "INSERT INTO brokerrawdata (host, utcdatetime, namecol, canonicalname, messagecount,recievedmessagecount, consumercount, "
532 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
533 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
534 + "INSERT INTO brokerhistory (host, utcdatetime, namecol,canonicalname, messagecount,recievedmessagecount, consumercount, "
535 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
536 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
537 comm.setString(1, url);
538 comm.setString(2, name);
539 comm.setString(3, bigname);
540 comm.setString(4, url);
541 comm.setLong(5, System.currentTimeMillis());
542 comm.setString(6, name);
543 comm.setString(7, bigname);
544 comm.setLong(8, MessageCount);
545 comm.setLong(9, RecievedMessageCount);
546 comm.setLong(10, ConsumerCount);
547 comm.setLong(11, ActiveConsumerCount);
548 comm.setLong(12, QueueDepth);
549 comm.setString(13, ExchangeType);
550 comm.setString(14, AGENT);
551 comm.setLong(15, MessageDropCount);
552 comm.setLong(16, bytesdrop);
553 comm.setLong(17, bytesin);
554 comm.setLong(18, bytesout);
555
556 comm.setString(19, url);
557 comm.setLong(20, System.currentTimeMillis());
558 comm.setString(21, name);
559 comm.setString(22, bigname);
560 comm.setLong(23, MessageCount);
561 comm.setLong(24, RecievedMessageCount);
562 comm.setLong(25, ConsumerCount);
563 comm.setLong(26, ActiveConsumerCount);
564 comm.setLong(27, QueueDepth);
565 comm.setString(28, ExchangeType);
566 comm.setString(29, AGENT);
567 comm.setLong(30, MessageDropCount);
568 comm.setLong(31, bytesdrop);
569 comm.setLong(32, bytesin);
570 comm.setLong(33, bytesout);
571 comm.execute();
572
573 } catch (SQLException ex) {
574 log.log(Level.WARN, "error saving broker data", ex);
575 } finally {
576 DBUtils.safeClose(comm);
577 }
578
579 }
580 return bd;
581
582 }
583
584 private static boolean containsRestartSLA(ArrayOfSLA serviceLevelAggrements) {
585 if (serviceLevelAggrements == null) {
586 return false;
587 }
588
589 if (serviceLevelAggrements.getSLA().isEmpty()) {
590 return false;
591 }
592 for (int i = 0; i < serviceLevelAggrements.getSLA().size(); i++) {
593 if (serviceLevelAggrements.getSLA().get(i).getAction() == null) {
594 continue;
595 }
596 for (int k = 0; k < serviceLevelAggrements.getSLA().get(i).getAction().getSLAAction().size(); k++) {
597 if (serviceLevelAggrements.getSLA().get(i).getAction().getSLAAction().get(k).getImplementingClassName().equalsIgnoreCase(
598 SLAActionRestart.class.getCanonicalName())) {
599 return true;
600 }
601 }
602 }
603 return false;
604 }
605 }