View Javadoc
1   /*
2    * To change this license header, choose License Headers in Project Properties.
3    * To change this template file, choose Tools | Templates
4    * and open the template in the editor.
5    */
6   
7   package org.miloss.fgsms.statistics.jobs;
8   
9   import java.sql.Connection;
10  import java.sql.PreparedStatement;
11  import java.sql.ResultSet;
12  import java.util.HashMap;
13  import java.util.List;
14  import java.util.UUID;
15  import org.apache.log4j.Level;
16  import org.miloss.fgsms.agentcore.MessageProcessor;
17  import org.miloss.fgsms.common.DBUtils;
18  import org.miloss.fgsms.common.Utility;
19  import org.miloss.fgsms.services.interfaces.common.PolicyType;
20  import static org.miloss.fgsms.statistics.FgsmsStatsv2.SERVICE_NAME;
21  import static org.miloss.fgsms.statistics.FgsmsStatsv2.allitems;
22  import static org.miloss.fgsms.statistics.FgsmsStatsv2.log;
23  import static org.miloss.fgsms.statistics.FgsmsStatsv2.myUrl;
24  
25  /**
26   *
27   * @author AO
28   */
29  public class BrokerStatisticsJob extends BaseJob {
30   List<Long> periods;
31  
32      public BrokerStatisticsJob(List<Long> periods) {
33          this.periods = periods;
34      }
35      @Override
36      public void run() {
37          UUID random = UUID.randomUUID();
38          MessageProcessor.getSingletonObject().processMessageInput(SERVICE_NAME, 0, myUrl, "machine/process", "system", random.toString(), new HashMap(), "", this.getClass().getCanonicalName(), "", "");
39          Connection ConfigCon = Utility.getConfigurationDBConnection();
40          Connection PerfCon = Utility.getPerformanceDBConnection();
41          try {
42              doWorkBrokers(ConfigCon, PerfCon, periods);
43              MessageProcessor.getSingletonObject().processMessageOutput(random.toString(), "success", 0, false, System.currentTimeMillis(), new HashMap());
44          } catch (Exception ex) {
45              MessageProcessor.getSingletonObject().processMessageOutput(random.toString(), "error " + ex.getMessage(), 0, true, System.currentTimeMillis(), new HashMap());
46  
47          } finally {
48              DBUtils.safeClose(PerfCon);
49              DBUtils.safeClose(ConfigCon);
50          }
51      }
52      
53      private long getMaxQueueDepth(final String url, final Long ts, Connection con) {
54          long r = 0;
55          PreparedStatement cmd = null;
56          ResultSet rs = null;
57          try {
58              cmd = con.prepareStatement("select max(queuedepth) from brokerhistory where host=? and utcdatetime > ?");
59              cmd.setString(1, url);
60              cmd.setLong(2, System.currentTimeMillis() - ts);
61              rs = cmd.executeQuery();
62              if (rs.next()) {
63                  r = rs.getLong(1);
64              }
65  
66          } catch (Exception ex) {
67              log.log(Level.ERROR, null, ex);
68          } finally {
69              DBUtils.safeClose(rs);
70              DBUtils.safeClose(cmd);
71          }
72          return r;
73      }
74  
75      private long getMessagesIn(final String url, final Long ts, Connection con) {
76          long r = 0;
77          PreparedStatement cmd = null;
78          ResultSet rs = null;
79          try {
80              cmd = con.prepareStatement("select avg(recievedmessagecount) from brokerhistory where host=? and utcdatetime > ?");
81              cmd.setString(1, url);
82              cmd.setLong(2, System.currentTimeMillis() - ts);
83              rs = cmd.executeQuery();
84              if (rs.next()) {
85                  r = rs.getLong(1);
86              }
87              rs.close();
88              cmd.close();
89  
90          } catch (Exception ex) {
91              log.log(Level.ERROR, null, ex);
92          } finally {
93              try {
94                  if (rs != null && !rs.isClosed()) {
95                      rs.close();
96                  }
97              } catch (Throwable ex) {
98              }
99              try {
100                 if (cmd != null && !cmd.isClosed()) {
101                     cmd.close();
102                 }
103             } catch (Throwable ex) {
104             }
105         }
106         return r;
107     }
108 
109     private long getMessagesOut(String url, Long ts, Connection con) {
110         long r = 0;
111         PreparedStatement cmd = null;
112         ResultSet rs = null;
113         try {
114             cmd = con.prepareStatement("select avg(messagecount) from brokerhistory where host=? and utcdatetime > ?");
115             cmd.setString(1, url);
116             cmd.setLong(2, System.currentTimeMillis() - ts);
117             rs = cmd.executeQuery();
118             if (rs.next()) {
119                 r = rs.getLong(1);
120             }
121 
122         } catch (Exception ex) {
123             log.log(Level.ERROR, null, ex);
124         } finally {
125             DBUtils.safeClose(rs);
126             DBUtils.safeClose(cmd);
127         }
128         return r;
129     }
130 
131     private long getMessagesDropped(final String url, final Long ts, Connection con) {
132         long r = 0;
133         PreparedStatement cmd = null;
134         ResultSet rs = null;
135         try {
136             cmd = con.prepareStatement("select avg(messagedropcount) from brokerhistory where host=? and utcdatetime > ?");
137             cmd.setString(1, url);
138             cmd.setLong(2, System.currentTimeMillis() - ts);
139             rs = cmd.executeQuery();
140             if (rs.next()) {
141                 r = rs.getLong(1);
142             }
143 
144         } catch (Exception ex) {
145             log.log(Level.ERROR, null, ex);
146         } finally {
147             DBUtils.safeClose(rs);
148             DBUtils.safeClose(cmd);
149         }
150         return r;
151     }
152      private void doWorkBrokers(Connection ConfigCon, Connection PerfCon, List<Long> periods) throws Exception {
153         PreparedStatement com = null;
154         ResultSet rs = null;
155         try {
156             long now = System.currentTimeMillis();
157             if (ConfigCon == null || PerfCon == null) {
158                 log.log(Level.ERROR, "doWorkBrokers database unavailable");
159                 return;
160             }
161             com = ConfigCon.prepareStatement("select uri,policytype from servicepolicies where policytype=?;");
162             com.setInt(1, PolicyType.STATISTICAL.ordinal());
163             rs = com.executeQuery();
164             while (rs.next()) {          //for each service
165 
166                 log.log(Level.INFO, "calculating statistics for " + rs.getString("uri"));
167                 String t = allitems;
168                 //do rollup
169                 for (int i = 0; i < periods.size(); i++) {
170                     insertRow(PerfCon, rs.getString("uri"), t, periods.get(i));
171                     //TODO check policy type and change queries
172                     PreparedStatement up = PerfCon.prepareStatement("UPDATE agg2 set "
173                             + " avail=?, "
174                             + "  timestampepoch=?,  "
175                             + "  sla=?,  "
176                             + " avgchan =?,  maxqueuedepth =?,  avgmsgin =?,  avgmsgout =?,  avgmsgdropped =? "
177                             + "WHERE uri=? and soapaction=? and timerange=?;");
178 
179                     double avail = getAvailability(now, periods.get(i), rs.getString("uri"), t, PerfCon, ConfigCon);
180 
181                     up.setDouble(1, avail);
182                     up.setLong(2, now);
183                     up.setLong(3, getSLACount(rs.getString("uri"), periods.get(i), PerfCon));
184                     up.setLong(4, getAverageChannelCount(rs.getString("uri"), periods.get(i), PerfCon));
185                     up.setLong(5, getMaxQueueDepth(rs.getString("uri"), periods.get(i), PerfCon));
186                     up.setLong(6, getMessagesIn(rs.getString("uri"), periods.get(i), PerfCon));
187                     up.setLong(7, getMessagesOut(rs.getString("uri"), periods.get(i), PerfCon));
188                     up.setLong(8, getMessagesDropped(rs.getString("uri"), periods.get(i), PerfCon));
189                     up.setString(9, rs.getString("uri"));
190                     up.setString(10, t);
191                     up.setLong(11, periods.get(i));
192                     up.executeUpdate();
193                     log.log(Level.DEBUG, "Updated stats for service " + rs.getString("uri") + " action " + t);
194                     up.close();
195                 }
196             }
197 
198             now = System.currentTimeMillis() - now;
199             log.log(Level.INFO, "Statistics calculations took " + now + "ms");
200 
201         } catch (Exception ex) {
202             log.log(Level.ERROR, null, ex);
203             throw ex;
204         } finally {
205             DBUtils.safeClose(rs);
206             DBUtils.safeClose(com);
207         }
208     }
209 
210     private long getAverageChannelCount(String url, Long ts, Connection con) {
211         //FIXME
212         return -1;
213         /*
214          Double r = Double.valueOf(0);
215          try {asdasd
216          PreparedStatement cmd = con.prepareStatement("select avg(openfiles) from brokerhistory where uri=? and utcdatetime > ?");
217          cmd.setString(1, url);
218          cmd.setLong(2, System.currentTimeMillis() - ts);
219          ResultSet rs = cmd.executeQuery();
220          if (rs.next()) {
221          r = rs.getDouble(1);
222          }
223          rs.close();
224          cmd.close();
225         
226          } catch (Exception ex) {
227          log.log(Level.ERROR, null, ex);
228          }
229          return r.longValue();*/
230     }
231 
232 }