View Javadoc
1   /**
2    * This Source Code Form is subject to the terms of the Mozilla Public
3    * License, v. 2.0. If a copy of the MPL was not distributed with this
4    * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5    *
6    * If it is not possible or desirable to put the notice in a particular
7    * file, then You may include the notice in a location (such as a LICENSE
8    * file in a relevant directory) where a recipient would be likely to look
9    * for such a notice.
10  
11   * 
12   */
13   
14  /*  ---------------------------------------------------------------------------
15   *  U.S. Government, Department of the Army
16   *  Army Materiel Command
17   *  Research Development Engineering Command
18   *  Communications Electronics Research Development and Engineering Center
19   *  ---------------------------------------------------------------------------
20   */
21  
22  package org.miloss.fgsms.agents;
23  
24  import java.io.File;
25  import java.io.RandomAccessFile;
26  import java.nio.channels.FileChannel;
27  import java.nio.channels.FileLock;
28  import java.sql.Connection;
29  import java.sql.PreparedStatement;
30  import java.sql.SQLException;
31  import java.util.*;
32  import javax.management.AttributeList;
33  import javax.management.MBeanServerConnection;
34  import javax.management.ObjectName;
35  import javax.management.remote.JMXConnector;
36  import javax.management.remote.JMXConnectorFactory;
37  import javax.management.remote.JMXServiceURL;
38  import org.miloss.fgsms.common.DBSettingsLoader;
39  import org.miloss.fgsms.sla.AuxHelper;
40  import org.miloss.fgsms.common.IpAddressUtility;
41  import org.miloss.fgsms.common.PropertyLoader;
42  import org.miloss.fgsms.common.Utility;
43  import org.miloss.fgsms.services.interfaces.common.PolicyType;
44  import org.miloss.fgsms.services.interfaces.policyconfiguration.ServicePolicy;
45  import org.miloss.fgsms.sla.SLACommon;
46  import org.apache.log4j.Level;
47  import org.miloss.fgsms.common.Logger;;
48  import org.apache.log4j.PropertyConfigurator;
49  import org.miloss.fgsms.services.interfaces.datacollector.AddStatisticalDataRequestMsg;
50  import org.miloss.fgsms.services.interfaces.datacollector.BrokerData;
51  
52  /**
53   * fgsms monitor for Hornet Qmessage brokers via JMX only
54   * 
55   * runs at FGSMS server
56   *
57   * @author AO
58   *
59   */
60  public class HornetqJMSAgent {
61  
62      static Logger log = Logger.getLogger("fgsms.HornetqJMX");
63      private boolean running = true;
64      //~ Inner Classes -----------------------------------------------------------------------------
65      private boolean done = false;
66  
67      public class RunWhenShuttingDown extends Thread {
68  
69          public void run() {
70              System.out.println("Control-C caught. Shutting down...");
71              running = false;
72  
73              while (!done) {
74                  try {
75                      Thread.sleep(1000);
76                  } catch (InterruptedException ex) {
77                  }
78              }
79              closeLock();
80              deleteFile();
81              if (urls != null) {
82                  for (int i = 0; i < urls.length; i++) {
83                      AuxHelper.TryUpdateStatus(false, IpAddressUtility.modifyURL(urls[i], true), "Agent Stopped", false, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
84                  }
85              }
86          }
87      }
88      private File file;
89      private FileChannel channel;
90      private FileLock lock;
91      private String[] urls = null;
92   
93      private void closeLock() {
94          try {
95              lock.release();
96          } catch (Exception e) {
97          }
98          try {
99              channel.close();
100         } catch (Exception e) {
101         }
102     }
103 
104     private void deleteFile() {
105         try {
106             file.delete();
107         } catch (Exception e) {
108         }
109     }
110    
111 
112     /**
113      * @param args the command line arguments
114      */
115     public static void main(String[] args) {
116         new HornetqJMSAgent().Main(args);
117 
118     }
119 
120     private void Main(String[] args) {
121 
122         try {
123             file = new File("Hornetq.lck");
124             channel = new RandomAccessFile(file, "rw").getChannel();
125             lock = channel.tryLock();
126         } catch (Exception e) {
127             // already locked
128             closeLock();
129             System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file Hornetq.lck needs to be deleted.");
130             return;
131         }
132         if (lock == null) {
133             closeLock();
134             System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file Hornetq.lck needs to be deleted.");
135             return;
136         }
137 
138 
139         Runtime.getRuntime().addShutdownHook(new RunWhenShuttingDown());
140         PropertyConfigurator.configure("log4j.properties");
141         long interval = 10000;
142         if (args.length == 1) {
143             try {
144                 interval = Long.parseLong(args[0]);
145                 if (interval < 10000) {
146                     interval = 10000;
147                 }
148             } catch (Exception ex) {
149             }
150         }
151         log.log(Level.INFO, "fgsms hornetq JMX Agent startup...");
152         Properties prop = PropertyLoader.loadProperties("org.miloss.fgsms.agents.connection");
153 
154         String temp = (String) prop.get("JMXServiceURL");
155         if (temp.contains("|")) {
156             urls = temp.split("\\|");
157         } else {
158             urls = new String[1];
159             urls[0] = temp;
160         }
161 
162         if ((urls == null || urls.length == 0)) {
163             log.log(Level.FATAL, "no JMX url has been defined.");
164             return;
165         }
166 
167         long lastranat = 0;
168         while (running) {
169             if (lastranat < System.currentTimeMillis() - interval) {
170                 try {
171                     lastranat = System.currentTimeMillis();
172 
173                     for (int aa = 0; aa < urls.length; aa++) {
174                         String originalurl = urls[aa];
175                         String url = IpAddressUtility.modifyURL(originalurl, true);
176                         try {
177                             Connection con = Utility.getConfigurationDB_NONPOOLED_Connection();
178                             AuxHelper.CheckStatisticalPolicyAndCreate(url, con, false, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
179                             con.close();
180                         } catch (Exception ex) {
181                         }
182                         Fire(false, originalurl, url);
183                     }
184                 } catch (Exception ex) {
185                     log.log(Level.ERROR, null, ex);
186                 }
187                 log.log(Level.INFO, "Sleeping " + interval + "ms until next interation....");
188             }
189             if (running) {
190                 try {
191                     Thread.sleep(1000);
192                 } catch (InterruptedException ex) {
193                     log.log(Level.ERROR, null, ex);
194                 }
195             }
196             done = true;
197         }
198     }
199 
200     public static void Fire(final boolean pooled, final String url, final String modifiedurl) throws Exception {
201 
202         ServicePolicy p = null;
203         if (pooled) {
204             p = SLACommon.LoadPolicyPooled(modifiedurl);
205         } else {
206             p = SLACommon.LoadPolicyNotPooled(modifiedurl);
207         } 
208         //boolean slarules = false;
209         //if (p != null && SLACommon.ListContainsStatisticalRules(p.getServiceLevelAggrements())) {
210         //    slarules = true;
211         //}
212         
213         //FIXME what do we do if the policy is null?
214 
215         JMXServiceURL u = new JMXServiceURL(url);
216         Map m = new HashMap();
217 
218 
219         boolean ok = true;
220         String[] data = DBSettingsLoader.GetCredentials(pooled, modifiedurl);
221         if (data != null) {
222             String[] cred = new String[]{data[0], Utility.DE(data[1])
223             };
224             m.put(JMXConnector.CREDENTIALS, cred);
225         }
226 
227         String status="OK";
228         JMXConnector c = null;
229         try {
230             c = JMXConnectorFactory.connect(u, m);
231             AuxHelper.TryUpdateStatus(true, modifiedurl, "OK", pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
232         } catch (Exception ex) {
233             AuxHelper.TryUpdateStatus(false, modifiedurl, ex.getLocalizedMessage(), pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
234             //server offline
235              status = "Offline " + ex.getMessage();
236             log.log(Level.WARN, modifiedurl + "Hornet Server offline", ex);
237             ok = false;
238         }
239 
240         if (ok && c!=null) {
241             log.log(Level.INFO, "Hornet Server Online, updating statistics");
242             MBeanServerConnection mBeanServerConnection = c.getMBeanServerConnection();
243             // Integer i = mBeanServerConnection.getMBeanCount();
244             // String[] d = mBeanServerConnection.getDomains();
245             Set<ObjectName> queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
246             //"org.apache.qpid"
247             Iterator<ObjectName> iterator = queryNames.iterator();
248             String[] queueattribs = new String[]{
249                 "Name",
250                 "DeliveringCount",
251                 "DurableMessageCount",
252                 "DurableSubscriptionCount",
253                 "MessageCount",
254                 "MessagesAdded",
255                 "NonDurableMessageCount",
256                 "NonDurableSubscriptionCount",
257                 "SubscriptionCount"
258             };
259 
260             java.sql.Connection perfcon = null;
261 
262             try {
263                 if (pooled) {
264                     perfcon = Utility.getPerformanceDBConnection();
265 
266                 } else {
267                     perfcon = Utility.getPerformanceDB_NONPOOLED_Connection();
268 
269                 }
270 
271             } catch (Exception ex) {
272                 log.log(Level.ERROR, "Cannot connect to fgsms performance db, monitoring is not possible, exiting...");
273                 return;
274             }
275 
276             AddStatisticalDataRequestMsg req = new AddStatisticalDataRequestMsg();
277             PreparedStatement ps = perfcon.prepareStatement("delete from brokerrawdata where host=?");
278             ps.setString(1, modifiedurl);
279             ps.executeUpdate();
280             Map currenttopics = new HashMap();
281             while (iterator.hasNext()) {
282 
283                 ObjectName n = iterator.next();
284                 if (n != null && n.getCanonicalName() != null) {
285                     if (n.getCanonicalName().startsWith("org.hornetq:module=Core,name=")) {
286                         {
287 
288                             // log.log(Level.INFO, "###################################################");
289                             AttributeList attributes = null;
290                             try {
291                                 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
292                             } catch (Exception ex2) {
293                                 try {
294                                     mBeanServerConnection = c.getMBeanServerConnection();
295                                     attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
296                                 } catch (Exception ex3) {
297                                 }
298                             }
299 
300                             log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
301 
302                             String bigname = n.getCanonicalName();
303                             String ExchangeType = (bigname.contains("Type=Topic")) ? "Topic" : "Queue";
304 
305 
306                             //long DequeueCount = 0;
307                             // long DispatchCount = 0;
308                             //       long EnqueueCount = 0;
309                             long ExpiredCount = 0;
310                             //   long ProducerCount = 0;
311 /*
312                              "DeliveringCount", //mesages out
313                              "DurableMessageCount",
314                              "DurableSubscriptionCount", 
315                              "MessageCount", //messages in
316                              "MessagesAdded",            //total of messages delivered
317                              "NonDurableMessageCount",
318                              "NonDurableSubscriptionCount",  
319                              "SubscriptionCount" // total active consumers
320                              */
321 
322                             long ConsumerCount = 0;
323                             long RecievedMessageCount = 0;
324                             long MessageCount = 0;
325                             String name = "";
326                             long QueueDepth = 0;
327                             if (attributes != null) {
328                                 for (int i2 = 0; i2 < attributes.size(); i2++) {
329                                     //log.log(Level.INFO, attributes.get(i2).toString());
330                                    /*
331                                      *
332                                      * attrib Name =
333                                      * ActiveMQ.Advisory.Connection attrib
334                                      * ConsumerCount = 0 attrib DequeueCount = 0
335                                      * attrib DispatchCount = 0 messages sent
336                                      * attrib EnqueueCount = 3 attrib
337                                      * ExpiredCount = 0 attrib ProducerCount = 0
338                                      * attrib QueueSize = 0
339                                      */
340 
341                                     if (attributes.get(i2).toString().toLowerCase().startsWith("enqueuecount = ")) {
342                                         //        RecievedMessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("enqueuecount = ", "").trim());
343                                     }
344                                     if (attributes.get(i2).toString().toLowerCase().startsWith("messagesadded = ")) {
345                                         MessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("messagesadded = ", "").trim());
346                                     }
347                                     if (attributes.get(i2).toString().toLowerCase().startsWith("queuesize = ")) {
348                                         //         QueueDepth = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("queuesize = ", "").trim());
349                                     }
350                                     if (attributes.get(i2).toString().toLowerCase().startsWith("subscriptioncount = ")) {
351                                         ConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("subscriptioncount = ", "").trim());
352                                     }
353                                     if (attributes.get(i2).toString().toLowerCase().startsWith("name = ")) {
354                                         name = attributes.get(i2).toString().split("=")[1].trim();
355                                     }
356                                     if (attributes.get(i2).toString().toLowerCase().startsWith("expiredcount = ")) {
357                                         //        ExpiredCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("expiredcount = ", "").trim());
358                                     }
359                                 }
360                             }
361                             //log.log(Level.INFO, "item name = " + name + " " + bigname);
362                             if (!Utility.stringIsNullOrEmpty(name)) {
363                                 currenttopics.put(n.getCanonicalName(), QueueDepth);
364 
365                                 req.getData().add(UpdateData(modifiedurl, name, bigname, MessageCount, RecievedMessageCount, ConsumerCount, ConsumerCount,
366                                         QueueDepth, ExchangeType, 0, 0, 0, ExpiredCount, perfcon));
367                                 //    if (slarules) {
368                                 //     SLACommon.ProcessStatisticalSLARules(p, modifiedurl, name, QueueDepth, pooled);
369                                 // }
370 
371                             }
372                         }
373                     }
374 
375 
376                 }
377 
378             }
379             req.setAgentType(AGENT);
380             req.setBrokerURI(modifiedurl);
381             req.setBrokerHostname(p.getMachineName());
382             req.setDomain(p.getDomainName());
383             req.setClassification(SLACommon.GetClassLevel(pooled));
384             req.setOperationalStatus(ok);
385             req.setOperationalStatusMessage(status);
386             SLACommon.ProcessStatisticalSLARules(req, pooled);
387             c.close();
388             perfcon.close();
389         } else {
390 
391         }
392 
393 
394     }
395     static final String AGENT = "org.miloss.fgsms.agents.hornetq.jmx";
396 
397     private static BrokerData UpdateData(String url, String name, String bigname,
398             long MessageCount, long RecievedMessageCount, long ConsumerCount, long ActiveConsumerCount,
399             long QueueDepth, String ExchangeType, long bytesin, long bytesout, long bytesdrop,
400             long MessageDropCount, Connection perf) {
401         BrokerData bd = new BrokerData();
402         bd.setActiveConsumers(QueueDepth);
403         bd.setQueueOrTopicCanonicalName(bigname);
404         bd.setQueueOrTopicName(name);
405         bd.setMessagesIn(RecievedMessageCount);
406         bd.setMessagesDropped(MessageDropCount);
407         bd.setDepth(QueueDepth);
408         bd.setActiveConsumers(ActiveConsumerCount);
409         bd.setBytesDropped(bytesdrop);
410         bd.setBytesIn(bytesin);
411         bd.setBytesOut(bytesout);
412         bd.setItemType(ExchangeType);
413         bd.setMessagesOut(MessageCount);
414         bd.setTotalConsumers(ConsumerCount);
415 
416         if (!Utility.stringIsNullOrEmpty(name)) {
417             try {
418                 PreparedStatement comm = perf.prepareStatement("Delete from brokerrawdata where host=? and namecol=? and canonicalname=?; "
419                         + "INSERT INTO brokerrawdata (host, utcdatetime, namecol, canonicalname, messagecount,recievedmessagecount, consumercount, "
420                         + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
421                         + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
422                         + "INSERT INTO brokerhistory (host, utcdatetime, namecol,canonicalname, messagecount,recievedmessagecount, consumercount, "
423                         + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
424                         + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
425                 comm.setString(1, url);
426                 comm.setString(2, name);
427                 comm.setString(3, bigname);
428                 comm.setString(4, url);
429                 comm.setLong(5, System.currentTimeMillis());
430                 comm.setString(6, name);
431                 comm.setString(7, bigname);
432                 comm.setLong(8, MessageCount);
433                 comm.setLong(9, RecievedMessageCount);
434                 comm.setLong(10, ConsumerCount);
435                 comm.setLong(11, ActiveConsumerCount);
436                 comm.setLong(12, QueueDepth);
437                 comm.setString(13, ExchangeType);
438                 comm.setString(14, "org.miloss.fgsms.agents.hornetq.jmx");
439                 comm.setLong(15, MessageDropCount);
440                 comm.setLong(16, bytesdrop);
441                 comm.setLong(17, bytesin);
442                 comm.setLong(18, bytesout);
443 
444                 comm.setString(19, url);
445                 comm.setLong(20, System.currentTimeMillis());
446                 comm.setString(21, name);
447                 comm.setString(22, bigname);
448                 comm.setLong(23, MessageCount);
449                 comm.setLong(24, RecievedMessageCount);
450                 comm.setLong(25, ConsumerCount);
451                 comm.setLong(26, ActiveConsumerCount);
452                 comm.setLong(27, QueueDepth);
453                 comm.setString(28, ExchangeType);
454                 comm.setString(29, "org.miloss.fgsms.agents.hornetq.jmx");
455                 comm.setLong(30, MessageDropCount);
456                 comm.setLong(31, bytesdrop);
457                 comm.setLong(32, bytesin);
458                 comm.setLong(33, bytesout);
459                 comm.execute();
460                 comm.close();
461             } catch (SQLException ex) {
462                 log.log(Level.WARN, "error saving broker data", ex);
463             }
464 
465         }
466 
467         return bd;
468 
469     }
470 }