/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * LossyCounting utility: a bounded data structure that maintains the approximate high-frequency
 * elements of a data stream.
 *
 * Bucket size is 1 / errorRate. (The error rate is 0.02 by default.)
 * Lemma: if an element does not appear in the map, then its true frequency is less than
 *        errorRate * N, where N is the total number of elements added so far.
 * Based on the paper:
 * http://www.vldb.org/conf/2002/S10P03.pdf
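 *
 * For example, with the default error rate of 0.02 the bucket size is ceil(1 / 0.02) = 50;
 * after 1000 elements have been added (currentTerm = 20), any element that is absent from the
 * map is guaranteed to have appeared fewer than 0.02 * 1000 = 20 times.
 *
 * A minimal usage sketch (the name, error rate and listener below are illustrative, not
 * prescribed by this class):
 * <pre>{@code
 *   // illustrative values; any error rate in [0, 1] and any listener work
 *   LossyCounting<String> counting = new LossyCounting<>(0.02, "tableMetrics",
 *       key -> System.out.println("swept " + key));
 *   counting.add("tableA");
 *   counting.add("tableB");
 *   boolean stillTracked = counting.contains("tableA");
 * }</pre>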
 */

@InterfaceAudience.Private
public class LossyCounting<T> {
  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
  private final ExecutorService executor;
  private final long bucketSize;
  private int currentTerm;
  private final double errorRate;
  private final Map<T, Integer> data;
  private long totalDataCount;
  private final String name;
  private final LossyCountingListener<T> listener;
  // Pending background sweep, if any; each instance schedules sweeps on its own executor.
  private final AtomicReference<Future<?>> fut = new AtomicReference<>(null);

  public interface LossyCountingListener<T> {
    void sweep(T key);
  }

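  /**
   * @param errorRate the error rate epsilon; must be within [0, 1]. The bucket size is
   *                  ceil(1 / errorRate).
   * @param name a human-readable name used in the sweep thread's log messages
   * @param listener callback invoked for each key removed by a sweep; may be null
   */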
  public LossyCounting(double errorRate, String name, LossyCountingListener<T> listener) {
    if (errorRate < 0.0 || errorRate > 1.0) {
      throw new IllegalArgumentException("Lossy Counting error rate should be within range [0,1]");
    }
    this.errorRate = errorRate;
    this.name = name;
    this.bucketSize = (long) Math.ceil(1 / errorRate);
    this.currentTerm = 1;
    this.totalDataCount = 0;
    this.data = new ConcurrentHashMap<>();
    this.listener = listener;
    calculateCurrentTerm();
    executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
  }

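  /**
   * Creates a counter whose error rate is read from the configuration under
   * {@code HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE} (0.02 if unset).
   */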
  public LossyCounting(String name, LossyCountingListener<T> listener) {
    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
        name, listener);
  }

  private void addByOne(T key) {
    // If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
    // we create a new entry starting with currentTerm so that it will not be pruned immediately.
    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);

    // update totalDataCount and term
    totalDataCount++;
    calculateCurrentTerm();
  }

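  /**
   * Adds one occurrence of {@code key}. At each bucket boundary (every bucketSize additions) a
   * background sweep is scheduled; if a previously scheduled sweep is still running, no new one
   * is submitted.
   */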
  public void add(T key) {
    addByOne(key);
    if (totalDataCount % bucketSize == 0) {
      // sweep the entries at bucket boundaries
      Future<?> future = fut.get();
      if (future != null && !future.isDone()) {
        return;
      }
      future = executor.submit(new SweepRunnable());
      fut.set(future);
    }
  }

  /**
   * Sweeps out low-frequency entries: every key whose count is below currentTerm is removed
   * from the map and reported to the listener, if one is set.
   */
  @VisibleForTesting
  public void sweep() {
    for (Map.Entry<T, Integer> entry : data.entrySet()) {
      if (entry.getValue() < currentTerm) {
        T metric = entry.getKey();
        data.remove(metric);
        if (listener != null) {
          listener.sweep(metric);
        }
      }
    }
  }

  /**
   * Calculate and set the current term: the index of the current bucket,
   * i.e. ceil(totalDataCount / bucketSize).
   */
  private void calculateCurrentTerm() {
    this.currentTerm = (int) Math.ceil((double) totalDataCount / bucketSize);
  }

  public long getBucketSize() {
    return bucketSize;
  }

  public long getDataSize() {
    return data.size();
  }

  public boolean contains(T key) {
    return data.containsKey(key);
  }

  public Set<T> getElements() {
    return data.keySet();
  }

  public long getCurrentTerm() {
    return currentTerm;
  }

  class SweepRunnable implements Runnable {
    @Override
    public void run() {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Starting sweep of lossyCounting-{}", name);
      }
      try {
        sweep();
      } catch (Exception exception) {
        LOG.debug("Error while sweeping lossyCounting-{}", name, exception);
      }
    }
  }

  @VisibleForTesting
  public Future<?> getSweepFuture() {
    return fut.get();
  }
}