001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.concurrent.atomic.LongAdder;
032import java.util.concurrent.locks.ReentrantLock;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.io.hfile.CacheConfig;
038import org.apache.hadoop.hbase.util.IdLock;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
045/**
046 * The cache for mob files.
047 * This cache doesn't cache the mob file blocks. It only caches the references of mob files.
048 * We are doing this to avoid opening and closing mob files all the time. We just keep
049 * references open.
050 */
051@InterfaceAudience.Private
052public class MobFileCache {
053
054  private static final Logger LOG = LoggerFactory.getLogger(MobFileCache.class);
055
056  /*
057   * Eviction and statistics thread. Periodically run to print the statistics and
058   * evict the lru cached mob files when the count of the cached files is larger
059   * than the threshold.
060   */
061  static class EvictionThread extends Thread {
062    MobFileCache lru;
063
064    public EvictionThread(MobFileCache lru) {
065      super("MobFileCache.EvictionThread");
066      setDaemon(true);
067      this.lru = lru;
068    }
069
070    @Override
071    public void run() {
072      lru.evict();
073    }
074  }
075
076  // a ConcurrentHashMap, accesses to this map are synchronized.
077  private Map<String, CachedMobFile> map = null;
078  // caches access count
079  private final AtomicLong count = new AtomicLong(0);
080  private long lastAccess = 0;
081  private final LongAdder miss = new LongAdder();
082  private long lastMiss = 0;
083  private final LongAdder evictedFileCount = new LongAdder();
084  private long lastEvictedFileCount = 0;
085
086  // a lock to sync the evict to guarantee the eviction occurs in sequence.
087  // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
088  private final ReentrantLock evictionLock = new ReentrantLock(true);
089
090  //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
091  private final IdLock keyLock = new IdLock();
092
093  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
094      new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
095  private final Configuration conf;
096
097  // the count of the cached references to mob files
098  private final int mobFileMaxCacheSize;
099  private final boolean isCacheEnabled;
100  private float evictRemainRatio;
101
102  public MobFileCache(Configuration conf) {
103    this.conf = conf;
104    this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
105        MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
106    isCacheEnabled = (mobFileMaxCacheSize > 0);
107    map = new ConcurrentHashMap<>(mobFileMaxCacheSize);
108    if (isCacheEnabled) {
109      long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
110          MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
111      evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
112          MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
113      if (evictRemainRatio < 0.0) {
114        evictRemainRatio = 0.0f;
115        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
116      } else if (evictRemainRatio > 1.0) {
117        evictRemainRatio = 1.0f;
118        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
119      }
120      this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
121          TimeUnit.SECONDS);
122
123      if (LOG.isDebugEnabled()) {
124        LOG.debug("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
125          ", evictPeriods=" +  period + "sec, evictRemainRatio=" + evictRemainRatio);
126      }
127    } else {
128      LOG.info("MobFileCache disabled");
129    }
130  }
131
132  /**
133   * Evicts the lru cached mob files when the count of the cached files is larger
134   * than the threshold.
135   */
136  public void evict() {
137    if (isCacheEnabled) {
138      // Ensure only one eviction at a time
139      if (!evictionLock.tryLock()) {
140        return;
141      }
142      printStatistics();
143      List<CachedMobFile> evictedFiles = new ArrayList<>();
144      try {
145        if (map.size() <= mobFileMaxCacheSize) {
146          return;
147        }
148        List<CachedMobFile> files = new ArrayList<>(map.values());
149        Collections.sort(files);
150        int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
151        if (start >= 0) {
152          for (int i = start; i < files.size(); i++) {
153            String name = files.get(i).getFileName();
154            CachedMobFile evictedFile = map.remove(name);
155            if (evictedFile != null) {
156              evictedFiles.add(evictedFile);
157            }
158          }
159        }
160      } finally {
161        evictionLock.unlock();
162      }
163      // EvictionLock is released. Close the evicted files one by one.
164      // The closes are sync in the closeFile method.
165      for (CachedMobFile evictedFile : evictedFiles) {
166        closeFile(evictedFile);
167      }
168      evictedFileCount.add(evictedFiles.size());
169    }
170  }
171
172  /**
173   * Evicts the cached file by the name.
174   * @param fileName The name of a cached file.
175   */
176  public void evictFile(String fileName) {
177    if (isCacheEnabled) {
178      IdLock.Entry lockEntry = null;
179      try {
180        // obtains the lock to close the cached file.
181        lockEntry = keyLock.getLockEntry(fileName.hashCode());
182        CachedMobFile evictedFile = map.remove(fileName);
183        if (evictedFile != null) {
184          evictedFile.close();
185          evictedFileCount.increment();
186        }
187      } catch (IOException e) {
188        LOG.error("Failed to evict the file " + fileName, e);
189      } finally {
190        if (lockEntry != null) {
191          keyLock.releaseLockEntry(lockEntry);
192        }
193      }
194    }
195  }
196
197  /**
198   * Opens a mob file.
199   * @param fs The current file system.
200   * @param path The file path.
201   * @param cacheConf The current MobCacheConfig
202   * @return A opened mob file.
203   * @throws IOException
204   */
205  public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
206    if (!isCacheEnabled) {
207      MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
208      mobFile.open();
209      return mobFile;
210    } else {
211      String fileName = path.getName();
212      CachedMobFile cached = map.get(fileName);
213      IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
214      try {
215        if (cached == null) {
216          cached = map.get(fileName);
217          if (cached == null) {
218            if (map.size() > mobFileMaxCacheSize) {
219              evict();
220            }
221            cached = CachedMobFile.create(fs, path, conf, cacheConf);
222            cached.open();
223            map.put(fileName, cached);
224            miss.increment();
225          }
226        }
227        cached.open();
228        cached.access(count.incrementAndGet());
229      } finally {
230        keyLock.releaseLockEntry(lockEntry);
231      }
232      return cached;
233    }
234  }
235
236  /**
237   * Closes a mob file.
238   * @param file The mob file that needs to be closed.
239   */
240  public void closeFile(MobFile file) {
241    IdLock.Entry lockEntry = null;
242    try {
243      if (!isCacheEnabled) {
244        file.close();
245      } else {
246        lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
247        file.close();
248      }
249    } catch (IOException e) {
250      LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
251    } finally {
252      if (lockEntry != null) {
253        keyLock.releaseLockEntry(lockEntry);
254      }
255    }
256  }
257
258  public void shutdown() {
259    this.scheduleThreadPool.shutdown();
260    for (int i = 0; i < 100; i++) {
261      if (!this.scheduleThreadPool.isShutdown()) {
262        try {
263          Thread.sleep(10);
264        } catch (InterruptedException e) {
265          LOG.warn("Interrupted while sleeping");
266          Thread.currentThread().interrupt();
267          break;
268        }
269      }
270    }
271
272    if (!this.scheduleThreadPool.isShutdown()) {
273      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
274      LOG.debug("Still running " + runnables);
275    }
276  }
277
278  /**
279   * Gets the count of cached mob files.
280   * @return The count of the cached mob files.
281   */
282  public int getCacheSize() {
283    return map == null ? 0 : map.size();
284  }
285
286  /**
287   * Gets the count of accesses to the mob file cache.
288   * @return The count of accesses to the mob file cache.
289   */
290  public long getAccessCount() {
291    return count.get();
292  }
293
294  /**
295   * Gets the count of misses to the mob file cache.
296   * @return The count of misses to the mob file cache.
297   */
298  public long getMissCount() {
299    return miss.sum();
300  }
301
302  /**
303   * Gets the number of items evicted from the mob file cache.
304   * @return The number of items evicted from the mob file cache.
305   */
306  public long getEvictedFileCount() {
307    return evictedFileCount.sum();
308  }
309
310  /**
311   * Gets the hit ratio to the mob file cache.
312   * @return The hit ratio to the mob file cache.
313   */
314  public double getHitRatio() {
315    return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get();
316  }
317
318  /**
319   * Prints the statistics.
320   */
321  public void printStatistics() {
322    long access = count.get() - lastAccess;
323    long missed = miss.sum() - lastMiss;
324    long evicted = evictedFileCount.sum() - lastEvictedFileCount;
325    int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
326    LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
327        + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);
328    lastAccess += access;
329    lastMiss += missed;
330    lastEvictedFileCount += evicted;
331  }
332
333}