001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.util;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.LinkedList;
025import java.util.ListIterator;
026import java.util.Map;
027
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
032 * for streaming calculation of targeted high-percentile epsilon-approximate
033 * quantiles.
034 * 
035 * This is a generalization of the earlier work by Greenwald and Khanna (GK),
036 * which essentially allows different error bounds on the targeted quantiles,
037 * which allows for far more efficient calculation of high-percentiles.
038 * 
039 * See: Cormode, Korn, Muthukrishnan, and Srivastava
040 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
041 * 
042 * Greenwald and Khanna,
043 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
044 * 
045 */
046@InterfaceAudience.Private
047public class MetricSampleQuantiles {
048
049  /**
050   * Total number of items in stream
051   */
052  private long count = 0;
053
054  /**
055   * Current list of sampled items, maintained in sorted order with error bounds
056   */
057  private LinkedList<SampleItem> samples;
058
059  /**
060   * Buffers incoming items to be inserted in batch. Items are inserted into 
061   * the buffer linearly. When the buffer fills, it is flushed into the samples
062   * array in its entirety.
063   */
064  private long[] buffer = new long[500];
065  private int bufferCount = 0;
066
067  /**
068   * Array of Quantiles that we care about, along with desired error.
069   */
070  private final MetricQuantile[] quantiles;
071
072  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
073    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
074    this.samples = new LinkedList<>();
075  }
076
077  /**
078   * Specifies the allowable error for this rank, depending on which quantiles
079   * are being targeted.
080   * 
081   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
082   * the range of this rank can be.
083   * 
084   * @param rank
085   *          the index in the list of samples
086   */
087  private double allowableError(int rank) {
088    int size = samples.size();
089    double minError = size + 1;
090    for (MetricQuantile q : quantiles) {
091      double error;
092      if (rank <= q.quantile * size) {
093        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
094      } else {
095        error = (2.0 * q.error * rank) / q.quantile;
096      }
097      if (error < minError) {
098        minError = error;
099      }
100    }
101
102    return minError;
103  }
104
105  /**
106   * Add a new value from the stream.
107   * 
108   * @param v the value to insert
109   */
110  synchronized public void insert(long v) {
111    buffer[bufferCount] = v;
112    bufferCount++;
113
114    count++;
115
116    if (bufferCount == buffer.length) {
117      insertBatch();
118      compress();
119    }
120  }
121
122  /**
123   * Merges items from buffer into the samples array in one pass.
124   * This is more efficient than doing an insert on every item.
125   */
126  private void insertBatch() {
127    if (bufferCount == 0) {
128      return;
129    }
130
131    Arrays.sort(buffer, 0, bufferCount);
132
133    // Base case: no samples
134    int start = 0;
135    if (samples.isEmpty()) {
136      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
137      samples.add(newItem);
138      start++;
139    }
140
141    ListIterator<SampleItem> it = samples.listIterator();
142    SampleItem item = it.next();
143    for (int i = start; i < bufferCount; i++) {
144      long v = buffer[i];
145      while (it.nextIndex() < samples.size() && item.value < v) {
146        item = it.next();
147      }
148      // If we found that bigger item, back up so we insert ourselves before it
149      if (item.value > v) {
150        it.previous();
151      }
152      // We use different indexes for the edge comparisons, because of the above
153      // if statement that adjusts the iterator
154      int delta;
155      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
156        delta = 0;
157      } else {
158        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
159      }
160      SampleItem newItem = new SampleItem(v, 1, delta);
161      it.add(newItem);
162      item = newItem;
163    }
164
165    bufferCount = 0;
166  }
167
168  /**
169   * Try to remove extraneous items from the set of sampled items. This checks
170   * if an item is unnecessary based on the desired error bounds, and merges it
171   * with the adjacent item if it is.
172   */
173  private void compress() {
174    if (samples.size() < 2) {
175      return;
176    }
177
178    ListIterator<SampleItem> it = samples.listIterator();
179    SampleItem prev = null;
180    SampleItem next = it.next();
181
182    while (it.hasNext()) {
183      prev = next;
184      next = it.next();
185      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
186        next.g += prev.g;
187        // Remove prev. it.remove() kills the last thing returned.
188        it.previous();
189        it.previous();
190        it.remove();
191        // it.next() is now equal to next, skip it back forward again
192        it.next();
193      }
194    }
195  }
196
197  /**
198   * Get the estimated value at the specified quantile.
199   * 
200   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
201   * @return Estimated value at that quantile.
202   */
203  private long query(double quantile) throws IOException {
204    if (samples.isEmpty()) {
205      throw new IOException("No samples present");
206    }
207
208    int rankMin = 0;
209    int desired = (int) (quantile * count);
210
211    for (int i = 1; i < samples.size(); i++) {
212      SampleItem prev = samples.get(i - 1);
213      SampleItem cur = samples.get(i);
214
215      rankMin += prev.g;
216
217      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
218        return prev.value;
219      }
220    }
221
222    // edge case of wanting max value
223    return samples.get(samples.size() - 1).value;
224  }
225
226  /**
227   * Get a snapshot of the current values of all the tracked quantiles.
228   * 
229   * @return snapshot of the tracked quantiles
230   * @throws IOException
231   *           if no items have been added to the estimator
232   */
233  synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
234    // flush the buffer first for best results
235    insertBatch();
236    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
237    for (int i = 0; i < quantiles.length; i++) {
238      values.put(quantiles[i], query(quantiles[i].quantile));
239    }
240
241    return values;
242  }
243
244  /**
245   * Returns the number of items that the estimator has processed
246   * 
247   * @return count total number of items processed
248   */
249  synchronized public long getCount() {
250    return count;
251  }
252
253  /**
254   * Returns the number of samples kept by the estimator
255   * 
256   * @return count current number of samples
257   */
258  synchronized public int getSampleCount() {
259    return samples.size();
260  }
261
262  /**
263   * Resets the estimator, clearing out all previously inserted items
264   */
265  synchronized public void clear() {
266    count = 0;
267    bufferCount = 0;
268    samples.clear();
269  }
270
271  /**
272   * Describes a measured value passed to the estimator, tracking additional
273   * metadata required by the CKMS algorithm.
274   */
275  private static class SampleItem {
276    
277    /**
278     * Value of the sampled item (e.g. a measured latency value)
279     */
280    private final long value;
281    
282    /**
283     * Difference between the lowest possible rank of the previous item, and 
284     * the lowest possible rank of this item.
285     * 
286     * The sum of the g of all previous items yields this item's lower bound. 
287     */
288    private int g;
289    
290    /**
291     * Difference between the item's greatest possible rank and lowest possible
292     * rank.
293     */
294    private final int delta;
295
296    public SampleItem(long value, int lowerDelta, int delta) {
297      this.value = value;
298      this.g = lowerDelta;
299      this.delta = delta;
300    }
301
302    @Override
303    public String toString() {
304      return String.format("%d, %d, %d", value, g, delta);
305    }
306  }
307}