/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
 * for streaming calculation of targeted high-percentile epsilon-approximate
 * quantiles.
 *
 * This is a generalization of the earlier work by Greenwald and Khanna (GK),
 * which essentially allows different error bounds on the targeted quantiles,
 * which allows for far more efficient calculation of high-percentiles.
 *
 * See: Cormode, Korn, Muthukrishnan, and Srivastava
 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
 *
 * Greenwald and Khanna,
 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
 *
 * All public methods synchronize on this instance.
 */
@InterfaceAudience.Private
public class MetricSampleQuantiles {

  /**
   * Number of incoming values buffered before they are merged into the samples.
   */
  private static final int BUFFER_SIZE = 500;

  /**
   * Total number of items in stream
   */
  private long count = 0;

  /**
   * Current list of sampled items, maintained in sorted order with error bounds
   */
  private final LinkedList<SampleItem> samples;

  /**
   * Buffers incoming items to be inserted in batch. Items are inserted into
   * the buffer linearly. When the buffer fills, it is flushed into the samples
   * list in its entirety.
   */
  private final long[] buffer = new long[BUFFER_SIZE];
  private int bufferCount = 0;

  /**
   * Array of Quantiles that we care about, along with desired error.
   */
  private final MetricQuantile[] quantiles;

  /**
   * Creates an estimator for the given target quantiles.
   *
   * @param quantiles the quantiles to track; the array is copied, so later
   *          changes to the caller's array are not seen by this estimator
   */
  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
    this.samples = new LinkedList<>();
  }

  /**
   * Specifies the allowable error for this rank, depending on which quantiles
   * are being targeted.
   *
   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
   * the range of this rank can be.
   *
   * @param rank
   *          the index in the list of samples
   * @return the smallest error permitted at this rank by any tracked quantile,
   *         never more than {@code samples.size() + 1}
   */
  private double allowableError(int rank) {
    int size = samples.size();
    double minError = size + 1;
    for (MetricQuantile q : quantiles) {
      double error;
      if (rank <= q.quantile * size) {
        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
      } else {
        error = (2.0 * q.error * rank) / q.quantile;
      }
      if (error < minError) {
        minError = error;
      }
    }

    return minError;
  }

  /**
   * Add a new value from the stream.
   *
   * The value is only buffered here; the buffer is merged into the samples
   * (and the samples compressed) once it is full.
   *
   * @param v the value to insert
   */
  public synchronized void insert(long v) {
    buffer[bufferCount] = v;
    bufferCount++;

    count++;

    if (bufferCount == buffer.length) {
      insertBatch();
      compress();
    }
  }

  /**
   * Merges items from buffer into the samples list in one pass.
   * This is more efficient than doing an insert on every item.
   * The buffer is empty when this method returns.
   */
  private void insertBatch() {
    if (bufferCount == 0) {
      return;
    }

    // Sorting lets a single forward walk over the (sorted) samples place every value.
    Arrays.sort(buffer, 0, bufferCount);

    // Base case: no samples
    int start = 0;
    if (samples.isEmpty()) {
      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
      samples.add(newItem);
      start++;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem item = it.next();
    for (int i = start; i < bufferCount; i++) {
      long v = buffer[i];
      // Advance until we reach an item that is not smaller than v, or the end.
      while (it.nextIndex() < samples.size() && item.value < v) {
        item = it.next();
      }
      // If we found that bigger item, back up so we insert ourselves before it
      if (item.value > v) {
        it.previous();
      }
      // We use different indexes for the edge comparisons, because of the above
      // if statement that adjusts the iterator
      // NOTE(review): previousIndex() == 0 holds when the cursor sits just after
      // the first sample; an insertion in front of the first sample has
      // previousIndex() == -1 and takes the else branch. Confirm this is intended.
      int delta;
      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
        delta = 0;
      } else {
        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
      }
      SampleItem newItem = new SampleItem(v, 1, delta);
      it.add(newItem);
      // Continue the walk from the item we just inserted.
      item = newItem;
    }

    bufferCount = 0;
  }

  /**
   * Try to remove extraneous items from the set of sampled items. This checks
   * if an item is unnecessary based on the desired error bounds, and merges it
   * with the adjacent item if it is.
   */
  private void compress() {
    if (samples.size() < 2) {
      return;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem prev = null;
    SampleItem next = it.next();

    while (it.hasNext()) {
      prev = next;
      next = it.next();
      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
        // Fold prev into next: next absorbs prev's share of the rank range.
        next.g += prev.g;
        // Remove prev. it.remove() kills the last thing returned.
        it.previous();
        it.previous();
        it.remove();
        // it.next() is now equal to next, skip it back forward again
        it.next();
      }
    }
  }

  /**
   * Get the estimated value at the specified quantile.
   *
   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
   * @return Estimated value at that quantile.
   * @throws IOException if there are no samples
   */
  private long query(double quantile) throws IOException {
    if (samples.isEmpty()) {
      throw new IOException("No samples present");
    }

    // count is a long, so rank arithmetic is done in long as well; narrowing to
    // int would saturate/wrap once more than Integer.MAX_VALUE items were seen.
    long rankMin = 0;
    long desired = (long) (quantile * count);

    // Walk the linked list once, remembering the previous item, instead of
    // using positional get(i), which re-traverses the list on every call.
    SampleItem prev = null;
    int index = 0;
    for (SampleItem cur : samples) {
      if (prev != null) {
        rankMin += prev.g;

        if (rankMin + cur.g + cur.delta > desired + (allowableError(index) / 2)) {
          return prev.value;
        }
      }
      prev = cur;
      index++;
    }

    // edge case of wanting max value
    return samples.getLast().value;
  }

  /**
   * Get a snapshot of the current values of all the tracked quantiles.
   *
   * Buffered values are merged into the samples before the quantiles are read.
   *
   * @return snapshot of the tracked quantiles
   * @throws IOException
   *           if no items have been added to the estimator
   */
  public synchronized Map<MetricQuantile, Long> snapshot() throws IOException {
    // flush the buffer first for best results
    insertBatch();
    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
    for (MetricQuantile q : quantiles) {
      values.put(q, query(q.quantile));
    }

    return values;
  }

  /**
   * Returns the number of items that the estimator has processed
   *
   * @return count total number of items processed
   */
  public synchronized long getCount() {
    return count;
  }

  /**
   * Returns the number of samples kept by the estimator
   *
   * Values still waiting in the insert buffer are not included.
   *
   * @return count current number of samples
   */
  public synchronized int getSampleCount() {
    return samples.size();
  }

  /**
   * Resets the estimator, clearing out all previously inserted items
   */
  public synchronized void clear() {
    count = 0;
    bufferCount = 0;
    samples.clear();
  }

  /**
   * Describes a measured value passed to the estimator, tracking additional
   * metadata required by the CKMS algorithm.
   */
  private static class SampleItem {

    /**
     * Value of the sampled item (e.g. a measured latency value)
     */
    private final long value;

    /**
     * Difference between the lowest possible rank of the previous item, and
     * the lowest possible rank of this item.
     *
     * The sum of the g of all previous items yields this item's lower bound.
     */
    private int g;

    /**
     * Difference between the item's greatest possible rank and lowest possible
     * rank.
     */
    private final int delta;

    /**
     * @param value the sampled value
     * @param lowerDelta initial value of {@code g}
     * @param delta width of the item's possible rank range
     */
    public SampleItem(long value, int lowerDelta, int delta) {
      this.value = value;
      this.g = lowerDelta;
      this.delta = delta;
    }

    @Override
    public String toString() {
      return String.format("%d, %d, %d", value, g, delta);
    }
  }
}