1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.metrics2.util;
20
21 import java.io.IOException;
22 import java.util.Arrays;
23 import java.util.HashMap;
24 import java.util.LinkedList;
25 import java.util.ListIterator;
26 import java.util.Map;
27
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29
30 import com.google.common.annotations.VisibleForTesting;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class MetricSampleQuantiles {
50
51
52
53
54 private long count = 0;
55
56
57
58
59 private LinkedList<SampleItem> samples;
60
61
62
63
64
65
66 private long[] buffer = new long[500];
67 private int bufferCount = 0;
68
69
70
71
72 private final MetricQuantile quantiles[];
73
74 public MetricSampleQuantiles(MetricQuantile[] quantiles) {
75 this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
76 this.samples = new LinkedList<SampleItem>();
77 }
78
79
80
81
82
83
84
85
86
87
88
89 private double allowableError(int rank) {
90 int size = samples.size();
91 double minError = size + 1;
92 for (MetricQuantile q : quantiles) {
93 double error;
94 if (rank <= q.quantile * size) {
95 error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
96 } else {
97 error = (2.0 * q.error * rank) / q.quantile;
98 }
99 if (error < minError) {
100 minError = error;
101 }
102 }
103
104 return minError;
105 }
106
107
108
109
110
111
112 synchronized public void insert(long v) {
113 buffer[bufferCount] = v;
114 bufferCount++;
115
116 count++;
117
118 if (bufferCount == buffer.length) {
119 insertBatch();
120 compress();
121 }
122 }
123
124
125
126
127
128 private void insertBatch() {
129 if (bufferCount == 0) {
130 return;
131 }
132
133 Arrays.sort(buffer, 0, bufferCount);
134
135
136 int start = 0;
137 if (samples.size() == 0) {
138 SampleItem newItem = new SampleItem(buffer[0], 1, 0);
139 samples.add(newItem);
140 start++;
141 }
142
143 ListIterator<SampleItem> it = samples.listIterator();
144 SampleItem item = it.next();
145 for (int i = start; i < bufferCount; i++) {
146 long v = buffer[i];
147 while (it.nextIndex() < samples.size() && item.value < v) {
148 item = it.next();
149 }
150
151 if (item.value > v) {
152 it.previous();
153 }
154
155
156 int delta;
157 if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
158 delta = 0;
159 } else {
160 delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
161 }
162 SampleItem newItem = new SampleItem(v, 1, delta);
163 it.add(newItem);
164 item = newItem;
165 }
166
167 bufferCount = 0;
168 }
169
170
171
172
173
174
175 private void compress() {
176 if (samples.size() < 2) {
177 return;
178 }
179
180 ListIterator<SampleItem> it = samples.listIterator();
181 SampleItem prev = null;
182 SampleItem next = it.next();
183
184 while (it.hasNext()) {
185 prev = next;
186 next = it.next();
187 if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
188 next.g += prev.g;
189
190 it.previous();
191 it.previous();
192 it.remove();
193
194 it.next();
195 }
196 }
197 }
198
199
200
201
202
203
204
205 private long query(double quantile) throws IOException {
206 if (samples.size() == 0) {
207 throw new IOException("No samples present");
208 }
209
210 int rankMin = 0;
211 int desired = (int) (quantile * count);
212
213 for (int i = 1; i < samples.size(); i++) {
214 SampleItem prev = samples.get(i - 1);
215 SampleItem cur = samples.get(i);
216
217 rankMin += prev.g;
218
219 if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
220 return prev.value;
221 }
222 }
223
224
225 return samples.get(samples.size() - 1).value;
226 }
227
228
229
230
231
232
233
234
235 synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
236
237 insertBatch();
238 Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
239 for (int i = 0; i < quantiles.length; i++) {
240 values.put(quantiles[i], query(quantiles[i].quantile));
241 }
242
243 return values;
244 }
245
246
247
248
249
250
251 synchronized public long getCount() {
252 return count;
253 }
254
255
256
257
258
259
260 @VisibleForTesting
261 synchronized public int getSampleCount() {
262 return samples.size();
263 }
264
265
266
267
268 synchronized public void clear() {
269 count = 0;
270 bufferCount = 0;
271 samples.clear();
272 }
273
274
275
276
277
278 private static class SampleItem {
279
280
281
282
283 public final long value;
284
285
286
287
288
289
290
291 public int g;
292
293
294
295
296
297 public final int delta;
298
299 public SampleItem(long value, int lowerDelta, int delta) {
300 this.value = value;
301 this.g = lowerDelta;
302 this.delta = delta;
303 }
304
305 @Override
306 public String toString() {
307 return String.format("%d, %d, %d", value, g, delta);
308 }
309 }
310 }