View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.metrics2.util;
20  
21  import java.io.IOException;
22  import java.util.Arrays;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.ListIterator;
26  import java.util.Map;
27  
28  import org.apache.hadoop.classification.InterfaceAudience;
29  
30  import com.google.common.annotations.VisibleForTesting;
31  
32  /**
33   * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
34   * for streaming calculation of targeted high-percentile epsilon-approximate
35   * quantiles.
36   * 
37   * This is a generalization of the earlier work by Greenwald and Khanna (GK),
38   * which essentially allows different error bounds on the targeted quantiles,
39   * which allows for far more efficient calculation of high-percentiles.
40   * 
41   * See: Cormode, Korn, Muthukrishnan, and Srivastava
42   * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
43   * 
44   * Greenwald and Khanna,
45   * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
46   * 
47   */
48  @InterfaceAudience.Private
49  public class MetricSampleQuantiles {
50  
51    /**
52     * Total number of items in stream
53     */
54    private long count = 0;
55  
56    /**
57     * Current list of sampled items, maintained in sorted order with error bounds
58     */
59    private LinkedList<SampleItem> samples;
60  
61    /**
62     * Buffers incoming items to be inserted in batch. Items are inserted into 
63     * the buffer linearly. When the buffer fills, it is flushed into the samples
64     * array in its entirety.
65     */
66    private long[] buffer = new long[500];
67    private int bufferCount = 0;
68  
69    /**
70     * Array of Quantiles that we care about, along with desired error.
71     */
72    private final MetricQuantile quantiles[];
73  
74    public MetricSampleQuantiles(MetricQuantile[] quantiles) {
75      this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
76      this.samples = new LinkedList<SampleItem>();
77    }
78  
79    /**
80     * Specifies the allowable error for this rank, depending on which quantiles
81     * are being targeted.
82     * 
83     * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
84     * the range of this rank can be.
85     * 
86     * @param rank
87     *          the index in the list of samples
88     */
89    private double allowableError(int rank) {
90      int size = samples.size();
91      double minError = size + 1;
92      for (MetricQuantile q : quantiles) {
93        double error;
94        if (rank <= q.quantile * size) {
95          error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
96        } else {
97          error = (2.0 * q.error * rank) / q.quantile;
98        }
99        if (error < minError) {
100         minError = error;
101       }
102     }
103 
104     return minError;
105   }
106 
107   /**
108    * Add a new value from the stream.
109    * 
110    * @param v
111    */
112   synchronized public void insert(long v) {
113     buffer[bufferCount] = v;
114     bufferCount++;
115 
116     count++;
117 
118     if (bufferCount == buffer.length) {
119       insertBatch();
120       compress();
121     }
122   }
123 
124   /**
125    * Merges items from buffer into the samples array in one pass.
126    * This is more efficient than doing an insert on every item.
127    */
128   private void insertBatch() {
129     if (bufferCount == 0) {
130       return;
131     }
132 
133     Arrays.sort(buffer, 0, bufferCount);
134 
135     // Base case: no samples
136     int start = 0;
137     if (samples.size() == 0) {
138       SampleItem newItem = new SampleItem(buffer[0], 1, 0);
139       samples.add(newItem);
140       start++;
141     }
142 
143     ListIterator<SampleItem> it = samples.listIterator();
144     SampleItem item = it.next();
145     for (int i = start; i < bufferCount; i++) {
146       long v = buffer[i];
147       while (it.nextIndex() < samples.size() && item.value < v) {
148         item = it.next();
149       }
150       // If we found that bigger item, back up so we insert ourselves before it
151       if (item.value > v) {
152         it.previous();
153       }
154       // We use different indexes for the edge comparisons, because of the above
155       // if statement that adjusts the iterator
156       int delta;
157       if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
158         delta = 0;
159       } else {
160         delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
161       }
162       SampleItem newItem = new SampleItem(v, 1, delta);
163       it.add(newItem);
164       item = newItem;
165     }
166 
167     bufferCount = 0;
168   }
169 
170   /**
171    * Try to remove extraneous items from the set of sampled items. This checks
172    * if an item is unnecessary based on the desired error bounds, and merges it
173    * with the adjacent item if it is.
174    */
175   private void compress() {
176     if (samples.size() < 2) {
177       return;
178     }
179 
180     ListIterator<SampleItem> it = samples.listIterator();
181     SampleItem prev = null;
182     SampleItem next = it.next();
183 
184     while (it.hasNext()) {
185       prev = next;
186       next = it.next();
187       if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
188         next.g += prev.g;
189         // Remove prev. it.remove() kills the last thing returned.
190         it.previous();
191         it.previous();
192         it.remove();
193         // it.next() is now equal to next, skip it back forward again
194         it.next();
195       }
196     }
197   }
198 
199   /**
200    * Get the estimated value at the specified quantile.
201    * 
202    * @param quantile Queried quantile, e.g. 0.50 or 0.99.
203    * @return Estimated value at that quantile.
204    */
205   private long query(double quantile) throws IOException {
206     if (samples.size() == 0) {
207       throw new IOException("No samples present");
208     }
209 
210     int rankMin = 0;
211     int desired = (int) (quantile * count);
212 
213     for (int i = 1; i < samples.size(); i++) {
214       SampleItem prev = samples.get(i - 1);
215       SampleItem cur = samples.get(i);
216 
217       rankMin += prev.g;
218 
219       if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
220         return prev.value;
221       }
222     }
223 
224     // edge case of wanting max value
225     return samples.get(samples.size() - 1).value;
226   }
227 
228   /**
229    * Get a snapshot of the current values of all the tracked quantiles.
230    * 
231    * @return snapshot of the tracked quantiles
232    * @throws IOException
233    *           if no items have been added to the estimator
234    */
235   synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
236     // flush the buffer first for best results
237     insertBatch();
238     Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
239     for (int i = 0; i < quantiles.length; i++) {
240       values.put(quantiles[i], query(quantiles[i].quantile));
241     }
242 
243     return values;
244   }
245 
246   /**
247    * Returns the number of items that the estimator has processed
248    * 
249    * @return count total number of items processed
250    */
251   synchronized public long getCount() {
252     return count;
253   }
254 
255   /**
256    * Returns the number of samples kept by the estimator
257    * 
258    * @return count current number of samples
259    */
260   @VisibleForTesting
261   synchronized public int getSampleCount() {
262     return samples.size();
263   }
264 
265   /**
266    * Resets the estimator, clearing out all previously inserted items
267    */
268   synchronized public void clear() {
269     count = 0;
270     bufferCount = 0;
271     samples.clear();
272   }
273 
274   /**
275    * Describes a measured value passed to the estimator, tracking additional
276    * metadata required by the CKMS algorithm.
277    */
278   private static class SampleItem {
279     
280     /**
281      * Value of the sampled item (e.g. a measured latency value)
282      */
283     public final long value;
284     
285     /**
286      * Difference between the lowest possible rank of the previous item, and 
287      * the lowest possible rank of this item.
288      * 
289      * The sum of the g of all previous items yields this item's lower bound. 
290      */
291     public int g;
292     
293     /**
294      * Difference between the item's greatest possible rank and lowest possible
295      * rank.
296      */
297     public final int delta;
298 
299     public SampleItem(long value, int lowerDelta, int delta) {
300       this.value = value;
301       this.g = lowerDelta;
302       this.delta = delta;
303     }
304 
305     @Override
306     public String toString() {
307       return String.format("%d, %d, %d", value, g, delta);
308     }
309   }
310 }