001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs.monitor;
019
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.conf.ConfigurationObserver;
025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
033import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
034
035/**
036 * The class to manage the excluded datanodes of the WALs on the regionserver.
037 */
038@InterfaceAudience.Private
039public class ExcludeDatanodeManager implements ConfigurationObserver {
040  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
041
042  /**
043   * Configure for the max count the excluded datanodes.
044   */
045  public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
046    "hbase.regionserver.async.wal.max.exclude.datanode.count";
047  public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
048
049  /**
050   * Configure for the TTL time of the datanodes excluded
051   */
052  public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
053    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
054  public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
055
056  private volatile Cache<DatanodeInfo, Pair<String, Long>> excludeDNsCache;
057  private final int maxExcludeDNCount;
058  private final Configuration conf;
059  // This is a map of providerId->StreamSlowMonitor
060  private final Map<String, StreamSlowMonitor> streamSlowMonitors = new ConcurrentHashMap<>(1);
061
062  public ExcludeDatanodeManager(Configuration conf) {
063    this.conf = conf;
064    this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
065      DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
066    this.excludeDNsCache = CacheBuilder.newBuilder()
067      .expireAfterWrite(
068        this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
069        TimeUnit.HOURS)
070      .maximumSize(this.maxExcludeDNCount).build();
071  }
072
073  /**
074   * Try to add a datanode to the regionserver excluding cache
075   * @param datanodeInfo the datanode to be added to the excluded cache
076   * @param cause        the cause that the datanode is hope to be excluded
077   * @return True if the datanode is added to the regionserver excluding cache, false otherwise
078   */
079  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
080    boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
081    if (!alreadyMarkedSlow) {
082      excludeDNsCache.put(datanodeInfo, new Pair<>(cause, EnvironmentEdgeManager.currentTime()));
083      LOG.info(
084        "Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
085        datanodeInfo, cause, excludeDNsCache.size());
086      return true;
087    }
088    LOG.debug(
089      "Try add datanode {} to exclude cache by [{}] failed, " + "current exclude DNs are {}",
090      datanodeInfo, cause, getExcludeDNs().keySet());
091    return false;
092  }
093
094  public StreamSlowMonitor getStreamSlowMonitor(String name) {
095    String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
096    return streamSlowMonitors.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
097  }
098
099  /**
100   * Enumerates the reason of excluding a Datanode from WAL Write due to specific cause. Each enum
101   * constant represents a specific cause leading to exclusion.
102   */
103  public enum ExcludeCause {
104    CONNECT_ERROR("connect error"),
105    SLOW_PACKET_ACK("slow packet ack");
106
107    private final String cause;
108
109    ExcludeCause(String cause) {
110      this.cause = cause;
111    }
112
113    public String getCause() {
114      return cause;
115    }
116
117    @Override
118    public String toString() {
119      return cause;
120    }
121  }
122
123  public Map<DatanodeInfo, Pair<String, Long>> getExcludeDNs() {
124    return excludeDNsCache.asMap();
125  }
126
127  @Override
128  public void onConfigurationChange(Configuration conf) {
129    for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
130      monitor.onConfigurationChange(conf);
131    }
132    this.excludeDNsCache = CacheBuilder.newBuilder()
133      .expireAfterWrite(
134        this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
135        TimeUnit.HOURS)
136      .maximumSize(this.conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
137        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
138      .build();
139  }
140}