1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.miloss.fgsms.statistics;
21
22 import java.sql.Connection;
23 import java.sql.PreparedStatement;
24 import java.sql.ResultSet;
25 import java.sql.SQLException;
26 import java.util.*;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import org.miloss.fgsms.common.DBSettingsLoader;
31 import org.miloss.fgsms.common.Utility;
32 import org.miloss.fgsms.services.interfaces.common.PolicyType;
33 import org.miloss.fgsms.services.interfaces.dataaccessservice.OperationalRecord;
34 import org.miloss.fgsms.services.interfaces.policyconfiguration.KeyNameValueEnc;
35 import org.apache.log4j.Level;
36 import org.miloss.fgsms.agentcore.MessageProcessor;
37 import org.miloss.fgsms.common.Logger;
38 import org.miloss.fgsms.common.DBUtils;
39 import org.miloss.fgsms.statistics.jobs.BrokerStatisticsJob;
40 import org.miloss.fgsms.statistics.jobs.MachineProcessJob;
41 import org.miloss.fgsms.statistics.jobs.StatusStatisticsJob;
42 import org.miloss.fgsms.statistics.jobs.TransactionalStatisticsJob;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class FgsmsStatsv2 {
58
59 public static final String myUrl = "urn:fgsms:StatisticsAggregator:" + Utility.getHostName();
60 public static final String SERVICE_NAME = "Statistics Aggregation";
61 public static Logger log = Logger.getLogger("fgsms.Stats");
62 public static String allitems = "All-Methods";
63 ThreadPoolExecutor threadPool;
64
65 public FgsmsStatsv2() {
66 threadPool = new ThreadPoolExecutor(0, 100, 25, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
67 }
68
69
70 private List<Long> addIfMissingMandatoryTimePeriods(List<Long> periods) {
71 periods.add(Long.valueOf(5 * 60 * 1000));
72 periods.add(Long.valueOf(15 * 60 * 1000));
73 periods.add(Long.valueOf(60 * 60 * 1000));
74 periods.add(Long.valueOf(24 * 60 * 60 * 1000));
75 Set<Long> minified = new HashSet<Long>(periods);
76 return new ArrayList<Long>(minified);
77 }
78
79 private void printPeriods(List<Long> periods) {
80 String s = "";
81 for (int i = 0; i < periods.size(); i++) {
82 s += periods.get(i) + " ";
83 }
84 log.log(Level.DEBUG, "calculating status using the following time periods " + s.trim());
85 }
86
87
88
89
90
91
92
93
94
95 private void cleanup(Connection perfCon, List<Long> periods) {
96 if (perfCon == null) {
97 log.log(Level.ERROR, "cleanup database unavailable");
98 return;
99 }
100 String s = "";
101 for (int i = 0; i < periods.size(); i++) {
102 s += "timerange != " + periods.get(i) + " ";
103 if (i + 1 < periods.size()) {
104 s += " AND ";
105 }
106 }
107 PreparedStatement cmd = null;
108 try {
109 cmd = perfCon.prepareStatement("delete from agg2 where " + s);
110 cmd.executeUpdate();
111 } catch (Exception e) {
112 log.log(Level.WARN, null, e);
113 } finally {
114 DBUtils.safeClose(cmd);
115 }
116 }
117
118
119
120
121
122
123
124
125 public void doWork(boolean pooled) throws Exception {
126 List<Long> periods = new ArrayList<Long>();
127 try {
128 KeyNameValueEnc GetPropertiesFromDB = DBSettingsLoader.GetPropertiesFromDB(pooled, "StatisticsAggregator", "Periods");
129 String[] items = GetPropertiesFromDB.getKeyNameValue().getPropertyValue().split(",");
130 for (int i = 0; i < items.length; i++) {
131 long l = Long.parseLong(items[i]);
132 if (l > 0) {
133 periods.add(l);
134 }
135 }
136 periods = addIfMissingMandatoryTimePeriods(periods);
137 } catch (Exception e) {
138 log.log(Level.WARN, "settings from the database for time periods was unparsable, reverting to the defaults.", e);
139 periods.add(Long.valueOf(5 * 60 * 1000));
140 periods.add(Long.valueOf(15 * 60 * 1000));
141 periods.add(Long.valueOf(60 * 60 * 1000));
142 periods.add(Long.valueOf(24 * 60 * 60 * 1000));
143 }
144
145 printPeriods(periods);
146 Connection ConfigCon = null;
147 Connection PerfCon = null;
148
149 PreparedStatement com = null;
150
151 ResultSet rs = null;
152
153 try {
154
155 if (pooled) {
156 ConfigCon = Utility.getConfigurationDBConnection();
157 PerfCon = Utility.getPerformanceDBConnection();
158 } else {
159 ConfigCon = Utility.getConfigurationDB_NONPOOLED_Connection();
160 PerfCon = Utility.getPerformanceDB_NONPOOLED_Connection();
161 }
162 if (ConfigCon == null || PerfCon == null) {
163 log.log(Level.FATAL, "statistics job on start, database unavailable");
164 DBUtils.safeClose(PerfCon);
165 DBUtils.safeClose(ConfigCon);
166 throw new Exception("database unavailable");
167 }
168
169 threadPool.execute(new StatusStatisticsJob(periods));
170 threadPool.execute(new MachineProcessJob(periods));
171 threadPool.execute(new BrokerStatisticsJob(periods));
172
173 com = ConfigCon.prepareStatement("select uri from servicepolicies where policytype=?;");
174 com.setInt(1, PolicyType.TRANSACTIONAL.ordinal());
175 rs = com.executeQuery();
176 while (rs.next()) {
177 threadPool.execute(new TransactionalStatisticsJob(periods, rs.getString("uri")));
178 }
179 rs.close();
180 com.close();
181
182 } catch (Exception ex) {
183
184 log.log(Level.ERROR, "Error caught during statistics calculation, please report", ex);
185 throw ex;
186 } finally {
187 cleanup(PerfCon, periods);
188 DBUtils.safeClose(com);
189 DBUtils.safeClose(rs);
190 DBUtils.safeClose(PerfCon);
191 DBUtils.safeClose(ConfigCon);
192 }
193
194 }
195
196
197
198 }