001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.metrics2.util; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.LinkedList; 024import java.util.ListIterator; 025import java.util.Map; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm for streaming 030 * calculation of targeted high-percentile epsilon-approximate quantiles. This is a generalization 031 * of the earlier work by Greenwald and Khanna (GK), which essentially allows different error bounds 032 * on the targeted quantiles, which allows for far more efficient calculation of high-percentiles. 033 * See: Cormode, Korn, Muthukrishnan, and Srivastava "Effective Computation of Biased Quantiles over 034 * Data Streams" in ICDE 2005 Greenwald and Khanna, "Space-efficient online computation of quantile 035 * summaries" in SIGMOD 2001 036 */ 037@InterfaceAudience.Private 038public class MetricSampleQuantiles { 039 040 /** 041 * Total number of items in stream 042 */ 043 private long count = 0; 044 045 /** 046 * Current list of sampled items, maintained in sorted order with error bounds 047 */ 048 private LinkedList<SampleItem> samples; 049 050 /** 051 * Buffers incoming items to be inserted in batch. Items are inserted into the buffer linearly. 052 * When the buffer fills, it is flushed into the samples array in its entirety. 053 */ 054 private long[] buffer = new long[500]; 055 private int bufferCount = 0; 056 057 /** 058 * Array of Quantiles that we care about, along with desired error. 059 */ 060 private final MetricQuantile[] quantiles; 061 062 public MetricSampleQuantiles(MetricQuantile[] quantiles) { 063 this.quantiles = Arrays.copyOf(quantiles, quantiles.length); 064 this.samples = new LinkedList<>(); 065 } 066 067 /** 068 * Specifies the allowable error for this rank, depending on which quantiles are being targeted. 069 * This is the f(r_i, n) function from the CKMS paper. It's basically how wide the range of this 070 * rank can be. n * the index in the list of samples 071 */ 072 private double allowableError(int rank) { 073 int size = samples.size(); 074 double minError = size + 1; 075 for (MetricQuantile q : quantiles) { 076 double error; 077 if (rank <= q.quantile * size) { 078 error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile); 079 } else { 080 error = (2.0 * q.error * rank) / q.quantile; 081 } 082 if (error < minError) { 083 minError = error; 084 } 085 } 086 087 return minError; 088 } 089 090 /** 091 * Add a new value from the stream. 092 * @param v the value to insert 093 */ 094 synchronized public void insert(long v) { 095 buffer[bufferCount] = v; 096 bufferCount++; 097 098 count++; 099 100 if (bufferCount == buffer.length) { 101 insertBatch(); 102 compress(); 103 } 104 } 105 106 /** 107 * Merges items from buffer into the samples array in one pass. This is more efficient than doing 108 * an insert on every item. 109 */ 110 private void insertBatch() { 111 if (bufferCount == 0) { 112 return; 113 } 114 115 Arrays.sort(buffer, 0, bufferCount); 116 117 // Base case: no samples 118 int start = 0; 119 if (samples.isEmpty()) { 120 SampleItem newItem = new SampleItem(buffer[0], 1, 0); 121 samples.add(newItem); 122 start++; 123 } 124 125 ListIterator<SampleItem> it = samples.listIterator(); 126 SampleItem item = it.next(); 127 for (int i = start; i < bufferCount; i++) { 128 long v = buffer[i]; 129 while (it.nextIndex() < samples.size() && item.value < v) { 130 item = it.next(); 131 } 132 // If we found that bigger item, back up so we insert ourselves before it 133 if (item.value > v) { 134 it.previous(); 135 } 136 // We use different indexes for the edge comparisons, because of the above 137 // if statement that adjusts the iterator 138 int delta; 139 if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { 140 delta = 0; 141 } else { 142 delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; 143 } 144 SampleItem newItem = new SampleItem(v, 1, delta); 145 it.add(newItem); 146 item = newItem; 147 } 148 149 bufferCount = 0; 150 } 151 152 /** 153 * Try to remove extraneous items from the set of sampled items. This checks if an item is 154 * unnecessary based on the desired error bounds, and merges it with the adjacent item if it is. 155 */ 156 private void compress() { 157 if (samples.size() < 2) { 158 return; 159 } 160 161 ListIterator<SampleItem> it = samples.listIterator(); 162 SampleItem prev = null; 163 SampleItem next = it.next(); 164 165 while (it.hasNext()) { 166 prev = next; 167 next = it.next(); 168 if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { 169 next.g += prev.g; 170 // Remove prev. it.remove() kills the last thing returned. 171 it.previous(); 172 it.previous(); 173 it.remove(); 174 // it.next() is now equal to next, skip it back forward again 175 it.next(); 176 } 177 } 178 } 179 180 /** 181 * Get the estimated value at the specified quantile. 182 * @param quantile Queried quantile, e.g. 0.50 or 0.99. 183 * @return Estimated value at that quantile. 184 */ 185 private long query(double quantile) throws IOException { 186 if (samples.isEmpty()) { 187 throw new IOException("No samples present"); 188 } 189 190 int rankMin = 0; 191 int desired = (int) (quantile * count); 192 193 for (int i = 1; i < samples.size(); i++) { 194 SampleItem prev = samples.get(i - 1); 195 SampleItem cur = samples.get(i); 196 197 rankMin += prev.g; 198 199 if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) { 200 return prev.value; 201 } 202 } 203 204 // edge case of wanting max value 205 return samples.get(samples.size() - 1).value; 206 } 207 208 /** 209 * Get a snapshot of the current values of all the tracked quantiles. 210 * @return snapshot of the tracked quantiles n * if no items have been added to the estimator 211 */ 212 synchronized public Map<MetricQuantile, Long> snapshot() throws IOException { 213 // flush the buffer first for best results 214 insertBatch(); 215 Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length); 216 for (int i = 0; i < quantiles.length; i++) { 217 values.put(quantiles[i], query(quantiles[i].quantile)); 218 } 219 220 return values; 221 } 222 223 /** 224 * Returns the number of items that the estimator has processed 225 * @return count total number of items processed 226 */ 227 synchronized public long getCount() { 228 return count; 229 } 230 231 /** 232 * Returns the number of samples kept by the estimator 233 * @return count current number of samples 234 */ 235 synchronized public int getSampleCount() { 236 return samples.size(); 237 } 238 239 /** 240 * Resets the estimator, clearing out all previously inserted items 241 */ 242 synchronized public void clear() { 243 count = 0; 244 bufferCount = 0; 245 samples.clear(); 246 } 247 248 /** 249 * Describes a measured value passed to the estimator, tracking additional metadata required by 250 * the CKMS algorithm. 251 */ 252 private static class SampleItem { 253 254 /** 255 * Value of the sampled item (e.g. a measured latency value) 256 */ 257 private final long value; 258 259 /** 260 * Difference between the lowest possible rank of the previous item, and the lowest possible 261 * rank of this item. The sum of the g of all previous items yields this item's lower bound. 262 */ 263 private int g; 264 265 /** 266 * Difference between the item's greatest possible rank and lowest possible rank. 267 */ 268 private final int delta; 269 270 public SampleItem(long value, int lowerDelta, int delta) { 271 this.value = value; 272 this.g = lowerDelta; 273 this.delta = delta; 274 } 275 276 @Override 277 public String toString() { 278 return String.format("%d, %d, %d", value, g, delta); 279 } 280 } 281}