001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.util;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.LinkedList;
025import java.util.ListIterator;
026import java.util.Map;
027
028import org.apache.yetus.audience.InterfaceAudience;
029
030import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
031
032/**
033 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
034 * for streaming calculation of targeted high-percentile epsilon-approximate
035 * quantiles.
036 * 
037 * This is a generalization of the earlier work by Greenwald and Khanna (GK),
038 * which essentially allows different error bounds on the targeted quantiles,
039 * which allows for far more efficient calculation of high-percentiles.
040 * 
041 * See: Cormode, Korn, Muthukrishnan, and Srivastava
042 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
043 * 
044 * Greenwald and Khanna,
045 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
046 * 
047 */
048@InterfaceAudience.Private
049public class MetricSampleQuantiles {
050
051  /**
052   * Total number of items in stream
053   */
054  private long count = 0;
055
056  /**
057   * Current list of sampled items, maintained in sorted order with error bounds
058   */
059  private LinkedList<SampleItem> samples;
060
061  /**
062   * Buffers incoming items to be inserted in batch. Items are inserted into 
063   * the buffer linearly. When the buffer fills, it is flushed into the samples
064   * array in its entirety.
065   */
066  private long[] buffer = new long[500];
067  private int bufferCount = 0;
068
069  /**
070   * Array of Quantiles that we care about, along with desired error.
071   */
072  private final MetricQuantile[] quantiles;
073
074  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
075    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
076    this.samples = new LinkedList<>();
077  }
078
079  /**
080   * Specifies the allowable error for this rank, depending on which quantiles
081   * are being targeted.
082   * 
083   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
084   * the range of this rank can be.
085   * 
086   * @param rank
087   *          the index in the list of samples
088   */
089  private double allowableError(int rank) {
090    int size = samples.size();
091    double minError = size + 1;
092    for (MetricQuantile q : quantiles) {
093      double error;
094      if (rank <= q.quantile * size) {
095        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
096      } else {
097        error = (2.0 * q.error * rank) / q.quantile;
098      }
099      if (error < minError) {
100        minError = error;
101      }
102    }
103
104    return minError;
105  }
106
107  /**
108   * Add a new value from the stream.
109   * 
110   * @param v the value to insert
111   */
112  synchronized public void insert(long v) {
113    buffer[bufferCount] = v;
114    bufferCount++;
115
116    count++;
117
118    if (bufferCount == buffer.length) {
119      insertBatch();
120      compress();
121    }
122  }
123
124  /**
125   * Merges items from buffer into the samples array in one pass.
126   * This is more efficient than doing an insert on every item.
127   */
128  private void insertBatch() {
129    if (bufferCount == 0) {
130      return;
131    }
132
133    Arrays.sort(buffer, 0, bufferCount);
134
135    // Base case: no samples
136    int start = 0;
137    if (samples.isEmpty()) {
138      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
139      samples.add(newItem);
140      start++;
141    }
142
143    ListIterator<SampleItem> it = samples.listIterator();
144    SampleItem item = it.next();
145    for (int i = start; i < bufferCount; i++) {
146      long v = buffer[i];
147      while (it.nextIndex() < samples.size() && item.value < v) {
148        item = it.next();
149      }
150      // If we found that bigger item, back up so we insert ourselves before it
151      if (item.value > v) {
152        it.previous();
153      }
154      // We use different indexes for the edge comparisons, because of the above
155      // if statement that adjusts the iterator
156      int delta;
157      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
158        delta = 0;
159      } else {
160        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
161      }
162      SampleItem newItem = new SampleItem(v, 1, delta);
163      it.add(newItem);
164      item = newItem;
165    }
166
167    bufferCount = 0;
168  }
169
170  /**
171   * Try to remove extraneous items from the set of sampled items. This checks
172   * if an item is unnecessary based on the desired error bounds, and merges it
173   * with the adjacent item if it is.
174   */
175  private void compress() {
176    if (samples.size() < 2) {
177      return;
178    }
179
180    ListIterator<SampleItem> it = samples.listIterator();
181    SampleItem prev = null;
182    SampleItem next = it.next();
183
184    while (it.hasNext()) {
185      prev = next;
186      next = it.next();
187      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
188        next.g += prev.g;
189        // Remove prev. it.remove() kills the last thing returned.
190        it.previous();
191        it.previous();
192        it.remove();
193        // it.next() is now equal to next, skip it back forward again
194        it.next();
195      }
196    }
197  }
198
199  /**
200   * Get the estimated value at the specified quantile.
201   * 
202   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
203   * @return Estimated value at that quantile.
204   */
205  private long query(double quantile) throws IOException {
206    if (samples.isEmpty()) {
207      throw new IOException("No samples present");
208    }
209
210    int rankMin = 0;
211    int desired = (int) (quantile * count);
212
213    for (int i = 1; i < samples.size(); i++) {
214      SampleItem prev = samples.get(i - 1);
215      SampleItem cur = samples.get(i);
216
217      rankMin += prev.g;
218
219      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
220        return prev.value;
221      }
222    }
223
224    // edge case of wanting max value
225    return samples.get(samples.size() - 1).value;
226  }
227
228  /**
229   * Get a snapshot of the current values of all the tracked quantiles.
230   * 
231   * @return snapshot of the tracked quantiles
232   * @throws IOException
233   *           if no items have been added to the estimator
234   */
235  synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
236    // flush the buffer first for best results
237    insertBatch();
238    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
239    for (int i = 0; i < quantiles.length; i++) {
240      values.put(quantiles[i], query(quantiles[i].quantile));
241    }
242
243    return values;
244  }
245
246  /**
247   * Returns the number of items that the estimator has processed
248   * 
249   * @return count total number of items processed
250   */
251  synchronized public long getCount() {
252    return count;
253  }
254
255  /**
256   * Returns the number of samples kept by the estimator
257   * 
258   * @return count current number of samples
259   */
260  @VisibleForTesting
261  synchronized public int getSampleCount() {
262    return samples.size();
263  }
264
265  /**
266   * Resets the estimator, clearing out all previously inserted items
267   */
268  synchronized public void clear() {
269    count = 0;
270    bufferCount = 0;
271    samples.clear();
272  }
273
274  /**
275   * Describes a measured value passed to the estimator, tracking additional
276   * metadata required by the CKMS algorithm.
277   */
278  private static class SampleItem {
279    
280    /**
281     * Value of the sampled item (e.g. a measured latency value)
282     */
283    private final long value;
284    
285    /**
286     * Difference between the lowest possible rank of the previous item, and 
287     * the lowest possible rank of this item.
288     * 
289     * The sum of the g of all previous items yields this item's lower bound. 
290     */
291    private int g;
292    
293    /**
294     * Difference between the item's greatest possible rank and lowest possible
295     * rank.
296     */
297    private final int delta;
298
299    public SampleItem(long value, int lowerDelta, int delta) {
300      this.value = value;
301      this.g = lowerDelta;
302      this.delta = delta;
303    }
304
305    @Override
306    public String toString() {
307      return String.format("%d, %d, %d", value, g, delta);
308    }
309  }
310}