/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.metrics2.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm for streaming
 * calculation of targeted high-percentile epsilon-approximate quantiles. This is a generalization
 * of the earlier work by Greenwald and Khanna (GK), which essentially allows different error bounds
 * on the targeted quantiles, which allows for far more efficient calculation of high-percentiles.
 * <p>
 * See: Cormode, Korn, Muthukrishnan, and Srivastava "Effective Computation of Biased Quantiles over
 * Data Streams" in ICDE 2005; Greenwald and Khanna, "Space-efficient online computation of quantile
 * summaries" in SIGMOD 2001.
 * <p>
 * All public methods are synchronized on this instance.
 */
@InterfaceAudience.Private
@SuppressWarnings("JdkObsolete") // This is a use case for LinkedList
public class MetricSampleQuantiles {

  /**
   * Total number of items in stream
   */
  private long count = 0;

  /**
   * Current list of sampled items, maintained in sorted order with error bounds
   */
  private final LinkedList<SampleItem> samples;

  /**
   * Buffers incoming items to be inserted in batch. Items are inserted into the buffer linearly.
   * When the buffer fills, it is flushed into the samples list in its entirety.
   */
  private final long[] buffer = new long[500];
  private int bufferCount = 0;

  /**
   * Array of Quantiles that we care about, along with desired error.
   */
  private final MetricQuantile[] quantiles;

  /**
   * Creates an estimator for the given target quantiles.
   * @param quantiles the quantiles to track; the array is copied, so later changes to the
   *                  caller's array are not observed
   */
  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
    this.samples = new LinkedList<>();
  }

  /**
   * Specifies the allowable error for this rank, depending on which quantiles are being targeted.
   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide the range of this
   * rank can be.
   * @param rank the index in the list of samples
   * @return the smallest error permitted by any targeted quantile, capped at
   *         {@code samples.size() + 1}
   */
  private double allowableError(int rank) {
    int size = samples.size();
    double minError = size + 1;
    for (MetricQuantile q : quantiles) {
      double error;
      if (rank <= q.quantile * size) {
        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
      } else {
        error = (2.0 * q.error * rank) / q.quantile;
      }
      if (error < minError) {
        minError = error;
      }
    }

    return minError;
  }

  /**
   * Add a new value from the stream. The value is only buffered; the buffer is merged into the
   * samples (and the samples compressed) once it is full.
   * @param v the value to insert
   */
  public synchronized void insert(long v) {
    buffer[bufferCount] = v;
    bufferCount++;

    count++;

    if (bufferCount == buffer.length) {
      insertBatch();
      compress();
    }
  }

  /**
   * Merges items from buffer into the samples list in one pass. This is more efficient than doing
   * an insert on every item. Resets the buffer to empty when done.
   */
  private void insertBatch() {
    if (bufferCount == 0) {
      return;
    }

    // Sorting the buffered values lets a single forward walk over the (sorted) samples suffice.
    Arrays.sort(buffer, 0, bufferCount);

    // Base case: no samples
    int start = 0;
    if (samples.isEmpty()) {
      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
      samples.add(newItem);
      start++;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem item = it.next();
    for (int i = start; i < bufferCount; i++) {
      long v = buffer[i];
      while (it.nextIndex() < samples.size() && item.value < v) {
        item = it.next();
      }
      // If we found that bigger item, back up so we insert ourselves before it
      if (item.value > v) {
        it.previous();
      }
      // We use different indexes for the edge comparisons, because of the above
      // if statement that adjusts the iterator
      // NOTE(review): when v is smaller than the first sample the iterator has been backed up to
      // index 0, where previousIndex() is -1 (not 0), so a head insertion takes the else branch
      // below instead of getting delta = 0. Confirm whether that is intended before changing it.
      int delta;
      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
        delta = 0;
      } else {
        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
      }
      SampleItem newItem = new SampleItem(v, 1, delta);
      it.add(newItem);
      // Continue the scan from the item just inserted; the buffer is sorted, so later values
      // cannot belong before it.
      item = newItem;
    }

    bufferCount = 0;
  }

  /**
   * Try to remove extraneous items from the set of sampled items. This checks if an item is
   * unnecessary based on the desired error bounds, and merges it with the adjacent item if it is.
   */
  private void compress() {
    if (samples.size() < 2) {
      return;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem prev = null;
    SampleItem next = it.next();

    while (it.hasNext()) {
      prev = next;
      next = it.next();
      // it.previousIndex() is the index of 'next', the item that would absorb 'prev'.
      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
        next.g += prev.g;
        // Remove prev. it.remove() kills the last thing returned.
        it.previous();
        it.previous();
        it.remove();
        // it.next() is now equal to next, skip it back forward again
        it.next();
      }
    }
  }

  /**
   * Get the estimated value at the specified quantile.
   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
   * @return Estimated value at that quantile.
   * @throws IOException if there are no samples to query
   */
  private long query(double quantile) throws IOException {
    if (samples.isEmpty()) {
      throw new IOException("No samples present");
    }

    int rankMin = 0;
    int desired = (int) (quantile * count);

    // Walk the list once with an iterator; indexed get() on a LinkedList is linear per call,
    // which would make this loop quadratic in the number of samples.
    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem cur = it.next();
    while (it.hasNext()) {
      int index = it.nextIndex(); // index of the item about to become 'cur'
      SampleItem prev = cur;
      cur = it.next();

      rankMin += prev.g;

      if (rankMin + cur.g + cur.delta > desired + (allowableError(index) / 2)) {
        return prev.value;
      }
    }

    // edge case of wanting max value: 'cur' is now the last sample
    return cur.value;
  }

  /**
   * Get a snapshot of the current values of all the tracked quantiles. Any buffered values are
   * merged into the samples first.
   * @return snapshot of the tracked quantiles
   * @throws IOException if the estimator holds no samples when a quantile is queried
   */
  public synchronized Map<MetricQuantile, Long> snapshot() throws IOException {
    // flush the buffer first for best results
    insertBatch();
    Map<MetricQuantile, Long> values = new HashMap<>(quantiles.length);
    for (MetricQuantile q : quantiles) {
      values.put(q, query(q.quantile));
    }

    return values;
  }

  /**
   * Returns the number of items that the estimator has processed
   * @return count total number of items processed
   */
  public synchronized long getCount() {
    return count;
  }

  /**
   * Returns the number of samples kept by the estimator. Values still sitting in the insert
   * buffer are not included.
   * @return count current number of samples
   */
  public synchronized int getSampleCount() {
    return samples.size();
  }

  /**
   * Resets the estimator, clearing out all previously inserted items
   */
  public synchronized void clear() {
    count = 0;
    bufferCount = 0;
    samples.clear();
  }

  /**
   * Describes a measured value passed to the estimator, tracking additional metadata required by
   * the CKMS algorithm.
   */
  private static class SampleItem {

    /**
     * Value of the sampled item (e.g. a measured latency value)
     */
    private final long value;

    /**
     * Difference between the lowest possible rank of the previous item, and the lowest possible
     * rank of this item. The sum of the g of all previous items yields this item's lower bound.
     */
    private int g;

    /**
     * Difference between the item's greatest possible rank and lowest possible rank.
     */
    private final int delta;

    /**
     * @param value      the sampled value
     * @param lowerDelta initial value of {@link #g}
     * @param delta      initial (and final) value of {@link #delta}
     */
    public SampleItem(long value, int lowerDelta, int delta) {
      this.value = value;
      this.g = lowerDelta;
      this.delta = delta;
    }

    @Override
    public String toString() {
      return String.format("%d, %d, %d", value, g, delta);
    }
  }
}