001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.concurrent.atomic.LongAdder;
032import java.util.concurrent.locks.ReentrantLock;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.util.IdLock;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044/**
045 * The cache for mob files.
046 * This cache doesn't cache the mob file blocks. It only caches the references of mob files.
047 * We are doing this to avoid opening and closing mob files all the time. We just keep
048 * references open.
049 */
050@InterfaceAudience.Private
051public class MobFileCache {
052
053  private static final Logger LOG = LoggerFactory.getLogger(MobFileCache.class);
054
055  /*
056   * Eviction and statistics thread. Periodically run to print the statistics and
057   * evict the lru cached mob files when the count of the cached files is larger
058   * than the threshold.
059   */
060  static class EvictionThread extends Thread {
061    MobFileCache lru;
062
063    public EvictionThread(MobFileCache lru) {
064      super("MobFileCache.EvictionThread");
065      setDaemon(true);
066      this.lru = lru;
067    }
068
069    @Override
070    public void run() {
071      lru.evict();
072    }
073  }
074
075  // a ConcurrentHashMap, accesses to this map are synchronized.
076  private Map<String, CachedMobFile> map = null;
077  // caches access count
078  private final AtomicLong count = new AtomicLong(0);
079  private long lastAccess = 0;
080  private final LongAdder miss = new LongAdder();
081  private long lastMiss = 0;
082  private final LongAdder evictedFileCount = new LongAdder();
083  private long lastEvictedFileCount = 0;
084
085  // a lock to sync the evict to guarantee the eviction occurs in sequence.
086  // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
087  private final ReentrantLock evictionLock = new ReentrantLock(true);
088
089  //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
090  private final IdLock keyLock = new IdLock();
091
092  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
093      new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
094  private final Configuration conf;
095
096  // the count of the cached references to mob files
097  private final int mobFileMaxCacheSize;
098  private final boolean isCacheEnabled;
099  private float evictRemainRatio;
100
101  public MobFileCache(Configuration conf) {
102    this.conf = conf;
103    this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
104        MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
105    isCacheEnabled = (mobFileMaxCacheSize > 0);
106    map = new ConcurrentHashMap<>(mobFileMaxCacheSize);
107    if (isCacheEnabled) {
108      long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
109          MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
110      evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
111          MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
112      if (evictRemainRatio < 0.0) {
113        evictRemainRatio = 0.0f;
114        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
115      } else if (evictRemainRatio > 1.0) {
116        evictRemainRatio = 1.0f;
117        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
118      }
119      this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
120          TimeUnit.SECONDS);
121
122      if (LOG.isDebugEnabled()) {
123        LOG.debug("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
124          ", evictPeriods=" +  period + "sec, evictRemainRatio=" + evictRemainRatio);
125      }
126    } else {
127      LOG.info("MobFileCache disabled");
128    }
129  }
130
131  /**
132   * Evicts the lru cached mob files when the count of the cached files is larger
133   * than the threshold.
134   */
135  public void evict() {
136    if (isCacheEnabled) {
137      // Ensure only one eviction at a time
138      if (!evictionLock.tryLock()) {
139        return;
140      }
141      printStatistics();
142      List<CachedMobFile> evictedFiles = new ArrayList<>();
143      try {
144        if (map.size() <= mobFileMaxCacheSize) {
145          return;
146        }
147        List<CachedMobFile> files = new ArrayList<>(map.values());
148        Collections.sort(files);
149        int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
150        if (start >= 0) {
151          for (int i = start; i < files.size(); i++) {
152            String name = files.get(i).getFileName();
153            CachedMobFile evictedFile = map.remove(name);
154            if (evictedFile != null) {
155              evictedFiles.add(evictedFile);
156            }
157          }
158        }
159      } finally {
160        evictionLock.unlock();
161      }
162      // EvictionLock is released. Close the evicted files one by one.
163      // The closes are sync in the closeFile method.
164      for (CachedMobFile evictedFile : evictedFiles) {
165        closeFile(evictedFile);
166      }
167      evictedFileCount.add(evictedFiles.size());
168    }
169  }
170
171  /**
172   * Evicts the cached file by the name.
173   * @param fileName The name of a cached file.
174   */
175  public void evictFile(String fileName) {
176    if (isCacheEnabled) {
177      IdLock.Entry lockEntry = null;
178      try {
179        // obtains the lock to close the cached file.
180        lockEntry = keyLock.getLockEntry(fileName.hashCode());
181        CachedMobFile evictedFile = map.remove(fileName);
182        if (evictedFile != null) {
183          evictedFile.close();
184          evictedFileCount.increment();
185        }
186      } catch (IOException e) {
187        LOG.error("Failed to evict the file " + fileName, e);
188      } finally {
189        if (lockEntry != null) {
190          keyLock.releaseLockEntry(lockEntry);
191        }
192      }
193    }
194  }
195
196  /**
197   * Opens a mob file.
198   * @param fs The current file system.
199   * @param path The file path.
200   * @param cacheConf The current MobCacheConfig
201   * @return A opened mob file.
202   * @throws IOException
203   */
204  public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
205    if (!isCacheEnabled) {
206      MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
207      mobFile.open();
208      return mobFile;
209    } else {
210      String fileName = path.getName();
211      CachedMobFile cached = map.get(fileName);
212      IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
213      try {
214        if (cached == null) {
215          cached = map.get(fileName);
216          if (cached == null) {
217            if (map.size() > mobFileMaxCacheSize) {
218              evict();
219            }
220            cached = CachedMobFile.create(fs, path, conf, cacheConf);
221            cached.open();
222            map.put(fileName, cached);
223            miss.increment();
224          }
225        }
226        cached.open();
227        cached.access(count.incrementAndGet());
228      } finally {
229        keyLock.releaseLockEntry(lockEntry);
230      }
231      return cached;
232    }
233  }
234
235  /**
236   * Closes a mob file.
237   * @param file The mob file that needs to be closed.
238   */
239  public void closeFile(MobFile file) {
240    IdLock.Entry lockEntry = null;
241    try {
242      if (!isCacheEnabled) {
243        file.close();
244      } else {
245        lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
246        file.close();
247      }
248    } catch (IOException e) {
249      LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
250    } finally {
251      if (lockEntry != null) {
252        keyLock.releaseLockEntry(lockEntry);
253      }
254    }
255  }
256
257  public void shutdown() {
258    this.scheduleThreadPool.shutdown();
259    for (int i = 0; i < 100; i++) {
260      if (!this.scheduleThreadPool.isShutdown()) {
261        try {
262          Thread.sleep(10);
263        } catch (InterruptedException e) {
264          LOG.warn("Interrupted while sleeping");
265          Thread.currentThread().interrupt();
266          break;
267        }
268      }
269    }
270
271    if (!this.scheduleThreadPool.isShutdown()) {
272      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
273      LOG.debug("Still running " + runnables);
274    }
275  }
276
277  /**
278   * Gets the count of cached mob files.
279   * @return The count of the cached mob files.
280   */
281  public int getCacheSize() {
282    return map == null ? 0 : map.size();
283  }
284
285  /**
286   * Gets the count of accesses to the mob file cache.
287   * @return The count of accesses to the mob file cache.
288   */
289  public long getAccessCount() {
290    return count.get();
291  }
292
293  /**
294   * Gets the count of misses to the mob file cache.
295   * @return The count of misses to the mob file cache.
296   */
297  public long getMissCount() {
298    return miss.sum();
299  }
300
301  /**
302   * Gets the number of items evicted from the mob file cache.
303   * @return The number of items evicted from the mob file cache.
304   */
305  public long getEvictedFileCount() {
306    return evictedFileCount.sum();
307  }
308
309  /**
310   * Gets the hit ratio to the mob file cache.
311   * @return The hit ratio to the mob file cache.
312   */
313  public double getHitRatio() {
314    return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get();
315  }
316
317  /**
318   * Prints the statistics.
319   */
320  public void printStatistics() {
321    long access = count.get() - lastAccess;
322    long missed = miss.sum() - lastMiss;
323    long evicted = evictedFileCount.sum() - lastEvictedFileCount;
324    int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
325    LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
326        + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);
327    lastAccess += access;
328    lastMiss += missed;
329    lastEvictedFileCount += evicted;
330  }
331
332}