/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * LossyCounting utility, bounded data structure that maintains approximate high frequency
 * elements in data stream.
 *
 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
 * Lemma: if an element does not appear in the set, then its frequency is less than e * N
 *        (N is the total element count until now.)
 * Based on paper:
 * http://www.vldb.org/conf/2002/S10P03.pdf
 *
 * <p>Elements are counted in a {@link ConcurrentHashMap}; every time the total number of added
 * elements reaches a multiple of the bucket size, a sweep that drops low frequency entries is
 * submitted to a single daemon thread owned by this instance.
 *
 * <p>NOTE(review): {@code totalDataCount} and {@code currentTerm} are plain fields updated
 * without synchronization in {@link #add(Object)}; confirm whether callers serialize calls to
 * {@code add} or whether approximate totals are acceptable.
 *
 * @param <T> type of the counted elements; used as the key of the backing map
 */
@InterfaceAudience.Private
public class LossyCounting<T> {
  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
  /** Single daemon thread that runs the sweeps of this instance. */
  private final ExecutorService executor;
  /** Number of added elements per term, {@code ceil(1 / errorRate)}. */
  private final long bucketSize;
  /** Index of the current bucket, {@code ceil(totalDataCount / bucketSize)}. */
  private int currentTerm;
  /** Element to approximate count. */
  private final Map<T, Integer> data;
  /** Number of elements passed to {@link #add(Object)} so far. */
  private long totalDataCount;
  private final String name;
  /** Notified for every key removed by a sweep; may be null. */
  private final LossyCountingListener<T> listener;
  /**
   * Most recently submitted sweep of this instance. Kept per instance, like the executor it
   * belongs to, so that a pending sweep of one counter never suppresses the sweep of another.
   */
  private final AtomicReference<Future<?>> fut = new AtomicReference<>(null);

  /**
   * Callback invoked for each key that a sweep removes from the counter.
   */
  public interface LossyCountingListener<T> {
    /**
     * Called after {@code key} has been removed from the counter.
     * @param key the removed element
     */
    void sweep(T key);
  }

  /**
   * Creates a counter without a listener.
   * @param name name used in log messages
   * @param errorRate error rate, within [0,1]
   */
  LossyCounting(String name, double errorRate) {
    this(name, errorRate, null);
  }

  /**
   * Creates a counter.
   * @param name name used in log messages
   * @param errorRate error rate, within [0,1]; the bucket size is {@code ceil(1 / errorRate)}
   * @param listener notified for every swept key, may be null
   * @throws IllegalArgumentException if {@code errorRate} is NaN or outside [0,1]
   */
  public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
    this.name = name;
    // Negated range check so that NaN is rejected too; NaN would otherwise produce a bucket
    // size of 0 and make add() fail with a division by zero.
    if (!(errorRate >= 0.0 && errorRate <= 1.0)) {
      throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
    }
    // An error rate of 0 divides to infinity, which the cast saturates to Long.MAX_VALUE.
    this.bucketSize = (long) Math.ceil(1 / errorRate);
    this.totalDataCount = 0;
    this.data = new ConcurrentHashMap<>();
    this.listener = listener;
    // With no data yet this leaves currentTerm at 0.
    calculateCurrentTerm();
    executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
  }

  /**
   * Creates a counter without a listener, reading the error rate from the configuration.
   * @param name name used in log messages
   * @param conf configuration to read the error rate from
   */
  LossyCounting(String name, Configuration conf) {
    this(name, conf, null);
  }

  /**
   * Creates a counter, reading the error rate from the configuration key
   * {@link HConstants#DEFAULT_LOSSY_COUNTING_ERROR_RATE} and falling back to 0.02.
   * @param name name used in log messages
   * @param conf configuration to read the error rate from
   * @param listener notified for every swept key, may be null
   */
  public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
    this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
  }

  /**
   * Counts one occurrence of {@code key} and recomputes the current term.
   */
  private void addByOne(T key) {
    // If the entry exists its frequency is incremented by one. Otherwise a new entry is created
    // starting at currentTerm (1 while currentTerm is still 0) so that it is not pruned
    // immediately.
    final int initialCount = (currentTerm != 0 ? currentTerm - 1 : 0) + 1;
    // merge() is atomic on ConcurrentHashMap, so an increment cannot interleave with a removal
    // done by the sweep thread the way a separate get followed by put could.
    data.merge(key, initialCount, (existing, unused) -> existing + 1);

    // update totalDataCount and term
    totalDataCount++;
    calculateCurrentTerm();
  }

  /**
   * Counts one occurrence of {@code key}. When the total count reaches a bucket boundary a sweep
   * is submitted to the executor, unless the previously submitted sweep has not finished yet.
   * @param key element to count
   */
  public void add(T key) {
    addByOne(key);
    if (totalDataCount % bucketSize == 0) {
      // sweep the entries at bucket boundaries
      Future<?> future = fut.get();
      if (future != null && !future.isDone()) {
        // the previous sweep is still pending or running, do not queue another one
        return;
      }
      future = executor.submit(new SweepRunnable());
      fut.set(future);
    }
  }

  /**
   * Removes every entry whose count is lower than the current term and notifies the listener,
   * if there is one, for each removed key.
   */
  @VisibleForTesting
  public void sweep() {
    for (Map.Entry<T, Integer> entry : data.entrySet()) {
      final T metric = entry.getKey();
      final Integer count = entry.getValue();
      // Conditional removal: if the count was changed concurrently since it was read the entry
      // is kept, and the listener is only told about keys that really were removed.
      if (count < currentTerm && data.remove(metric, count)) {
        if (listener != null) {
          listener.sweep(metric);
        }
      }
    }
  }

  /**
   * Calculate and set current term
   */
  private void calculateCurrentTerm() {
    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
  }

  /**
   * @return the bucket size, {@code ceil(1 / errorRate)}
   */
  public long getBucketSize() {
    return bucketSize;
  }

  /**
   * @return the number of elements currently tracked
   */
  public long getDataSize() {
    return data.size();
  }

  /**
   * @param key element to look up
   * @return true if {@code key} is currently tracked
   */
  public boolean contains(T key) {
    return data.containsKey(key);
  }

  /**
   * @return the key set view of the backing map (not a copy)
   */
  public Set<T> getElements() {
    return data.keySet();
  }

  /**
   * @return the current term
   */
  public long getCurrentTerm() {
    return currentTerm;
  }

  /**
   * Task submitted to the executor: runs {@link #sweep()} and logs, rather than propagates, any
   * exception it throws.
   */
  class SweepRunnable implements Runnable {
    @Override
    public void run() {
      LOG.trace("Starting sweep of lossyCounting-{}", name);
      try {
        sweep();
      } catch (Exception exception) {
        LOG.debug("Error while sweeping of lossyCounting-{}", name, exception);
      }
    }
  }

  /**
   * @return the most recently submitted sweep of this instance, or null if none was submitted
   */
  @VisibleForTesting
  public Future<?> getSweepFuture() {
    return fut.get();
  }
}