001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs.monitor;
019
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.conf.ConfigurationObserver;
025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
026import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
032import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
033
034/**
035 * The class to manage the excluded datanodes of the WALs on the regionserver.
036 */
037@InterfaceAudience.Private
038public class ExcludeDatanodeManager implements ConfigurationObserver {
039  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
040
041  /**
042   * Configure for the max count the excluded datanodes.
043   */
044  public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
045    "hbase.regionserver.async.wal.max.exclude.datanode.count";
046  public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
047
048  /**
049   * Configure for the TTL time of the datanodes excluded
050   */
051  public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
052    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
053  public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
054
055  private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
056  private final int maxExcludeDNCount;
057  private final Configuration conf;
058  // This is a map of providerId->StreamSlowMonitor
059  private final Map<String, StreamSlowMonitor> streamSlowMonitors = new ConcurrentHashMap<>(1);
060
061  public ExcludeDatanodeManager(Configuration conf) {
062    this.conf = conf;
063    this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
064      DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
065    this.excludeDNsCache = CacheBuilder.newBuilder()
066      .expireAfterWrite(
067        this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
068        TimeUnit.HOURS)
069      .maximumSize(this.maxExcludeDNCount).build();
070  }
071
072  /**
073   * Try to add a datanode to the regionserver excluding cache
074   * @param datanodeInfo the datanode to be added to the excluded cache
075   * @param cause        the cause that the datanode is hope to be excluded
076   * @return True if the datanode is added to the regionserver excluding cache, false otherwise
077   */
078  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
079    boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
080    if (!alreadyMarkedSlow) {
081      excludeDNsCache.put(datanodeInfo, EnvironmentEdgeManager.currentTime());
082      LOG.info(
083        "Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
084        datanodeInfo, cause, excludeDNsCache.size());
085      return true;
086    }
087    LOG.debug(
088      "Try add datanode {} to exclude cache by [{}] failed, " + "current exclude DNs are {}",
089      datanodeInfo, cause, getExcludeDNs().keySet());
090    return false;
091  }
092
093  public StreamSlowMonitor getStreamSlowMonitor(String name) {
094    String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
095    return streamSlowMonitors.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
096  }
097
098  public Map<DatanodeInfo, Long> getExcludeDNs() {
099    return excludeDNsCache.asMap();
100  }
101
102  @Override
103  public void onConfigurationChange(Configuration conf) {
104    for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
105      monitor.onConfigurationChange(conf);
106    }
107    this.excludeDNsCache = CacheBuilder.newBuilder()
108      .expireAfterWrite(
109        this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
110        TimeUnit.HOURS)
111      .maximumSize(this.conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
112        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
113      .build();
114  }
115}