1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.miloss.fgsms.agents.qpidjmx;
21
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.log4j.Level;
import org.apache.log4j.PropertyConfigurator;
import org.miloss.fgsms.common.DBSettingsLoader;
import org.miloss.fgsms.common.IpAddressUtility;
import org.miloss.fgsms.common.Logger;
import org.miloss.fgsms.common.PropertyLoader;
import org.miloss.fgsms.common.Utility;
import org.miloss.fgsms.services.interfaces.common.PolicyType;
import org.miloss.fgsms.services.interfaces.datacollector.AddStatisticalDataRequestMsg;
import org.miloss.fgsms.services.interfaces.datacollector.BrokerData;
import org.miloss.fgsms.services.interfaces.policyconfiguration.ServicePolicy;
import org.miloss.fgsms.sla.AuxHelper;
import org.miloss.fgsms.sla.SLACommon;
49
50
51
52
53
54
55
56
57 public class QpidJMXAgent {
58
59 static Logger log = Logger.getLogger("fgsms.QpidJMX");
60 private boolean running = true;
61
62 private boolean done = false;
63 private File file;
64 private FileChannel channel;
65 private FileLock lock;
66 private String[] urls = null;
67 static final String AGENT = "org.miloss.fgsms.agents.amqp.jmx";
68
69
70
71
72 public static void main(String[] args) {
73 new QpidJMXAgent().Main(args);
74
75 }
76
77 public class RunWhenShuttingDown extends Thread {
78
79 public void run() {
80 System.out.println("Control-C caught. Shutting down...");
81 running = false;
82
83 while (!done) {
84 try {
85 Thread.sleep(1000);
86 } catch (InterruptedException ex) {
87 }
88 }
89 closeLock();
90 deleteFile();
91 if (urls != null) {
92 for (int i = 0; i < urls.length; i++) {
93 AuxHelper.TryUpdateStatus(false, IpAddressUtility.modifyURL(urls[i], true), "Agent Stopped", false, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
94 }
95 }
96 }
97 }
98
99 private void closeLock() {
100 try {
101 lock.release();
102 } catch (Exception e) {
103 }
104 try {
105 channel.close();
106 } catch (Exception e) {
107 }
108 }
109
110 private void deleteFile() {
111 try {
112 file.delete();
113 } catch (Exception e) {
114 }
115 }
116
117 private void Main(String[] args) {
118
119 try {
120 file = new File("pyjmx.lck");
121 channel = new RandomAccessFile(file, "rw").getChannel();
122 lock = channel.tryLock();
123 } catch (Exception e) {
124
125 closeLock();
126 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file pyjmx.lck needs to be deleted.");
127 return;
128 }
129 if (lock == null) {
130 closeLock();
131 System.out.println("Could not obtain the lock, this means that either this program is already running or something went wrong and the file pyjmx.lck needs to be deleted.");
132 return;
133 }
134
135 Runtime.getRuntime().addShutdownHook(new RunWhenShuttingDown());
136 PropertyConfigurator.configure("log4j.properties");
137 long interval = 10000;
138 if (args.length == 1) {
139 try {
140 interval = Long.parseLong(args[0]);
141 if (interval < 10000) {
142 interval = 10000;
143 }
144 } catch (Exception ex) {
145 }
146 }
147 log.log(Level.INFO, "fgsms Qpid JMX Agent startup...");
148 Properties prop = PropertyLoader.loadProperties("fgsmsqpidjmx/connection");
149
150 String temp = (String) prop.get("JMXServiceURL");
151 if (temp.contains("|")) {
152 urls = temp.split("\\|");
153 } else {
154 urls = new String[1];
155 urls[0] = temp;
156 }
157
158 if ((urls == null || urls.length == 0)) {
159 log.log(Level.FATAL, "no JMX url has been defined.");
160 return;
161 }
162
163 long lastranat = 0;
164 while (running) {
165 if (lastranat < System.currentTimeMillis() - interval) {
166 try {
167 lastranat = System.currentTimeMillis();
168
169 for (int aa = 0; aa < urls.length; aa++) {
170 String originalurl = urls[aa];
171 String url = IpAddressUtility.modifyURL(originalurl, true);
172 try {
173 Connection con = Utility.getConfigurationDB_NONPOOLED_Connection();
174 AuxHelper.CheckStatisticalPolicyAndCreate(url, con, false, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
175 con.close();
176 } catch (Exception ex) {
177 }
178 Fire(false, originalurl, url);
179 }
180 } catch (Exception ex) {
181 log.log(Level.ERROR, null, ex);
182 }
183 log.log(Level.INFO, "Sleeping " + interval + "ms until next interation....");
184 }
185 if (running) {
186 try {
187 Thread.sleep(1000);
188 } catch (InterruptedException ex) {
189 log.log(Level.ERROR, null, ex);
190 }
191 }
192 done = true;
193 }
194 }
195
196 public static void Fire(final boolean pooled, final String url, final String modifiedurl) throws Exception {
197
198 ServicePolicy p = null;
199 if (pooled) {
200 p = SLACommon.LoadPolicyPooled(modifiedurl);
201 } else {
202 p = SLACommon.LoadPolicyNotPooled(modifiedurl);
203 }
204
205 JMXServiceURL u = new JMXServiceURL(url);
206 Map m = new HashMap();
207 boolean ok = true;
208 String[] data = DBSettingsLoader.GetCredentials(pooled, modifiedurl);
209 if (data != null) {
210 String[] cred = new String[]{data[0], Utility.DE(data[1])};
211 m.put(JMXConnector.CREDENTIALS, cred);
212 }
213 String status = "OK";
214 JMXConnector c = null;
215 try {
216 c = JMXConnectorFactory.connect(u, m);
217 AuxHelper.TryUpdateStatus(true, modifiedurl, status, pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
218 } catch (Exception ex) {
219 AuxHelper.TryUpdateStatus(false, modifiedurl, ex.getLocalizedMessage(), pooled, PolicyType.STATISTICAL, AuxHelper.UNSPECIFIED, SLACommon.GetHostName());
220
221 status = "Offline " + ex.getMessage();
222 log.log(Level.WARN, "AMQP QPID Server offline " + url);
223 ok = false;
224 }
225
226 if (ok && c!=null) {
227 log.log(Level.INFO, "AMQP QPID Server Online, updating statistics" + url);
228 MBeanServerConnection mBeanServerConnection = c.getMBeanServerConnection();
229
230
231 Set<ObjectName> queryNames = mBeanServerConnection.queryNames(ObjectName.WILDCARD, null);
232
233 Iterator<ObjectName> iterator = queryNames.iterator();
234 String[] queueattribs = new String[]{"QueueDepth",
235 "Name",
236 "MessageCount",
237 "ReceivedMessageCount",
238 "ActiveConsumerCount",
239
240
241 "ConsumerCount",
242
243
244
245
246
247
248
249
250 "ExchangeType"
251
252 };
253
254 java.sql.Connection perfcon = null;
255
256 try {
257 if (pooled) {
258 perfcon = Utility.getPerformanceDBConnection();
259
260 } else {
261 perfcon = Utility.getPerformanceDB_NONPOOLED_Connection();
262
263 }
264
265 } catch (Exception ex) {
266 log.log(Level.ERROR, "Cannot connect to fgsms performance db, monitoring is not possible, exiting...");
267 return;
268 }
269
270 PreparedStatement ps = perfcon.prepareStatement("delete from brokerrawdata where host=?");
271 ps.setString(1, modifiedurl);
272 ps.executeUpdate();
273 ps.close();
274 AddStatisticalDataRequestMsg req = new AddStatisticalDataRequestMsg();
275 Map currenttopics = new HashMap();
276 while (iterator.hasNext()) {
277
278 ObjectName n = iterator.next();
279 if (n != null && n.getCanonicalName() != null) {
280 if (n.getCanonicalName().startsWith("org.apache.qpid")) {
281 {
282
283 AttributeList attributes = null;
284 try {
285 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
286 } catch (Exception ex2) {
287 try {
288 mBeanServerConnection = c.getMBeanServerConnection();
289 attributes = mBeanServerConnection.getAttributes(new ObjectName(n.getCanonicalName()), queueattribs);
290 } catch (Exception ex3) {
291 }
292 }
293
294 log.log(Level.DEBUG, "Updating: " + n.getCanonicalName());
295
296 String bigname = n.getCanonicalName();
297 String ExchangeType = "queue";
298 long ConsumerCount = 0;
299 long ActiveConsumerCount = 0;
300 long RecievedMessageCount = 0;
301 long MessageCount = 0;
302 String name = "";
303 long QueueDepth = 0;
304 if (attributes != null) {
305 for (int i2 = 0; i2 < attributes.size(); i2++) {
306
307 if (attributes.get(i2).toString().toLowerCase().startsWith("activeconsumercount = ")) {
308 ActiveConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("activeconsumercount = ", "").trim());
309 }
310 if (attributes.get(i2).toString().toLowerCase().startsWith("receivedmessagecount = ")) {
311 RecievedMessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("receivedmessagecount = ", "").trim());
312 }
313 if (attributes.get(i2).toString().toLowerCase().startsWith("messagecount = ")) {
314 MessageCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("messagecount = ", "").trim());
315 }
316 if (attributes.get(i2).toString().toLowerCase().startsWith("queuedepth = ")) {
317 QueueDepth = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("queuedepth = ", "").trim());
318 }
319 if (attributes.get(i2).toString().toLowerCase().startsWith("consumercount = ")) {
320 ConsumerCount = Long.parseLong(attributes.get(i2).toString().toLowerCase().replace("consumercount = ", "").trim());
321 }
322 if (attributes.get(i2).toString().toLowerCase().startsWith("name = ")) {
323 name = attributes.get(i2).toString().split("=")[1].trim();
324 }
325 if (attributes.get(i2).toString().toLowerCase().startsWith("exchangetype = ")) {
326 ExchangeType = attributes.get(i2).toString().split("=")[1].trim();
327 }
328 }
329 }
330
331 if (!Utility.stringIsNullOrEmpty(name)) {
332 currenttopics.put(n.getCanonicalName(), QueueDepth);
333
334 req.getData().add(updateData(modifiedurl, name, bigname, MessageCount, RecievedMessageCount, ConsumerCount, ActiveConsumerCount,
335 QueueDepth, ExchangeType, 0, 0, 0, 0, perfcon));
336
337
338
339
340 }
341 }
342 }
343
344 }
345
346 }
347
348
349 req.setAgentType(AGENT);
350 req.setBrokerURI(modifiedurl);
351 req.setBrokerHostname(p.getMachineName());
352 req.setDomain(p.getDomainName());
353 req.setClassification(SLACommon.GetClassLevel(pooled));
354 req.setOperationalStatus(ok);
355 req.setOperationalStatusMessage(status);
356 SLACommon.ProcessStatisticalSLARules(req, pooled);
357 c.close();
358 perfcon.close();
359 }
360
361 }
362
363 private static BrokerData updateData(final String url, final String name, final String bigname,
364 final long MessageCount, final long RecievedMessageCount, final long ConsumerCount, final long ActiveConsumerCount,
365 final long QueueDepth, final String ExchangeType, final long bytesin, final long bytesout, final long bytesdrop,
366 final long MessageDropCount, final Connection perf) {
367 BrokerData bd = new BrokerData();
368 bd.setActiveConsumers(QueueDepth);
369 bd.setQueueOrTopicCanonicalName(bigname);
370 bd.setQueueOrTopicName(name);
371 bd.setMessagesIn(RecievedMessageCount);
372 bd.setMessagesDropped(MessageDropCount);
373 bd.setDepth(QueueDepth);
374 bd.setActiveConsumers(ActiveConsumerCount);
375 bd.setBytesDropped(bytesdrop);
376 bd.setBytesIn(bytesin);
377 bd.setBytesOut(bytesout);
378 bd.setItemType(ExchangeType);
379 bd.setMessagesOut(MessageCount);
380 bd.setTotalConsumers(ConsumerCount);
381 if (!Utility.stringIsNullOrEmpty(name)) {
382 try {
383 PreparedStatement comm = perf.prepareStatement("Delete from brokerrawdata where host=? and namecol=? and canonicalname=?; "
384 + "INSERT INTO brokerrawdata (host, utcdatetime, namecol, canonicalname, messagecount,recievedmessagecount, consumercount, "
385 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
386 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
387 + "INSERT INTO brokerhistory (host, utcdatetime, namecol,canonicalname, messagecount,recievedmessagecount, consumercount, "
388 + "activeconsumercount, queuedepth, typecol, agenttype, messagedropcount, bytesdropcount, bytesin, bytesout) "
389 + "VALUES (?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
390 comm.setString(1, url);
391 comm.setString(2, name);
392 comm.setString(3, bigname);
393 comm.setString(4, url);
394 comm.setLong(5, System.currentTimeMillis());
395 comm.setString(6, name);
396 comm.setString(7, bigname);
397 comm.setLong(8, MessageCount);
398 comm.setLong(9, RecievedMessageCount);
399 comm.setLong(10, ConsumerCount);
400 comm.setLong(11, ActiveConsumerCount);
401 comm.setLong(12, QueueDepth);
402 comm.setString(13, ExchangeType);
403 comm.setString(14, AGENT);
404 comm.setLong(15, MessageDropCount);
405 comm.setLong(16, bytesdrop);
406 comm.setLong(17, bytesin);
407 comm.setLong(18, bytesout);
408
409 comm.setString(19, url);
410 comm.setLong(20, System.currentTimeMillis());
411 comm.setString(21, name);
412 comm.setString(22, bigname);
413 comm.setLong(23, MessageCount);
414 comm.setLong(24, RecievedMessageCount);
415 comm.setLong(25, ConsumerCount);
416 comm.setLong(26, ActiveConsumerCount);
417 comm.setLong(27, QueueDepth);
418 comm.setString(28, ExchangeType);
419 comm.setString(29, AGENT);
420 comm.setLong(30, MessageDropCount);
421 comm.setLong(31, bytesdrop);
422 comm.setLong(32, bytesin);
423 comm.setLong(33, bytesout);
424 comm.execute();
425 comm.close();
426 } catch (SQLException ex) {
427 log.log(Level.WARN, "error saving broker data", ex);
428 }
429
430 }
431
432 return bd;
433 }
434
435
436 }