001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.util.HashSet;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034
035
036/**
037 * LossyCounting utility, bounded data structure that maintains approximate high frequency
038 * elements in data stream.
039 *
040 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
041 * Lemma If element does not appear in set, then is frequency is less than e * N
042 *       (N is total element counts until now.)
043 * Based on paper:
044 * http://www.vldb.org/conf/2002/S10P03.pdf
045 */
046
047@InterfaceAudience.Public
048public class LossyCounting {
049  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
050  private long bucketSize;
051  private long currentTerm;
052  private double errorRate;
053  private Map<String, Integer> data;
054  private long totalDataCount;
055
056  public LossyCounting(double errorRate) {
057    this.errorRate = errorRate;
058    if (errorRate < 0.0 || errorRate > 1.0) {
059      throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
060    }
061    this.bucketSize = (long) Math.ceil(1 / errorRate);
062    this.currentTerm = 1;
063    this.totalDataCount = 0;
064    this.errorRate = errorRate;
065    this.data = new ConcurrentHashMap<>();
066    calculateCurrentTerm();
067  }
068
069  public LossyCounting() {
070    Configuration conf = HBaseConfiguration.create();
071    this.errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
072    this.bucketSize = (long) Math.ceil(1.0 / errorRate);
073    this.currentTerm = 1;
074    this.totalDataCount = 0;
075    this.data = new ConcurrentHashMap<>();
076    calculateCurrentTerm();
077  }
078
079  public Set<String> addByOne(String key) {
080    if(data.containsKey(key)) {
081      data.put(key, data.get(key) +1);
082    } else {
083      data.put(key, 1);
084    }
085    totalDataCount++;
086    calculateCurrentTerm();
087    Set<String> dataToBeSwept = new HashSet<>();
088    if(totalDataCount % bucketSize == 0) {
089      dataToBeSwept = sweep();
090    }
091    return dataToBeSwept;
092  }
093
094  /**
095   * sweep low frequency data
096   * @return Names of elements got swept
097   */
098  private Set<String> sweep() {
099    Set<String> dataToBeSwept = new HashSet<>();
100    for(Map.Entry<String, Integer> entry : data.entrySet()) {
101      if(entry.getValue() + errorRate < currentTerm) {
102        dataToBeSwept.add(entry.getKey());
103      }
104    }
105    for(String key : dataToBeSwept) {
106      data.remove(key);
107    }
108    LOG.debug(String.format("Swept %d of elements.", dataToBeSwept.size()));
109    return dataToBeSwept;
110  }
111
112  /**
113   * Calculate and set current term
114   */
115  private void calculateCurrentTerm() {
116    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
117  }
118
119  public long getBuketSize(){
120    return bucketSize;
121  }
122
123  public long getDataSize() {
124    return data.size();
125  }
126
127  public boolean contains(String key) {
128    return data.containsKey(key);
129  }
130
131  public long getCurrentTerm() {
132    return currentTerm;
133  }
134}
135