/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * LossyCounting utility: a bounded data structure that maintains the approximate high-frequency
 * elements of a data stream. The bucket size is 1 / errorRate (the error rate is 0.02 by default).
 * Lemma: if an element does not appear in the set, then its true frequency is less than e * N,
 * where N is the total number of elements seen so far.
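 * <p>
 * For example, with the default error rate of 0.02 the bucket size is ceil(1 / 0.02) = 50, so a
 * sweep of low-frequency entries is scheduled every 50 additions (unless the previous sweep is
 * still running), and after N additions any element absent from the set has a true frequency
 * below 0.02 * N.
 * <p>
 * A minimal usage sketch; the counter name, element type and listener below are illustrative only:
 *
 * <pre>{@code
 * LossyCounting<String> counter = new LossyCounting<String>("exampleCounter", 0.02,
 *   swept -> System.out.println("pruned: " + swept));
 * counter.add("rowKey1");
 * counter.add("rowKey1");
 * boolean stillTracked = counter.contains("rowKey1");
 * }</pre>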
 * <p>
 * Based on the paper: http://www.vldb.org/conf/2002/S10P03.pdf
 */
@InterfaceAudience.Private
public class LossyCounting<T> {
  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
  private final ExecutorService executor;
  private long bucketSize;
  private int currentTerm;
  private Map<T, Integer> data;
  private long totalDataCount;
  private final String name;
  private LossyCountingListener<T> listener;
  private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);

  public interface LossyCountingListener<T> {
    void sweep(T key);
  }

  LossyCounting(String name, double errorRate) {
    this(name, errorRate, null);
  }

  public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
    this.name = name;
    if (errorRate < 0.0 || errorRate > 1.0) {
      throw new IllegalArgumentException(
        "Lossy Counting error rate should be within range [0,1]");
    }
    this.bucketSize = (long) Math.ceil(1 / errorRate);
    this.currentTerm = 1;
    this.totalDataCount = 0;
    this.data = new ConcurrentHashMap<>();
    this.listener = listener;
    calculateCurrentTerm();
    executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
  }

  LossyCounting(String name, Configuration conf) {
    this(name, conf, null);
  }

  public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
    this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
  }

  private void addByOne(T key) {
    // If the entry exists, increment its frequency by one. Otherwise, create a new entry starting
    // at currentTerm so that it is not pruned immediately.
    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);

    // Update totalDataCount and the current term.
    totalDataCount++;
    calculateCurrentTerm();
  }
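  /**
   * Add one occurrence of {@code key}. At each bucket boundary a background sweep of
   * low-frequency entries is scheduled, unless a previous sweep is still running.
   */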
  public void add(T key) {
    addByOne(key);
    if (totalDataCount % bucketSize == 0) {
      // Sweep the entries at bucket boundaries; skip if the previous sweep has not finished yet.
      Future<?> future = fut.get();
      if (future != null && !future.isDone()) {
        return;
      }
      future = executor.submit(new SweepRunnable());
      fut.set(future);
    }
  }

  /**
   * Sweep away entries whose recorded frequency is below the current term.
   */
  public void sweep() {
    for (Map.Entry<T, Integer> entry : data.entrySet()) {
      if (entry.getValue() < currentTerm) {
        T metric = entry.getKey();
        data.remove(metric);
        if (listener != null) {
          listener.sweep(metric);
        }
      }
    }
  }

  /**
   * Calculate and set the current term, i.e. the index of the bucket that the most recently added
   * element falls in.
   */
  private void calculateCurrentTerm() {
    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
  }

  public long getBucketSize() {
    return bucketSize;
  }

  public long getDataSize() {
    return data.size();
  }

  public boolean contains(T key) {
    return data.containsKey(key);
  }

  public Set<T> getElements() {
    return data.keySet();
  }

  public long getCurrentTerm() {
    return currentTerm;
  }

  class SweepRunnable implements Runnable {
    @Override
    public void run() {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Starting sweep of lossyCounting-" + name);
      }
      try {
        sweep();
      } catch (Exception exception) {
        LOG.debug("Error while sweeping lossyCounting-{}", name, exception);
      }
    }
  }

  public Future<?> getSweepFuture() {
    return fut.get();
  }
}