001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.util.HashSet; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034 035 036/** 037 * LossyCounting utility, bounded data structure that maintains approximate high frequency 038 * elements in data stream. 039 * 040 * Bucket size is 1 / error rate. (Error rate is 0.02 by default) 041 * Lemma If element does not appear in set, then is frequency is less than e * N 042 * (N is total element counts until now.) 043 * Based on paper: 044 * http://www.vldb.org/conf/2002/S10P03.pdf 045 */ 046 047@InterfaceAudience.Public 048public class LossyCounting { 049 private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); 050 private long bucketSize; 051 private long currentTerm; 052 private double errorRate; 053 private Map<String, Integer> data; 054 private long totalDataCount; 055 056 public LossyCounting(double errorRate) { 057 this.errorRate = errorRate; 058 if (errorRate < 0.0 || errorRate > 1.0) { 059 throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); 060 } 061 this.bucketSize = (long) Math.ceil(1 / errorRate); 062 this.currentTerm = 1; 063 this.totalDataCount = 0; 064 this.errorRate = errorRate; 065 this.data = new ConcurrentHashMap<>(); 066 calculateCurrentTerm(); 067 } 068 069 public LossyCounting() { 070 Configuration conf = HBaseConfiguration.create(); 071 this.errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02); 072 this.bucketSize = (long) Math.ceil(1.0 / errorRate); 073 this.currentTerm = 1; 074 this.totalDataCount = 0; 075 this.data = new ConcurrentHashMap<>(); 076 calculateCurrentTerm(); 077 } 078 079 public Set<String> addByOne(String key) { 080 if(data.containsKey(key)) { 081 data.put(key, data.get(key) +1); 082 } else { 083 data.put(key, 1); 084 } 085 totalDataCount++; 086 calculateCurrentTerm(); 087 Set<String> dataToBeSwept = new HashSet<>(); 088 if(totalDataCount % bucketSize == 0) { 089 dataToBeSwept = sweep(); 090 } 091 return dataToBeSwept; 092 } 093 094 /** 095 * sweep low frequency data 096 * @return Names of elements got swept 097 */ 098 private Set<String> sweep() { 099 Set<String> dataToBeSwept = new HashSet<>(); 100 for(Map.Entry<String, Integer> entry : data.entrySet()) { 101 if(entry.getValue() + errorRate < currentTerm) { 102 dataToBeSwept.add(entry.getKey()); 103 } 104 } 105 for(String key : dataToBeSwept) { 106 data.remove(key); 107 } 108 LOG.debug(String.format("Swept %d of elements.", dataToBeSwept.size())); 109 return dataToBeSwept; 110 } 111 112 /** 113 * Calculate and set current term 114 */ 115 private void calculateCurrentTerm() { 116 this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize); 117 } 118 119 public long getBuketSize(){ 120 return bucketSize; 121 } 122 123 public long getDataSize() { 124 return data.size(); 125 } 126 127 public boolean contains(String key) { 128 return data.containsKey(key); 129 } 130 131 public long getCurrentTerm() { 132 return currentTerm; 133 } 134} 135