/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * LossyCounting utility: a bounded data structure that maintains the approximate high-frequency
 * elements of a data stream. The bucket size is 1 / errorRate (the error rate is 0.02 by default).
 * Lemma: if an element does not appear in the set, then its frequency is less than errorRate * N,
 * where N is the total number of elements seen so far. Based on the paper:
 * http://www.vldb.org/conf/2002/S10P03.pdf
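 * <p>
 * A minimal usage sketch (the element type and listener below are illustrative):
 *
 * <pre>{@code
 * LossyCounting<String> counting =
 *   new LossyCounting<>("metrics", 0.02, key -> System.out.println("swept: " + key));
 * counting.add("get");
 * counting.add("put");
 * // Low-frequency keys are swept out asynchronously at bucket boundaries.
 * Set<String> tracked = counting.getElements();
 * }</pre>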
 */
@InterfaceAudience.Private
public class LossyCounting<T> {
  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
  private final ExecutorService executor;
  private long bucketSize;
  private int currentTerm;
  private Map<T, Integer> data;
  private long totalDataCount;
  private final String name;
  private LossyCountingListener<T> listener;
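  // Most recently submitted sweep task. The field is static, so it is shared by all
  // LossyCounting instances.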
  private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);

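  /**
   * Listener notified with each key that is removed during a sweep.
   */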
  public interface LossyCountingListener<T> {
    void sweep(T key);
  }

  LossyCounting(String name, double errorRate) {
    this(name, errorRate, null);
  }

  public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
    this.name = name;
    if (errorRate < 0.0 || errorRate > 1.0) {
      throw new IllegalArgumentException("Lossy Counting error rate should be within range [0,1]");
    }
    this.bucketSize = (long) Math.ceil(1 / errorRate);
    this.currentTerm = 1;
    this.totalDataCount = 0;
    this.data = new ConcurrentHashMap<>();
    this.listener = listener;
    calculateCurrentTerm();
    executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
  }

  LossyCounting(String name, Configuration conf) {
    this(name, conf, null);
  }

  public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
    this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
  }

  private void addByOne(T key) {
    // If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
    // we create a new entry starting with currentTerm so that it will not be pruned immediately
    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);

    // update totalDataCount and term
    totalDataCount++;
    calculateCurrentTerm();
  }

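  /**
   * Adds one occurrence of {@code key} and, at bucket boundaries, schedules an asynchronous sweep
   * of low-frequency entries.
   */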
  public void add(T key) {
    addByOne(key);
    if (totalDataCount % bucketSize == 0) {
      // Sweep the entries at bucket boundaries. If the previous sweep is still running, skip
      // submitting another one.
      Future<?> future = fut.get();
      if (future != null && !future.isDone()) {
        return;
      }
      future = executor.submit(new SweepRunnable());
      fut.set(future);
    }
  }

  /**
   * Sweep out low-frequency entries, i.e. entries whose count is below the current term.
   */
  public void sweep() {
    for (Map.Entry<T, Integer> entry : data.entrySet()) {
      if (entry.getValue() < currentTerm) {
        T metric = entry.getKey();
        data.remove(metric);
        if (listener != null) {
          listener.sweep(metric);
        }
      }
    }
  }

  /**
   * Calculate and set the current term: the 1-based index of the bucket that the most recently
   * added element falls into, i.e. ceil(totalDataCount / bucketSize).
   */
  private void calculateCurrentTerm() {
    this.currentTerm = (int) Math.ceil((double) totalDataCount / bucketSize);
  }

  public long getBucketSize() {
    return bucketSize;
  }

  public long getDataSize() {
    return data.size();
  }

  public boolean contains(T key) {
    return data.containsKey(key);
  }

  public Set<T> getElements() {
    return data.keySet();
  }

  public long getCurrentTerm() {
    return currentTerm;
  }

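  /**
   * Runnable that performs a single sweep and logs any failure.
   */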
  class SweepRunnable implements Runnable {
    @Override
    public void run() {
      LOG.trace("Starting sweep of lossyCounting-{}", name);
      try {
        sweep();
      } catch (Exception exception) {
        LOG.debug("Error while sweeping lossyCounting-{}", name, exception);
      }
    }
  }

  public Future<?> getSweepFuture() {
    return fut.get();
  }
}