1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.miloss.fgsms.agents;
23
24 import java.io.File;
25 import java.io.RandomAccessFile;
26 import java.nio.channels.FileChannel;
27 import java.nio.channels.FileLock;
28 import java.sql.Connection;
29 import java.sql.PreparedStatement;
30 import java.sql.SQLException;
31 import java.util.*;
32 import javax.management.AttributeList;
33 import javax.management.MBeanServerConnection;
34 import javax.management.ObjectName;
35 import javax.management.remote.JMXConnector;
36 import javax.management.remote.JMXConnectorFactory;
37 import javax.management.remote.JMXServiceURL;
38 import org.miloss.fgsms.common.DBSettingsLoader;
39 import org.miloss.fgsms.sla.AuxHelper;
40 import org.miloss.fgsms.common.IpAddressUtility;
41 import org.miloss.fgsms.common.PropertyLoader;
42 import org.miloss.fgsms.common.Utility;
43 import org.miloss.fgsms.services.interfaces.common.PolicyType;
44 import org.miloss.fgsms.services.interfaces.policyconfiguration.ServicePolicy;
45 import org.miloss.fgsms.sla.SLACommon;
46 import org.apache.log4j.Level;
47 import org.miloss.fgsms.common.Logger;;
48 import org.apache.log4j.PropertyConfigurator;
49 import org.miloss.fgsms.services.interfaces.datacollector.AddStatisticalDataRequestMsg;
50 import org.miloss.fgsms.services.interfaces.datacollector.BrokerData;
51
52
53
54
55
56
57
58
59
60 public class HornetqJMSAgent {
61
62 static Logger log = Logger.getLogger("fgsms.HornetqJMX");
63 private boolean running = true;
64
65 private boolean done = false;
66
67 public class RunWhenShuttingDown extends Thread {
68
69 public void run() {
70 System.out.println("Control-C caught. Shutting down...");
71 running = false;
72
73 while (!done) {
74 try {
75 Thread.sleep(1000);
76 } catch (InterruptedException ex) {
77 }
78 }
79 closeLock();
80 deleteFile();
81 if (urls != null) {
82 for (int i = 0; i < urls.length; i++) {
83 AuxHelper.TryUpdateStatus(false, IpAddressUtility.modifyURL(urls[i], true), "Agent Stopped", false, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
84 }
85 }
86 }
87 }
88 private File file;
89 private FileChannel channel;
90 private FileLock lock;
91 private String[] urls = null;
92
93 private void closeLock() {
94 try {
95 lock.release();
96 } catch (Exception e) {
97 }
98 try {
99 channel.close();
100 } catch (Exception e) {
101 }
102 }
103
104 private void deleteFile() {
105 try {
106 file.delete();
107 } catch (Exception e) {
108 }
109 }
110
111
112
113
114
115 public static void main(String[] args) {
116 new HornetqJMSAgent().Main(args);
117
118 }
119
120 private void Main(String[] args) {
121
122 try {
123 file = new File("Hornetq.lck");
124 channel = new RandomAccessFile(file, "rw").getChannel();
125 lock = channel.tryLock();
126 } catch (Exception e) {
127
128 closeLock();
129 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file Hornetq.lck needs to be deleted.");
130 return;
131 }
132 if (lock == null) {
133 closeLock();
134 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file Hornetq.lck needs to be deleted.");
135 return;
136 }
137
138
139 Runtime.getRuntime().addShutdownHook(new RunWhenShuttingDown());
140 PropertyConfigurator.configure("log4j.properties");
141 long interval = 10000;
142 if (args.length == 1) {
143 try {
144 interval = Long.parseLong(args[0]);
145 if (interval < 10000) {
146 interval = 10000;
147 }
148 } catch (Exception ex) {
149 }
150 }
151 log.log(Level.INFO, "fgsms hornetq JMX Agent startup...");
152 Properties prop = PropertyLoader.loadProperties("org.miloss.fgsms.agents.connection");
153
154 String temp = (String) prop.get("JMXServiceURL");
155 if (temp.contains("|")) {
156 urls = temp.split("\\|");
157 } else {
158 urls = new String[1];
159 urls[0] = temp;
160 }
161
162 if ((urls == null || urls.length == 0)) {
163 log.log(Level.FATAL, "no JMX url has been defined.");
164 return;
165 }
166
167 long lastranat = 0;
168 while (running) {
169 if (lastranat < System.currentTimeMillis() - interval) {
170 try {
171 lastranat = System.currentTimeMillis();
172
173 for (int aa = 0; aa < urls.length; aa++) {
174 String originalurl = urls[aa];
175 String url = IpAddressUtility.modifyURL(originalurl, true);
176 try {
177 Connection con = Utility.getConfigurationDB_NONPOOLED_Connection();
178 AuxHelper.CheckStatisticalPolicyAndCreate(url, con, false, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
179 con.close();
180 } catch (Exception ex) {
181 }
182 Fire(false, originalurl, url);
183 }
184 } catch (Exception ex) {
185 log.log(Level.ERROR, null, ex);
186 }
187 log.log(Level.INFO, "Sleeping " + interval + "ms until next interation....");
188 }
189 if (running) {
190 try {
191 Thread.sleep(1000);
192 } catch (InterruptedException ex) {
193 log.log(Level.ERROR, null, ex);
194 }
195 }
196 done = true;
197 }
198 }
199
200 public static void Fire(final boolean pooled, final String url, final String modifiedurl) throws Exception {
201
202 ServicePolicy p = null;
203 if (pooled) {
204 p = SLACommon.LoadPolicyPooled(modifiedurl);
205 } else {
206 p = SLACommon.LoadPolicyNotPooled(modifiedurl);
207 }
208
209
210
211
212
213
214
215 JMXServiceURL u = new JMXServiceURL(url);
216 Map m = new HashMap();
217
218
219 boolean ok = true;
220 String[] data = DBSettingsLoader.GetCredentials(pooled, modifiedurl);
221 if (data != null) {
222 String[] cred = new String[]{data[0], Utility.DE(data[1])
223 };
224 m.put(JMXConnector.CREDENTIALS, cred);
225 }
226
227 String status="OK";
228 JMXConnector c = null;
229 try {
230 c = JMXConnectorFactory.connect(u, m);
231 AuxHelper.TryUpdateStatus(true, modifiedurl, "OK", pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
232 } catch (Exception ex) {
233 AuxHelper.TryUpdateStatus(false, modifiedurl, ex.getLocalizedMessage(), pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
234
235 status = "Offline " + ex.getMessage();
236 log.log(Level.WARN, modifiedurl + "Hornet Server offline", ex);
237 ok = false;
238 }
239
240 if (ok && c!=null) {
241 log.log(Level.INFO, "Hornet Server Online, updating statistics");
242 MBeanServerConnection mBeanServerConnection = c.getMBeanServerConnection();
243
244
245 Set<ObjectName> queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
246
247 Iterator<ObjectName> iterator = queryNames.iterator();
248 String[] queueattribs = new String[]{
249 "Name",
250 "DeliveringCount",
251 "DurableMessageCount",
252 "DurableSubscriptionCount",
253 "MessageCount",
254 "MessagesAdded",
255 "NonDurableMessageCount",
256 "NonDurableSubscriptionCount",
257 "SubscriptionCount"
258 };
259
260 java.sql.Connection perfcon = null;
261
262 try {
263 if (pooled) {
264 perfcon = Utility.getPerformanceDBConnection();
265
266 } else {
267 perfcon = Utility.getPerformanceDB_NONPOOLED_Connection();
268
269 }
270
271 } catch (Exception ex) {
272 log.log(Level.ERROR, "Cannot connect to fgsms performance db, monitoring is not possible, exiting...");
273 return;
274 }
275
276 AddStatisticalDataRequestMsg req = new AddStatisticalDataRequestMsg();
277 PreparedStatement ps = perfcon.prepareStatement("delete from brokerrawdata where host=?");
278 ps.setString(1, modifiedurl);
279 ps.executeUpdate();
280 Map currenttopics = new HashMap();
281 while (iterator.hasNext()) {
282
283 ObjectName n = iterator.next();
284 if (n != null && n.getCanonicalName() != null) {
285 if (n.getCanonicalName().startsWith("org.hornetq:module=Core,name=")) {
286 {
287
288
289 AttributeList attributes = null;
290 try {
291 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
292 } catch (Exception ex2) {
293 try {
294 mBeanServerConnection = c.getMBeanServerConnection();
295 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
296 } catch (Exception ex3) {
297 }
298 }
299
300 log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
301
302 String bigname = n.getCanonicalName();
303 String ExchangeType = (bigname.contains("Type=Topic")) ? "Topic" : "Queue";
304
305
306
307
308
309 long ExpiredCount = 0;
310
311
312
313
314
315
316
317
318
319
320
321
322 long ConsumerCount = 0;
323 long RecievedMessageCount = 0;
324 long MessageCount = 0;
325 String name = "";
326 long QueueDepth = 0;
327 if (attributes != null) {
328 for (int i2 = 0; i2 < attributes.size(); i2++) {
329
330
331
332
333
334
335
336
337
338
339
340
341 if (attributes.get(i2).toString().toLowerCase().startsWith("enqueuecount = ")) {
342
343 }
344 if (attributes.get(i2).toString().toLowerCase().startsWith("messagesadded = ")) {
345 MessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("messagesadded = ", "").trim());
346 }
347 if (attributes.get(i2).toString().toLowerCase().startsWith("queuesize = ")) {
348
349 }
350 if (attributes.get(i2).toString().toLowerCase().startsWith("subscriptioncount = ")) {
351 ConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("subscriptioncount = ", "").trim());
352 }
353 if (attributes.get(i2).toString().toLowerCase().startsWith("name = ")) {
354 name = attributes.get(i2).toString().split("=")[1].trim();
355 }
356 if (attributes.get(i2).toString().toLowerCase().startsWith("expiredcount = ")) {
357
358 }
359 }
360 }
361
362 if (!Utility.stringIsNullOrEmpty(name)) {
363 currenttopics.put(n.getCanonicalName(), QueueDepth);
364
365 req.getData().add(UpdateData(modifiedurl, name, bigname, MessageCount, RecievedMessageCount, ConsumerCount, ConsumerCount,
366 QueueDepth, ExchangeType, 0, 0, 0, ExpiredCount, perfcon));
367
368
369
370
371 }
372 }
373 }
374
375
376 }
377
378 }
379 req.setAgentType(AGENT);
380 req.setBrokerURI(modifiedurl);
381 req.setBrokerHostname(p.getMachineName());
382 req.setDomain(p.getDomainName());
383 req.setClassification(SLACommon.GetClassLevel(pooled));
384 req.setOperationalStatus(ok);
385 req.setOperationalStatusMessage(status);
386 SLACommon.ProcessStatisticalSLARules(req, pooled);
387 c.close();
388 perfcon.close();
389 } else {
390
391 }
392
393
394 }
395 static final String AGENT = "org.miloss.fgsms.agents.hornetq.jmx";
396
397 private static BrokerData UpdateData(String url, String name, String bigname,
398 long MessageCount, long RecievedMessageCount, long ConsumerCount, long ActiveConsumerCount,
399 long QueueDepth, String ExchangeType, long bytesin, long bytesout, long bytesdrop,
400 long MessageDropCount, Connection perf) {
401 BrokerData bd = new BrokerData();
402 bd.setActiveConsumers(QueueDepth);
403 bd.setQueueOrTopicCanonicalName(bigname);
404 bd.setQueueOrTopicName(name);
405 bd.setMessagesIn(RecievedMessageCount);
406 bd.setMessagesDropped(MessageDropCount);
407 bd.setDepth(QueueDepth);
408 bd.setActiveConsumers(ActiveConsumerCount);
409 bd.setBytesDropped(bytesdrop);
410 bd.setBytesIn(bytesin);
411 bd.setBytesOut(bytesout);
412 bd.setItemType(ExchangeType);
413 bd.setMessagesOut(MessageCount);
414 bd.setTotalConsumers(ConsumerCount);
415
416 if (!Utility.stringIsNullOrEmpty(name)) {
417 try {
418 PreparedStatement comm = perf.prepareStatement("Delete from brokerrawdata where host=? and namecol=? and canonicalname=?; "
419 + "INSERT INTO brokerrawdata (host, utcdatetime, namecol, canonicalname, messagecount,recievedmessagecount, consumercount, "
420 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
421 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
422 + "INSERT INTO brokerhistory (host, utcdatetime, namecol,canonicalname, messagecount,recievedmessagecount, consumercount, "
423 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
424 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
425 comm.setString(1, url);
426 comm.setString(2, name);
427 comm.setString(3, bigname);
428 comm.setString(4, url);
429 comm.setLong(5, System.currentTimeMillis());
430 comm.setString(6, name);
431 comm.setString(7, bigname);
432 comm.setLong(8, MessageCount);
433 comm.setLong(9, RecievedMessageCount);
434 comm.setLong(10, ConsumerCount);
435 comm.setLong(11, ActiveConsumerCount);
436 comm.setLong(12, QueueDepth);
437 comm.setString(13, ExchangeType);
438 comm.setString(14, "org.miloss.fgsms.agents.hornetq.jmx");
439 comm.setLong(15, MessageDropCount);
440 comm.setLong(16, bytesdrop);
441 comm.setLong(17, bytesin);
442 comm.setLong(18, bytesout);
443
444 comm.setString(19, url);
445 comm.setLong(20, System.currentTimeMillis());
446 comm.setString(21, name);
447 comm.setString(22, bigname);
448 comm.setLong(23, MessageCount);
449 comm.setLong(24, RecievedMessageCount);
450 comm.setLong(25, ConsumerCount);
451 comm.setLong(26, ActiveConsumerCount);
452 comm.setLong(27, QueueDepth);
453 comm.setString(28, ExchangeType);
454 comm.setString(29, "org.miloss.fgsms.agents.hornetq.jmx");
455 comm.setLong(30, MessageDropCount);
456 comm.setLong(31, bytesdrop);
457 comm.setLong(32, bytesin);
458 comm.setLong(33, bytesout);
459 comm.execute();
460 comm.close();
461 } catch (SQLException ex) {
462 log.log(Level.WARN, "error saving broker data", ex);
463 }
464
465 }
466
467 return bd;
468
469 }
470 }