001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.metrics2.util;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.LinkedList;
024import java.util.ListIterator;
025import java.util.Map;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm for streaming
030 * calculation of targeted high-percentile epsilon-approximate quantiles. This is a generalization
031 * of the earlier work by Greenwald and Khanna (GK), which essentially allows different error bounds
032 * on the targeted quantiles, which allows for far more efficient calculation of high-percentiles.
033 * See: Cormode, Korn, Muthukrishnan, and Srivastava "Effective Computation of Biased Quantiles over
034 * Data Streams" in ICDE 2005 Greenwald and Khanna, "Space-efficient online computation of quantile
035 * summaries" in SIGMOD 2001
036 */
037@InterfaceAudience.Private
038public class MetricSampleQuantiles {
039
040  /**
041   * Total number of items in stream
042   */
043  private long count = 0;
044
045  /**
046   * Current list of sampled items, maintained in sorted order with error bounds
047   */
048  private LinkedList<SampleItem> samples;
049
050  /**
051   * Buffers incoming items to be inserted in batch. Items are inserted into the buffer linearly.
052   * When the buffer fills, it is flushed into the samples array in its entirety.
053   */
054  private long[] buffer = new long[500];
055  private int bufferCount = 0;
056
057  /**
058   * Array of Quantiles that we care about, along with desired error.
059   */
060  private final MetricQuantile[] quantiles;
061
062  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
063    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
064    this.samples = new LinkedList<>();
065  }
066
067  /**
068   * Specifies the allowable error for this rank, depending on which quantiles are being targeted.
069   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide the range of this
070   * rank can be. n * the index in the list of samples
071   */
072  private double allowableError(int rank) {
073    int size = samples.size();
074    double minError = size + 1;
075    for (MetricQuantile q : quantiles) {
076      double error;
077      if (rank <= q.quantile * size) {
078        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
079      } else {
080        error = (2.0 * q.error * rank) / q.quantile;
081      }
082      if (error < minError) {
083        minError = error;
084      }
085    }
086
087    return minError;
088  }
089
090  /**
091   * Add a new value from the stream.
092   * @param v the value to insert
093   */
094  synchronized public void insert(long v) {
095    buffer[bufferCount] = v;
096    bufferCount++;
097
098    count++;
099
100    if (bufferCount == buffer.length) {
101      insertBatch();
102      compress();
103    }
104  }
105
106  /**
107   * Merges items from buffer into the samples array in one pass. This is more efficient than doing
108   * an insert on every item.
109   */
110  private void insertBatch() {
111    if (bufferCount == 0) {
112      return;
113    }
114
115    Arrays.sort(buffer, 0, bufferCount);
116
117    // Base case: no samples
118    int start = 0;
119    if (samples.isEmpty()) {
120      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
121      samples.add(newItem);
122      start++;
123    }
124
125    ListIterator<SampleItem> it = samples.listIterator();
126    SampleItem item = it.next();
127    for (int i = start; i < bufferCount; i++) {
128      long v = buffer[i];
129      while (it.nextIndex() < samples.size() && item.value < v) {
130        item = it.next();
131      }
132      // If we found that bigger item, back up so we insert ourselves before it
133      if (item.value > v) {
134        it.previous();
135      }
136      // We use different indexes for the edge comparisons, because of the above
137      // if statement that adjusts the iterator
138      int delta;
139      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
140        delta = 0;
141      } else {
142        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
143      }
144      SampleItem newItem = new SampleItem(v, 1, delta);
145      it.add(newItem);
146      item = newItem;
147    }
148
149    bufferCount = 0;
150  }
151
152  /**
153   * Try to remove extraneous items from the set of sampled items. This checks if an item is
154   * unnecessary based on the desired error bounds, and merges it with the adjacent item if it is.
155   */
156  private void compress() {
157    if (samples.size() < 2) {
158      return;
159    }
160
161    ListIterator<SampleItem> it = samples.listIterator();
162    SampleItem prev = null;
163    SampleItem next = it.next();
164
165    while (it.hasNext()) {
166      prev = next;
167      next = it.next();
168      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
169        next.g += prev.g;
170        // Remove prev. it.remove() kills the last thing returned.
171        it.previous();
172        it.previous();
173        it.remove();
174        // it.next() is now equal to next, skip it back forward again
175        it.next();
176      }
177    }
178  }
179
180  /**
181   * Get the estimated value at the specified quantile.
182   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
183   * @return Estimated value at that quantile.
184   */
185  private long query(double quantile) throws IOException {
186    if (samples.isEmpty()) {
187      throw new IOException("No samples present");
188    }
189
190    int rankMin = 0;
191    int desired = (int) (quantile * count);
192
193    for (int i = 1; i < samples.size(); i++) {
194      SampleItem prev = samples.get(i - 1);
195      SampleItem cur = samples.get(i);
196
197      rankMin += prev.g;
198
199      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
200        return prev.value;
201      }
202    }
203
204    // edge case of wanting max value
205    return samples.get(samples.size() - 1).value;
206  }
207
208  /**
209   * Get a snapshot of the current values of all the tracked quantiles.
210   * @return snapshot of the tracked quantiles n * if no items have been added to the estimator
211   */
212  synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
213    // flush the buffer first for best results
214    insertBatch();
215    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
216    for (int i = 0; i < quantiles.length; i++) {
217      values.put(quantiles[i], query(quantiles[i].quantile));
218    }
219
220    return values;
221  }
222
223  /**
224   * Returns the number of items that the estimator has processed
225   * @return count total number of items processed
226   */
227  synchronized public long getCount() {
228    return count;
229  }
230
231  /**
232   * Returns the number of samples kept by the estimator
233   * @return count current number of samples
234   */
235  synchronized public int getSampleCount() {
236    return samples.size();
237  }
238
239  /**
240   * Resets the estimator, clearing out all previously inserted items
241   */
242  synchronized public void clear() {
243    count = 0;
244    bufferCount = 0;
245    samples.clear();
246  }
247
248  /**
249   * Describes a measured value passed to the estimator, tracking additional metadata required by
250   * the CKMS algorithm.
251   */
252  private static class SampleItem {
253
254    /**
255     * Value of the sampled item (e.g. a measured latency value)
256     */
257    private final long value;
258
259    /**
260     * Difference between the lowest possible rank of the previous item, and the lowest possible
261     * rank of this item. The sum of the g of all previous items yields this item's lower bound.
262     */
263    private int g;
264
265    /**
266     * Difference between the item's greatest possible rank and lowest possible rank.
267     */
268    private final int delta;
269
270    public SampleItem(long value, int lowerDelta, int delta) {
271      this.value = value;
272      this.g = lowerDelta;
273      this.delta = delta;
274    }
275
276    @Override
277    public String toString() {
278      return String.format("%d, %d, %d", value, g, delta);
279    }
280  }
281}