001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.metrics2.util; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.LinkedList; 025import java.util.ListIterator; 026import java.util.Map; 027 028import org.apache.yetus.audience.InterfaceAudience; 029 030import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 031 032/** 033 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm 034 * for streaming calculation of targeted high-percentile epsilon-approximate 035 * quantiles. 036 * 037 * This is a generalization of the earlier work by Greenwald and Khanna (GK), 038 * which essentially allows different error bounds on the targeted quantiles, 039 * which allows for far more efficient calculation of high-percentiles. 040 * 041 * See: Cormode, Korn, Muthukrishnan, and Srivastava 042 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005 043 * 044 * Greenwald and Khanna, 045 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001 046 * 047 */ 048@InterfaceAudience.Private 049public class MetricSampleQuantiles { 050 051 /** 052 * Total number of items in stream 053 */ 054 private long count = 0; 055 056 /** 057 * Current list of sampled items, maintained in sorted order with error bounds 058 */ 059 private LinkedList<SampleItem> samples; 060 061 /** 062 * Buffers incoming items to be inserted in batch. Items are inserted into 063 * the buffer linearly. When the buffer fills, it is flushed into the samples 064 * array in its entirety. 065 */ 066 private long[] buffer = new long[500]; 067 private int bufferCount = 0; 068 069 /** 070 * Array of Quantiles that we care about, along with desired error. 071 */ 072 private final MetricQuantile[] quantiles; 073 074 public MetricSampleQuantiles(MetricQuantile[] quantiles) { 075 this.quantiles = Arrays.copyOf(quantiles, quantiles.length); 076 this.samples = new LinkedList<>(); 077 } 078 079 /** 080 * Specifies the allowable error for this rank, depending on which quantiles 081 * are being targeted. 082 * 083 * This is the f(r_i, n) function from the CKMS paper. It's basically how wide 084 * the range of this rank can be. 085 * 086 * @param rank 087 * the index in the list of samples 088 */ 089 private double allowableError(int rank) { 090 int size = samples.size(); 091 double minError = size + 1; 092 for (MetricQuantile q : quantiles) { 093 double error; 094 if (rank <= q.quantile * size) { 095 error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile); 096 } else { 097 error = (2.0 * q.error * rank) / q.quantile; 098 } 099 if (error < minError) { 100 minError = error; 101 } 102 } 103 104 return minError; 105 } 106 107 /** 108 * Add a new value from the stream. 109 * 110 * @param v the value to insert 111 */ 112 synchronized public void insert(long v) { 113 buffer[bufferCount] = v; 114 bufferCount++; 115 116 count++; 117 118 if (bufferCount == buffer.length) { 119 insertBatch(); 120 compress(); 121 } 122 } 123 124 /** 125 * Merges items from buffer into the samples array in one pass. 126 * This is more efficient than doing an insert on every item. 127 */ 128 private void insertBatch() { 129 if (bufferCount == 0) { 130 return; 131 } 132 133 Arrays.sort(buffer, 0, bufferCount); 134 135 // Base case: no samples 136 int start = 0; 137 if (samples.isEmpty()) { 138 SampleItem newItem = new SampleItem(buffer[0], 1, 0); 139 samples.add(newItem); 140 start++; 141 } 142 143 ListIterator<SampleItem> it = samples.listIterator(); 144 SampleItem item = it.next(); 145 for (int i = start; i < bufferCount; i++) { 146 long v = buffer[i]; 147 while (it.nextIndex() < samples.size() && item.value < v) { 148 item = it.next(); 149 } 150 // If we found that bigger item, back up so we insert ourselves before it 151 if (item.value > v) { 152 it.previous(); 153 } 154 // We use different indexes for the edge comparisons, because of the above 155 // if statement that adjusts the iterator 156 int delta; 157 if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { 158 delta = 0; 159 } else { 160 delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; 161 } 162 SampleItem newItem = new SampleItem(v, 1, delta); 163 it.add(newItem); 164 item = newItem; 165 } 166 167 bufferCount = 0; 168 } 169 170 /** 171 * Try to remove extraneous items from the set of sampled items. This checks 172 * if an item is unnecessary based on the desired error bounds, and merges it 173 * with the adjacent item if it is. 174 */ 175 private void compress() { 176 if (samples.size() < 2) { 177 return; 178 } 179 180 ListIterator<SampleItem> it = samples.listIterator(); 181 SampleItem prev = null; 182 SampleItem next = it.next(); 183 184 while (it.hasNext()) { 185 prev = next; 186 next = it.next(); 187 if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { 188 next.g += prev.g; 189 // Remove prev. it.remove() kills the last thing returned. 190 it.previous(); 191 it.previous(); 192 it.remove(); 193 // it.next() is now equal to next, skip it back forward again 194 it.next(); 195 } 196 } 197 } 198 199 /** 200 * Get the estimated value at the specified quantile. 201 * 202 * @param quantile Queried quantile, e.g. 0.50 or 0.99. 203 * @return Estimated value at that quantile. 204 */ 205 private long query(double quantile) throws IOException { 206 if (samples.isEmpty()) { 207 throw new IOException("No samples present"); 208 } 209 210 int rankMin = 0; 211 int desired = (int) (quantile * count); 212 213 for (int i = 1; i < samples.size(); i++) { 214 SampleItem prev = samples.get(i - 1); 215 SampleItem cur = samples.get(i); 216 217 rankMin += prev.g; 218 219 if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) { 220 return prev.value; 221 } 222 } 223 224 // edge case of wanting max value 225 return samples.get(samples.size() - 1).value; 226 } 227 228 /** 229 * Get a snapshot of the current values of all the tracked quantiles. 230 * 231 * @return snapshot of the tracked quantiles 232 * @throws IOException 233 * if no items have been added to the estimator 234 */ 235 synchronized public Map<MetricQuantile, Long> snapshot() throws IOException { 236 // flush the buffer first for best results 237 insertBatch(); 238 Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length); 239 for (int i = 0; i < quantiles.length; i++) { 240 values.put(quantiles[i], query(quantiles[i].quantile)); 241 } 242 243 return values; 244 } 245 246 /** 247 * Returns the number of items that the estimator has processed 248 * 249 * @return count total number of items processed 250 */ 251 synchronized public long getCount() { 252 return count; 253 } 254 255 /** 256 * Returns the number of samples kept by the estimator 257 * 258 * @return count current number of samples 259 */ 260 @VisibleForTesting 261 synchronized public int getSampleCount() { 262 return samples.size(); 263 } 264 265 /** 266 * Resets the estimator, clearing out all previously inserted items 267 */ 268 synchronized public void clear() { 269 count = 0; 270 bufferCount = 0; 271 samples.clear(); 272 } 273 274 /** 275 * Describes a measured value passed to the estimator, tracking additional 276 * metadata required by the CKMS algorithm. 277 */ 278 private static class SampleItem { 279 280 /** 281 * Value of the sampled item (e.g. a measured latency value) 282 */ 283 private final long value; 284 285 /** 286 * Difference between the lowest possible rank of the previous item, and 287 * the lowest possible rank of this item. 288 * 289 * The sum of the g of all previous items yields this item's lower bound. 290 */ 291 private int g; 292 293 /** 294 * Difference between the item's greatest possible rank and lowest possible 295 * rank. 296 */ 297 private final int delta; 298 299 public SampleItem(long value, int lowerDelta, int delta) { 300 this.value = value; 301 this.g = lowerDelta; 302 this.delta = delta; 303 } 304 305 @Override 306 public String toString() { 307 return String.format("%d, %d, %d", value, g, delta); 308 } 309 } 310}