1
2
3
4
5
6
7 package org.miloss.fgsms.statistics.jobs;
8
9 import java.sql.Connection;
10 import java.sql.PreparedStatement;
11 import java.sql.ResultSet;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.UUID;
15 import org.apache.log4j.Level;
16 import org.miloss.fgsms.agentcore.MessageProcessor;
17 import org.miloss.fgsms.common.DBUtils;
18 import org.miloss.fgsms.common.Utility;
19 import org.miloss.fgsms.services.interfaces.common.PolicyType;
20 import static org.miloss.fgsms.statistics.FgsmsStatsv2.SERVICE_NAME;
21 import static org.miloss.fgsms.statistics.FgsmsStatsv2.allitems;
22 import static org.miloss.fgsms.statistics.FgsmsStatsv2.log;
23 import static org.miloss.fgsms.statistics.FgsmsStatsv2.myUrl;
24
25
26
27
28
29 public class BrokerStatisticsJob extends BaseJob {
30 List<Long> periods;
31
32 public BrokerStatisticsJob(List<Long> periods) {
33 this.periods = periods;
34 }
35 @Override
36 public void run() {
37 UUID random = UUID.randomUUID();
38 MessageProcessor.getSingletonObject().processMessageInput(SERVICE_NAME, 0, myUrl, "machine/process", "system", random.toString(), new HashMap(), "", this.getClass().getCanonicalName(), "", "");
39 Connection ConfigCon = Utility.getConfigurationDBConnection();
40 Connection PerfCon = Utility.getPerformanceDBConnection();
41 try {
42 doWorkBrokers(ConfigCon, PerfCon, periods);
43 MessageProcessor.getSingletonObject().processMessageOutput(random.toString(), "success", 0, false, System.currentTimeMillis(), new HashMap());
44 } catch (Exception ex) {
45 MessageProcessor.getSingletonObject().processMessageOutput(random.toString(), "error " + ex.getMessage(), 0, true, System.currentTimeMillis(), new HashMap());
46
47 } finally {
48 DBUtils.safeClose(PerfCon);
49 DBUtils.safeClose(ConfigCon);
50 }
51 }
52
53 private long getMaxQueueDepth(final String url, final Long ts, Connection con) {
54 long r = 0;
55 PreparedStatement cmd = null;
56 ResultSet rs = null;
57 try {
58 cmd = con.prepareStatement("select max(queuedepth) from brokerhistory where host=? and utcdatetime > ?");
59 cmd.setString(1, url);
60 cmd.setLong(2, System.currentTimeMillis() - ts);
61 rs = cmd.executeQuery();
62 if (rs.next()) {
63 r = rs.getLong(1);
64 }
65
66 } catch (Exception ex) {
67 log.log(Level.ERROR, null, ex);
68 } finally {
69 DBUtils.safeClose(rs);
70 DBUtils.safeClose(cmd);
71 }
72 return r;
73 }
74
75 private long getMessagesIn(final String url, final Long ts, Connection con) {
76 long r = 0;
77 PreparedStatement cmd = null;
78 ResultSet rs = null;
79 try {
80 cmd = con.prepareStatement("select avg(recievedmessagecount) from brokerhistory where host=? and utcdatetime > ?");
81 cmd.setString(1, url);
82 cmd.setLong(2, System.currentTimeMillis() - ts);
83 rs = cmd.executeQuery();
84 if (rs.next()) {
85 r = rs.getLong(1);
86 }
87 rs.close();
88 cmd.close();
89
90 } catch (Exception ex) {
91 log.log(Level.ERROR, null, ex);
92 } finally {
93 try {
94 if (rs != null && !rs.isClosed()) {
95 rs.close();
96 }
97 } catch (Throwable ex) {
98 }
99 try {
100 if (cmd != null && !cmd.isClosed()) {
101 cmd.close();
102 }
103 } catch (Throwable ex) {
104 }
105 }
106 return r;
107 }
108
109 private long getMessagesOut(String url, Long ts, Connection con) {
110 long r = 0;
111 PreparedStatement cmd = null;
112 ResultSet rs = null;
113 try {
114 cmd = con.prepareStatement("select avg(messagecount) from brokerhistory where host=? and utcdatetime > ?");
115 cmd.setString(1, url);
116 cmd.setLong(2, System.currentTimeMillis() - ts);
117 rs = cmd.executeQuery();
118 if (rs.next()) {
119 r = rs.getLong(1);
120 }
121
122 } catch (Exception ex) {
123 log.log(Level.ERROR, null, ex);
124 } finally {
125 DBUtils.safeClose(rs);
126 DBUtils.safeClose(cmd);
127 }
128 return r;
129 }
130
131 private long getMessagesDropped(final String url, final Long ts, Connection con) {
132 long r = 0;
133 PreparedStatement cmd = null;
134 ResultSet rs = null;
135 try {
136 cmd = con.prepareStatement("select avg(messagedropcount) from brokerhistory where host=? and utcdatetime > ?");
137 cmd.setString(1, url);
138 cmd.setLong(2, System.currentTimeMillis() - ts);
139 rs = cmd.executeQuery();
140 if (rs.next()) {
141 r = rs.getLong(1);
142 }
143
144 } catch (Exception ex) {
145 log.log(Level.ERROR, null, ex);
146 } finally {
147 DBUtils.safeClose(rs);
148 DBUtils.safeClose(cmd);
149 }
150 return r;
151 }
152 private void doWorkBrokers(Connection ConfigCon, Connection PerfCon, List<Long> periods) throws Exception {
153 PreparedStatement com = null;
154 ResultSet rs = null;
155 try {
156 long now = System.currentTimeMillis();
157 if (ConfigCon == null || PerfCon == null) {
158 log.log(Level.ERROR, "doWorkBrokers database unavailable");
159 return;
160 }
161 com = ConfigCon.prepareStatement("select uri,policytype from servicepolicies where policytype=?;");
162 com.setInt(1, PolicyType.STATISTICAL.ordinal());
163 rs = com.executeQuery();
164 while (rs.next()) {
165
166 log.log(Level.INFO, "calculating statistics for " + rs.getString("uri"));
167 String t = allitems;
168
169 for (int i = 0; i < periods.size(); i++) {
170 insertRow(PerfCon, rs.getString("uri"), t, periods.get(i));
171
172 PreparedStatement up = PerfCon.prepareStatement("UPDATE agg2 set "
173 + " avail=?, "
174 + " timestampepoch=?, "
175 + " sla=?, "
176 + " avgchan =?, maxqueuedepth =?, avgmsgin =?, avgmsgout =?, avgmsgdropped =? "
177 + "WHERE uri=? and soapaction=? and timerange=?;");
178
179 double avail = getAvailability(now, periods.get(i), rs.getString("uri"), t, PerfCon, ConfigCon);
180
181 up.setDouble(1, avail);
182 up.setLong(2, now);
183 up.setLong(3, getSLACount(rs.getString("uri"), periods.get(i), PerfCon));
184 up.setLong(4, getAverageChannelCount(rs.getString("uri"), periods.get(i), PerfCon));
185 up.setLong(5, getMaxQueueDepth(rs.getString("uri"), periods.get(i), PerfCon));
186 up.setLong(6, getMessagesIn(rs.getString("uri"), periods.get(i), PerfCon));
187 up.setLong(7, getMessagesOut(rs.getString("uri"), periods.get(i), PerfCon));
188 up.setLong(8, getMessagesDropped(rs.getString("uri"), periods.get(i), PerfCon));
189 up.setString(9, rs.getString("uri"));
190 up.setString(10, t);
191 up.setLong(11, periods.get(i));
192 up.executeUpdate();
193 log.log(Level.DEBUG, "Updated stats for service " + rs.getString("uri") + " action " + t);
194 up.close();
195 }
196 }
197
198 now = System.currentTimeMillis() - now;
199 log.log(Level.INFO, "Statistics calculations took " + now + "ms");
200
201 } catch (Exception ex) {
202 log.log(Level.ERROR, null, ex);
203 throw ex;
204 } finally {
205 DBUtils.safeClose(rs);
206 DBUtils.safeClose(com);
207 }
208 }
209
210 private long getAverageChannelCount(String url, Long ts, Connection con) {
211
212 return -1;
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 }
231
232 }