/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.metrics2.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava (CKMS) algorithm for
 * streaming calculation of targeted high-percentile epsilon-approximate quantiles. It generalizes
 * the earlier work by Greenwald and Khanna (GK) by allowing a different error bound for each
 * targeted quantile, which permits far more efficient estimation of high percentiles.
 * <p>
 * See: Cormode, Korn, Muthukrishnan, and Srivastava, "Effective Computation of Biased Quantiles
 * over Data Streams", ICDE 2005; Greenwald and Khanna, "Space-efficient Online Computation of
 * Quantile Summaries", SIGMOD 2001.
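 * <p>
 * A minimal usage sketch; the {@code MetricQuantile(double quantile, double error)} constructor
 * and the {@code observedLatencies} source below are assumed for illustration and are not defined
 * in this file:
 *
 * <pre>{@code
 * MetricQuantile[] targets = {
 *     new MetricQuantile(0.50, 0.050),  // median, +/- 5% rank error
 *     new MetricQuantile(0.99, 0.001)   // 99th percentile, +/- 0.1% rank error
 * };
 * MetricSampleQuantiles estimator = new MetricSampleQuantiles(targets);
 * for (long latency : observedLatencies) {
 *   estimator.insert(latency);
 * }
 * Map<MetricQuantile, Long> estimates = estimator.snapshot();  // throws IOException if empty
 * }</pre>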
 */
@InterfaceAudience.Private
@SuppressWarnings("JdkObsolete") // This is a use case for LinkedList
public class MetricSampleQuantiles {

  /**
   * Total number of items in stream
   */
  private long count = 0;

  /**
   * Current list of sampled items, maintained in sorted order with error bounds
   */
  private LinkedList<SampleItem> samples;

  /**
   * Buffers incoming items to be inserted in batch. Items are inserted into the buffer linearly.
   * When the buffer fills, it is flushed into the samples list in its entirety.
   */
  private long[] buffer = new long[500];
  private int bufferCount = 0;

  /**
   * Array of Quantiles that we care about, along with desired error.
   */
  private final MetricQuantile[] quantiles;

  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
    this.samples = new LinkedList<>();
  }

  /**
   * Specifies the allowable error for this rank, depending on which quantiles are being targeted.
   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide the uncertainty
   * range of this rank is allowed to be.
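   * <p>
   * An illustrative sketch (numbers are examples only): with a single target quantile of 0.99 and
   * error 0.001, and {@code samples.size() == 1000}, a rank well below the target, say 500, may
   * be uncertain by up to {@code 2 * 0.001 * (1000 - 500) / (1 - 0.99) = 100} positions, while a
   * rank above the target, say 995, is only allowed about
   * {@code 2 * 0.001 * 995 / 0.99 ~= 2} positions. Ranks far below a high target therefore need
   * far fewer retained samples.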
   * @param rank the index in the list of samples
   */
  private double allowableError(int rank) {
    int size = samples.size();
    double minError = size + 1;
    for (MetricQuantile q : quantiles) {
      double error;
      if (rank <= q.quantile * size) {
        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
      } else {
        error = (2.0 * q.error * rank) / q.quantile;
      }
      if (error < minError) {
        minError = error;
      }
    }

    return minError;
  }

  /**
   * Add a new value from the stream.
   * @param v the value to insert
   */
  public synchronized void insert(long v) {
    buffer[bufferCount] = v;
    bufferCount++;

    count++;

    if (bufferCount == buffer.length) {
      insertBatch();
      compress();
    }
  }

  /**
   * Merges items from the buffer into the samples list in one pass. This is more efficient than
   * doing an insert on every item.
   */
  private void insertBatch() {
    if (bufferCount == 0) {
      return;
    }

    Arrays.sort(buffer, 0, bufferCount);

    // Base case: no samples
    int start = 0;
    if (samples.isEmpty()) {
      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
      samples.add(newItem);
      start++;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem item = it.next();
    for (int i = start; i < bufferCount; i++) {
      long v = buffer[i];
      while (it.nextIndex() < samples.size() && item.value < v) {
        item = it.next();
      }
      // If we found that bigger item, back up so we insert ourselves before it
      if (item.value > v) {
        it.previous();
      }
      // We use different indexes for the edge comparisons, because of the above
      // if statement that adjusts the iterator
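      // A new minimum or maximum is known exactly, so its rank uncertainty (delta) is zero;
      // an interior item takes the widest uncertainty its rank's error band allows,
      // floor(allowableError(i)) - 1, following the CKMS insertion rule.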
      int delta;
      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
        delta = 0;
      } else {
        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
      }
      SampleItem newItem = new SampleItem(v, 1, delta);
      it.add(newItem);
      item = newItem;
    }

    bufferCount = 0;
  }

  /**
   * Try to remove extraneous items from the set of sampled items. This checks if an item is
   * unnecessary based on the desired error bounds, and merges it with the adjacent item if it is.
   */
  private void compress() {
    if (samples.size() < 2) {
      return;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem prev = null;
    SampleItem next = it.next();

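    // Merge prev into next only when their combined uncertainty (prev.g + next.g + next.delta)
    // still fits within the allowable error for next's rank, so query accuracy is preserved.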
    while (it.hasNext()) {
      prev = next;
      next = it.next();
      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
        next.g += prev.g;
        // Remove prev. it.remove() kills the last thing returned.
        it.previous();
        it.previous();
        it.remove();
        // it.next() is now equal to next, skip it back forward again
        it.next();
      }
    }
  }

  /**
   * Get the estimated value at the specified quantile.
   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
   * @return Estimated value at that quantile.
   * @throws IOException if no items have been added to the estimator
   */
  private long query(double quantile) throws IOException {
    if (samples.isEmpty()) {
      throw new IOException("No samples present");
    }

    int rankMin = 0;
    int desired = (int) (quantile * count);

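    // Walk the samples in rank order. rankMin accumulates the minimum possible rank of prev, so
    // rankMin + cur.g + cur.delta is cur's maximum possible rank. Return the value just before
    // the first sample whose worst-case rank overshoots the desired rank by more than half the
    // allowable error.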
    for (int i = 1; i < samples.size(); i++) {
      SampleItem prev = samples.get(i - 1);
      SampleItem cur = samples.get(i);

      rankMin += prev.g;

      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
        return prev.value;
      }
    }

    // edge case of wanting max value
    return samples.get(samples.size() - 1).value;
  }

  /**
   * Get a snapshot of the current values of all the tracked quantiles.
   * @return snapshot of the tracked quantiles
   * @throws IOException if no items have been added to the estimator
   */
  public synchronized Map<MetricQuantile, Long> snapshot() throws IOException {
    // flush the buffer first for best results
    insertBatch();
    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
    for (int i = 0; i < quantiles.length; i++) {
      values.put(quantiles[i], query(quantiles[i].quantile));
    }

    return values;
  }

  /**
   * Returns the number of items that the estimator has processed
   * @return count total number of items processed
   */
  public synchronized long getCount() {
    return count;
  }

  /**
   * Returns the number of samples kept by the estimator
   * @return count current number of samples
   */
  public synchronized int getSampleCount() {
    return samples.size();
  }

  /**
   * Resets the estimator, clearing out all previously inserted items
   */
  public synchronized void clear() {
    count = 0;
    bufferCount = 0;
    samples.clear();
  }

  /**
   * Describes a measured value passed to the estimator, tracking additional metadata required by
   * the CKMS algorithm.
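   * <p>
   * Illustrative example (values chosen only for this comment): if the first three items carry g
   * values 1, 2, and 1 and the third item has delta = 2, then the third item's rank is known to
   * lie between 1 + 2 + 1 = 4 and 4 + 2 = 6.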
   */
  private static class SampleItem {

    /**
     * Value of the sampled item (e.g. a measured latency value)
     */
    private final long value;

    /**
     * Difference between the lowest possible rank of the previous item and the lowest possible
     * rank of this item. The sum of g over this and all previous items yields this item's lower
     * bound.
     */
    private int g;

    /**
     * Difference between the item's greatest possible rank and lowest possible rank.
     */
    private final int delta;

    public SampleItem(long value, int lowerDelta, int delta) {
      this.value = value;
      this.g = lowerDelta;
      this.delta = delta;
    }

    @Override
    public String toString() {
      return String.format("%d, %d, %d", value, g, delta);
    }
  }
}