View Javadoc
1   /**
2    * This Source Code Form is subject to the terms of the Mozilla Public
3    * License, v. 2.0. If a copy of the MPL was not distributed with this
4    * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5    *
6    * If it is not possible or desirable to put the notice in a particular
7    * file, then You may include the notice in a location (such as a LICENSE
8    * file in a relevant directory) where a recipient would be likely to look
9    * for such a notice.
10   *
11   * 
12   */
13  /*  ---------------------------------------------------------------------------
14   *  U.S. Government, Department of the Army
15   *  Army Materiel Command
16   *  Research Development Engineering Command
17   *  Communications Electronics Research Development and Engineering Center
18   *  ---------------------------------------------------------------------------
19   */
20  package org.miloss.fgsms.agents.qpidjmx;
21  
22  import java.io.File;
23  import java.io.RandomAccessFile;
24  import java.nio.channels.FileChannel;
25  import java.nio.channels.FileLock;
26  import java.sql.Connection;
27  import java.sql.PreparedStatement;
28  import java.sql.SQLException;
29  import java.util.*;
30  import javax.management.AttributeList;
31  import javax.management.MBeanServerConnection;
32  import javax.management.ObjectName;
33  import javax.management.remote.JMXConnector;
34  import javax.management.remote.JMXConnectorFactory;
35  import javax.management.remote.JMXServiceURL;
36  import org.miloss.fgsms.common.DBSettingsLoader;
37  import org.miloss.fgsms.sla.AuxHelper;
38  import org.miloss.fgsms.common.IpAddressUtility;
39  import org.miloss.fgsms.common.PropertyLoader;
40  import org.miloss.fgsms.common.Utility;
41  import org.miloss.fgsms.services.interfaces.common.PolicyType;
42  import org.miloss.fgsms.services.interfaces.policyconfiguration.ServicePolicy;
43  import org.miloss.fgsms.sla.SLACommon;
44  import org.apache.log4j.Level;
45  import org.miloss.fgsms.common.Logger;;
46  import org.apache.log4j.PropertyConfigurator;
47  import org.miloss.fgsms.services.interfaces.datacollector.AddStatisticalDataRequestMsg;
48  import org.miloss.fgsms.services.interfaces.datacollector.BrokerData;
49  
50  /**
51   * fgsms monitor for Qpid/MRG AMQP message brokers via JMX only
52   *
53   * @author AO fgsms monitor for Qpid/MRG AMQP message brokers via JMX only. This
54   * means this can only be used on java based implementations and runs on the
55   * fgsms server
56   */
57  public class QpidJMXAgent {
58  
59      static Logger log = Logger.getLogger("fgsms.QpidJMX");
60      private boolean running = true;
61      //~ Inner Classes -----------------------------------------------------------------------------
62      private boolean done = false;
63      private File file;
64      private FileChannel channel;
65      private FileLock lock;
66      private String[] urls = null;
67      static final String AGENT = "org.miloss.fgsms.agents.amqp.jmx";
68  
69       /**
70       * @param args the command line arguments
71       */
72      public static void main(String[] args) {
73          new QpidJMXAgent().Main(args);
74  
75      }
76      
77      public class RunWhenShuttingDown extends Thread {
78  
79          public void run() {
80              System.out.println("Control-C caught. Shutting down...");
81              running = false;
82  
83              while (!done) {
84                  try {
85                      Thread.sleep(1000);
86                  } catch (InterruptedException ex) {
87                  }
88              }
89              closeLock();
90              deleteFile();
91              if (urls != null) {
92                  for (int i = 0; i < urls.length; i++) {
93                      AuxHelper.TryUpdateStatus(false, IpAddressUtility.modifyURL(urls[i], true), "Agent Stopped", false, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
94                  }
95              }
96          }
97      }
98  
99      private void closeLock() {
100         try {
101             lock.release();
102         } catch (Exception e) {
103         }
104         try {
105             channel.close();
106         } catch (Exception e) {
107         }
108     }
109 
110     private void deleteFile() {
111         try {
112             file.delete();
113         } catch (Exception e) {
114         }
115     }
116 
117     private void Main(String[] args) {
118 
119         try {
120             file = new File("pyjmx.lck");
121             channel = new RandomAccessFile(file, "rw").getChannel();
122             lock = channel.tryLock();
123         } catch (Exception e) {
124             // already locked
125             closeLock();
126             System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file pyjmx.lck needs to be deleted.");
127             return;
128         }
129         if (lock == null) {
130             closeLock();
131             System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file pyjmx.lck needs to be deleted.");
132             return;
133         }
134 
135         Runtime.getRuntime().addShutdownHook(new RunWhenShuttingDown());
136         PropertyConfigurator.configure("log4j.properties");
137         long interval = 10000;
138         if (args.length == 1) {
139             try {
140                 interval = Long.parseLong(args[0]);
141                 if (interval < 10000) {
142                     interval = 10000;
143                 }
144             } catch (Exception ex) {
145             }
146         }
147         log.log(Level.INFO, "fgsms Qpid JMX Agent startup...");
148         Properties prop = PropertyLoader.loadProperties("fgsmsqpidjmx/connection");
149 
150         String temp = (String) prop.get("JMXServiceURL");
151         if (temp.contains("|")) {
152             urls = temp.split("\\|");
153         } else {
154             urls = new String[1];
155             urls[0] = temp;
156         }
157 
158         if ((urls == null || urls.length == 0)) {
159             log.log(Level.FATAL, "no JMX url has been defined.");
160             return;
161         }
162 
163         long lastranat = 0;
164         while (running) {
165             if (lastranat < System.currentTimeMillis() - interval) {
166                 try {
167                     lastranat = System.currentTimeMillis();
168 
169                     for (int aa = 0; aa < urls.length; aa++) {
170                         String originalurl = urls[aa];
171                         String url = IpAddressUtility.modifyURL(originalurl, true);
172                         try {
173                             Connection con = Utility.getConfigurationDB_NONPOOLED_Connection();
174                             AuxHelper.CheckStatisticalPolicyAndCreate(url, con, false, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
175                             con.close();
176                         } catch (Exception ex) {
177                         }
178                         Fire(false, originalurl, url);
179                     }
180                 } catch (Exception ex) {
181                     log.log(Level.ERROR, null, ex);
182                 }
183                 log.log(Level.INFO, "Sleeping " + interval + "ms until next interation....");
184             }
185             if (running) {
186                 try {
187                     Thread.sleep(1000);
188                 } catch (InterruptedException ex) {
189                     log.log(Level.ERROR, null, ex);
190                 }
191             }
192             done = true;
193         }
194     }
195 
196     public static void Fire(final boolean pooled, final String url, final String modifiedurl) throws Exception {
197 
198         ServicePolicy p = null;
199         if (pooled) {
200             p = SLACommon.LoadPolicyPooled(modifiedurl);
201         } else {
202             p = SLACommon.LoadPolicyNotPooled(modifiedurl);
203         }
204 
205         JMXServiceURL u = new JMXServiceURL(url);
206         Map m = new HashMap();
207         boolean ok = true;
208         String[] data = DBSettingsLoader.GetCredentials(pooled, modifiedurl);
209         if (data != null) {
210             String[] cred = new String[]{data[0], Utility.DE(data[1])};
211             m.put(JMXConnector.CREDENTIALS, cred);
212         }
213         String status = "OK";
214         JMXConnector c = null;
215         try {
216             c = JMXConnectorFactory.connect(u, m);
217             AuxHelper.TryUpdateStatus(true, modifiedurl, status, pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
218         } catch (Exception ex) {
219             AuxHelper.TryUpdateStatus(false, modifiedurl, ex.getLocalizedMessage(), pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
220             //server offline
221             status = "Offline " + ex.getMessage();
222             log.log(Level.WARN, "AMQP QPID Server offline " + url);
223             ok = false;
224         }
225 
226         if (ok && c!=null) {
227             log.log(Level.INFO, "AMQP QPID Server Online, updating statistics" + url);
228             MBeanServerConnection mBeanServerConnection = c.getMBeanServerConnection();
229             // Integer i = mBeanServerConnection.getMBeanCount();
230             // String[] d = mBeanServerConnection.getDomains();
231             Set<ObjectName> queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
232             //"org.apache.qpid"
233             Iterator<ObjectName> iterator = queryNames.iterator();
234             String[] queueattribs = new String[]{"QueueDepth",
235                 "Name",
236                 "MessageCount",
237                 "ReceivedMessageCount",
238                 "ActiveConsumerCount",
239                 //   "AutoDelete",
240                 //     "Capacity",
241                 "ConsumerCount",
242                 //    "Durable",
243                 //   "Exclusive",
244                 //  "FlowOverfull",
245                 // "FlowResumeCapacity",
246                 // "MaximumMessageAge",
247                 //"MaximumMessageCount",
248                 //"MaximumMessageSize",
249                 //"MaximumQueueDepth",
250                 "ExchangeType"//,
251             //"Description"
252             };
253 
254             java.sql.Connection perfcon = null;
255 
256             try {
257                 if (pooled) {
258                     perfcon = Utility.getPerformanceDBConnection();
259 
260                 } else {
261                     perfcon = Utility.getPerformanceDB_NONPOOLED_Connection();
262 
263                 }
264 
265             } catch (Exception ex) {
266                 log.log(Level.ERROR, "Cannot connect to fgsms performance db, monitoring is not possible, exiting...");
267                 return;
268             }
269 
270             PreparedStatement ps = perfcon.prepareStatement("delete from brokerrawdata where host=?");
271             ps.setString(1, modifiedurl);
272             ps.executeUpdate();
273             ps.close();
274             AddStatisticalDataRequestMsg req = new AddStatisticalDataRequestMsg();
275             Map currenttopics = new HashMap();
276             while (iterator.hasNext()) {
277 
278                 ObjectName n = iterator.next();
279                 if (n != null && n.getCanonicalName() != null) {
280                     if (n.getCanonicalName().startsWith("org.apache.qpid")) {
281                         {
282                             // log.log(Level.INFO, "###################################################");
283                             AttributeList attributes = null;
284                             try {
285                                 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
286                             } catch (Exception ex2) {
287                                 try {
288                                     mBeanServerConnection = c.getMBeanServerConnection();
289                                     attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
290                                 } catch (Exception ex3) {
291                                 }
292                             }
293 
294                             log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
295 
296                             String bigname = n.getCanonicalName();
297                             String ExchangeType = "queue";
298                             long ConsumerCount = 0;
299                             long ActiveConsumerCount = 0;
300                             long RecievedMessageCount = 0;
301                             long MessageCount = 0;
302                             String name = "";
303                             long QueueDepth = 0;
304                             if (attributes != null) {
305                                 for (int i2 = 0; i2 < attributes.size(); i2++) {
306                                     //log.log(Level.INFO, attributes.get(i2).toString());
307                                     if (attributes.get(i2).toString().toLowerCase().startsWith("activeconsumercount = ")) {
308                                         ActiveConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("activeconsumercount = ", "").trim());
309                                     }
310                                     if (attributes.get(i2).toString().toLowerCase().startsWith("receivedmessagecount = ")) {
311                                         RecievedMessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("receivedmessagecount = ", "").trim());
312                                     }
313                                     if (attributes.get(i2).toString().toLowerCase().startsWith("messagecount = ")) {
314                                         MessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("messagecount = ", "").trim());
315                                     }
316                                     if (attributes.get(i2).toString().toLowerCase().startsWith("queuedepth = ")) {
317                                         QueueDepth = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("queuedepth = ", "").trim());
318                                     }
319                                     if (attributes.get(i2).toString().toLowerCase().startsWith("consumercount = ")) {
320                                         ConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("consumercount = ", "").trim());
321                                     }
322                                     if (attributes.get(i2).toString().toLowerCase().startsWith("name = ")) {
323                                         name = attributes.get(i2).toString().split("=")[1].trim();
324                                     }
325                                     if (attributes.get(i2).toString().toLowerCase().startsWith("exchangetype = ")) {
326                                         ExchangeType = attributes.get(i2).toString().split("=")[1].trim();
327                                     }
328                                 }
329                             }
330                             //log.log(Level.INFO, "item name = " + name + " " + bigname);
331                             if (!Utility.stringIsNullOrEmpty(name)) {
332                                 currenttopics.put(n.getCanonicalName(), QueueDepth);
333 
334                                 req.getData().add(updateData(modifiedurl, name, bigname, MessageCount, RecievedMessageCount, ConsumerCount, ActiveConsumerCount,
335                                         QueueDepth, ExchangeType, 0, 0, 0, 0, perfcon));
336                                 //  if (slarules) {
337                                 //       SLACommon.ProcessStatisticalSLARules(p, modifiedurl, name, QueueDepth, pooled);
338                                 //   }
339 
340                             }
341                         }
342                     }
343 
344                 }
345 
346             }
347 
348             // SLACommon.ProcessStatisticalSLARules((modifiedurl, currenttopics, pooled);
349             req.setAgentType(AGENT);
350             req.setBrokerURI(modifiedurl);
351             req.setBrokerHostname(p.getMachineName());
352             req.setDomain(p.getDomainName());
353             req.setClassification(SLACommon.GetClassLevel(pooled));
354             req.setOperationalStatus(ok);
355             req.setOperationalStatusMessage(status);
356             SLACommon.ProcessStatisticalSLARules(req, pooled);
357             c.close();
358             perfcon.close();
359         }
360 
361     }
362 
363     private static BrokerData updateData(final String url, final String name, final String bigname,
364             final  long MessageCount,  final long RecievedMessageCount,  final long ConsumerCount,  final long ActiveConsumerCount,
365             final  long QueueDepth,  final String ExchangeType,  final long bytesin, final  long bytesout, final  long bytesdrop,
366             final  long MessageDropCount, final  Connection perf) {
367         BrokerData bd = new BrokerData();
368         bd.setActiveConsumers(QueueDepth);
369         bd.setQueueOrTopicCanonicalName(bigname);
370         bd.setQueueOrTopicName(name);
371         bd.setMessagesIn(RecievedMessageCount);
372         bd.setMessagesDropped(MessageDropCount);
373         bd.setDepth(QueueDepth);
374         bd.setActiveConsumers(ActiveConsumerCount);
375         bd.setBytesDropped(bytesdrop);
376         bd.setBytesIn(bytesin);
377         bd.setBytesOut(bytesout);
378         bd.setItemType(ExchangeType);
379         bd.setMessagesOut(MessageCount);
380         bd.setTotalConsumers(ConsumerCount);
381         if (!Utility.stringIsNullOrEmpty(name)) {
382             try {
383                 PreparedStatement comm = perf.prepareStatement("Delete from brokerrawdata where host=? and namecol=? and canonicalname=?; "
384                         + "INSERT INTO brokerrawdata (host, utcdatetime, namecol, canonicalname, messagecount,recievedmessagecount, consumercount, "
385                         + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
386                         + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
387                         + "INSERT INTO brokerhistory (host, utcdatetime, namecol,canonicalname, messagecount,recievedmessagecount, consumercount, "
388                         + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
389                         + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
390                 comm.setString(1, url);
391                 comm.setString(2, name);
392                 comm.setString(3, bigname);
393                 comm.setString(4, url);
394                 comm.setLong(5, System.currentTimeMillis());
395                 comm.setString(6, name);
396                 comm.setString(7, bigname);
397                 comm.setLong(8, MessageCount);
398                 comm.setLong(9, RecievedMessageCount);
399                 comm.setLong(10, ConsumerCount);
400                 comm.setLong(11, ActiveConsumerCount);
401                 comm.setLong(12, QueueDepth);
402                 comm.setString(13, ExchangeType);
403                 comm.setString(14, AGENT);
404                 comm.setLong(15, MessageDropCount);
405                 comm.setLong(16, bytesdrop);
406                 comm.setLong(17, bytesin);
407                 comm.setLong(18, bytesout);
408 
409                 comm.setString(19, url);
410                 comm.setLong(20, System.currentTimeMillis());
411                 comm.setString(21, name);
412                 comm.setString(22, bigname);
413                 comm.setLong(23, MessageCount);
414                 comm.setLong(24, RecievedMessageCount);
415                 comm.setLong(25, ConsumerCount);
416                 comm.setLong(26, ActiveConsumerCount);
417                 comm.setLong(27, QueueDepth);
418                 comm.setString(28, ExchangeType);
419                 comm.setString(29, AGENT);
420                 comm.setLong(30, MessageDropCount);
421                 comm.setLong(31, bytesdrop);
422                 comm.setLong(32, bytesin);
423                 comm.setLong(33, bytesout);
424                 comm.execute();
425                 comm.close();
426             } catch (SQLException ex) {
427                 log.log(Level.WARN, "error saving broker data", ex);
428             }
429 
430         }
431 
432         return bd;
433     }
434 
435    
436 }