001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032
033
034/**
035 * LossyCounting utility, bounded data structure that maintains approximate high frequency
036 * elements in data stream.
037 *
038 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
039 * Lemma If element does not appear in set, then is frequency is less than e * N
040 *       (N is total element counts until now.)
041 * Based on paper:
042 * http://www.vldb.org/conf/2002/S10P03.pdf
043 */
044
045@InterfaceAudience.Private
046public class LossyCounting {
047  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
048  private long bucketSize;
049  private int currentTerm;
050  private double errorRate;
051  private Map<String, Integer> data;
052  private long totalDataCount;
053  private String name;
054  private LossyCountingListener listener;
055
056  public interface LossyCountingListener {
057    void sweep(String key);
058  }
059
060  public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
061    this.errorRate = errorRate;
062    this.name = name;
063    if (errorRate < 0.0 || errorRate > 1.0) {
064      throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
065    }
066    this.bucketSize = (long) Math.ceil(1 / errorRate);
067    this.currentTerm = 1;
068    this.totalDataCount = 0;
069    this.data = new ConcurrentHashMap<>();
070    this.listener = listener;
071    calculateCurrentTerm();
072  }
073
074  public LossyCounting(String name, LossyCountingListener listener) {
075    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
076        name, listener);
077  }
078
079  private void addByOne(String key) {
080    //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
081    //we create a new entry starting with currentTerm so that it will not be pruned immediately
082    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
083
084    //update totalDataCount and term
085    totalDataCount++;
086    calculateCurrentTerm();
087  }
088
089  public void add(String key) {
090    addByOne(key);
091    if(totalDataCount % bucketSize == 0) {
092      //sweep the entries at bucket boundaries
093      sweep();
094    }
095  }
096
097
098  /**
099   * sweep low frequency data
100   * @return Names of elements got swept
101   */
102  private void sweep() {
103    for(Map.Entry<String, Integer> entry : data.entrySet()) {
104      if(entry.getValue() < currentTerm) {
105        String metric = entry.getKey();
106        data.remove(metric);
107        if (listener != null) {
108          listener.sweep(metric);
109        }
110      }
111    }
112  }
113
114  /**
115   * Calculate and set current term
116   */
117  private void calculateCurrentTerm() {
118    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
119  }
120
121  public long getBucketSize(){
122    return bucketSize;
123  }
124
125  public long getDataSize() {
126    return data.size();
127  }
128
129  public boolean contains(String key) {
130    return data.containsKey(key);
131  }
132
133  public Set<String> getElements(){
134    return data.keySet();
135  }
136
137  public long getCurrentTerm() {
138    return currentTerm;
139  }
140}
141