001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032 033 034/** 035 * LossyCounting utility, bounded data structure that maintains approximate high frequency 036 * elements in data stream. 037 * 038 * Bucket size is 1 / error rate. (Error rate is 0.02 by default) 039 * Lemma If element does not appear in set, then is frequency is less than e * N 040 * (N is total element counts until now.) 041 * Based on paper: 042 * http://www.vldb.org/conf/2002/S10P03.pdf 043 */ 044 045@InterfaceAudience.Private 046public class LossyCounting { 047 private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); 048 private long bucketSize; 049 private int currentTerm; 050 private double errorRate; 051 private Map<String, Integer> data; 052 private long totalDataCount; 053 private String name; 054 private LossyCountingListener listener; 055 056 public interface LossyCountingListener { 057 void sweep(String key); 058 } 059 060 public LossyCounting(double errorRate, String name, LossyCountingListener listener) { 061 this.errorRate = errorRate; 062 this.name = name; 063 if (errorRate < 0.0 || errorRate > 1.0) { 064 throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); 065 } 066 this.bucketSize = (long) Math.ceil(1 / errorRate); 067 this.currentTerm = 1; 068 this.totalDataCount = 0; 069 this.data = new ConcurrentHashMap<>(); 070 this.listener = listener; 071 calculateCurrentTerm(); 072 } 073 074 public LossyCounting(String name, LossyCountingListener listener) { 075 this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), 076 name, listener); 077 } 078 079 private void addByOne(String key) { 080 //If entry exists, we update the entry by incrementing its frequency by one. Otherwise, 081 //we create a new entry starting with currentTerm so that it will not be pruned immediately 082 data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1); 083 084 //update totalDataCount and term 085 totalDataCount++; 086 calculateCurrentTerm(); 087 } 088 089 public void add(String key) { 090 addByOne(key); 091 if(totalDataCount % bucketSize == 0) { 092 //sweep the entries at bucket boundaries 093 sweep(); 094 } 095 } 096 097 098 /** 099 * sweep low frequency data 100 * @return Names of elements got swept 101 */ 102 private void sweep() { 103 for(Map.Entry<String, Integer> entry : data.entrySet()) { 104 if(entry.getValue() < currentTerm) { 105 String metric = entry.getKey(); 106 data.remove(metric); 107 if (listener != null) { 108 listener.sweep(metric); 109 } 110 } 111 } 112 } 113 114 /** 115 * Calculate and set current term 116 */ 117 private void calculateCurrentTerm() { 118 this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize); 119 } 120 121 public long getBucketSize(){ 122 return bucketSize; 123 } 124 125 public long getDataSize() { 126 return data.size(); 127 } 128 129 public boolean contains(String key) { 130 return data.containsKey(key); 131 } 132 133 public Set<String> getElements(){ 134 return data.keySet(); 135 } 136 137 public long getCurrentTerm() { 138 return currentTerm; 139 } 140} 141