001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.Future;
028import java.util.concurrent.atomic.AtomicReference;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036
037/**
038 * LossyCounting utility, bounded data structure that maintains approximate high frequency
039 * elements in data stream.
040 *
041 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
042 * Lemma If element does not appear in set, then is frequency is less than e * N
043 *       (N is total element counts until now.)
044 * Based on paper:
045 * http://www.vldb.org/conf/2002/S10P03.pdf
046 */
047@InterfaceAudience.Private
048public class LossyCounting<T> {
049  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
050  private final ExecutorService executor;
051  private long bucketSize;
052  private int currentTerm;
053  private Map<T, Integer> data;
054  private long totalDataCount;
055  private final String name;
056  private LossyCountingListener<T> listener;
057  private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);
058
059  public interface LossyCountingListener<T> {
060    void sweep(T key);
061  }
062
063  LossyCounting(String name, double errorRate) {
064    this(name, errorRate, null);
065  }
066
067  public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
068    this.name = name;
069    if (errorRate < 0.0 || errorRate > 1.0) {
070      throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
071    }
072    this.bucketSize = (long) Math.ceil(1 / errorRate);
073    this.currentTerm = 1;
074    this.totalDataCount = 0;
075    this.data = new ConcurrentHashMap<>();
076    this.listener = listener;
077    calculateCurrentTerm();
078    executor = Executors.newSingleThreadExecutor(
079      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
080  }
081
082  LossyCounting(String name, Configuration conf) {
083    this(name, conf, null);
084  }
085
086  public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
087    this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
088  }
089
090  private void addByOne(T key) {
091    //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
092    //we create a new entry starting with currentTerm so that it will not be pruned immediately
093    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
094
095    //update totalDataCount and term
096    totalDataCount++;
097    calculateCurrentTerm();
098  }
099
100  public void add(T key) {
101    addByOne(key);
102    if(totalDataCount % bucketSize == 0) {
103      //sweep the entries at bucket boundaries
104      //run Sweep
105      Future<?> future = fut.get();
106      if (future != null && !future.isDone()){
107        return;
108      }
109      future = executor.submit(new SweepRunnable());
110      fut.set(future);
111    }
112  }
113
114
115  /**
116   * sweep low frequency data
117   */
118  @VisibleForTesting
119  public void sweep() {
120    for(Map.Entry<T, Integer> entry : data.entrySet()) {
121      if(entry.getValue() < currentTerm) {
122        T metric = entry.getKey();
123        data.remove(metric);
124        if (listener != null) {
125          listener.sweep(metric);
126        }
127      }
128    }
129  }
130
131  /**
132   * Calculate and set current term
133   */
134  private void calculateCurrentTerm() {
135    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
136  }
137
138  public long getBucketSize(){
139    return bucketSize;
140  }
141
142  public long getDataSize() {
143    return data.size();
144  }
145
146  public boolean contains(T key) {
147    return data.containsKey(key);
148  }
149
150  public Set<T> getElements(){
151    return data.keySet();
152  }
153
154  public long getCurrentTerm() {
155    return currentTerm;
156  }
157
158  class SweepRunnable implements Runnable {
159    @Override public void run() {
160      if (LOG.isTraceEnabled()) {
161        LOG.trace("Starting sweep of lossyCounting-" + name);
162      }
163      try {
164        sweep();
165      } catch (Exception exception) {
166        LOG.debug("Error while sweeping of lossyCounting-{}", name, exception);
167      }
168    }
169  }
170
171  @VisibleForTesting public Future<?> getSweepFuture() {
172    return fut.get();
173  }
174}
175