/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs.monitor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
 * Manages the datanodes that are excluded from the WALs on the regionserver.
 * <p>
 * Excluded datanodes are kept in a size-bounded cache whose entries expire a configurable number
 * of hours after they are written. Each entry maps the datanode to a pair of (cause, time at which
 * the datanode was excluded).
 */
@InterfaceAudience.Private
public class ExcludeDatanodeManager implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);

  /**
   * Configuration key for the maximum number of excluded datanodes.
   */
  public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
    "hbase.regionserver.async.wal.max.exclude.datanode.count";
  public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;

  /**
   * Configuration key for the TTL, in hours, of an excluded datanode entry.
   */
  public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
  public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours

  // Replaced wholesale by onConfigurationChange, hence volatile.
  private volatile Cache<DatanodeInfo, Pair<String, Long>> excludeDNsCache;
  // Maximum cache size as read from the configuration at construction time.
  private final int maxExcludeDNCount;
  private final Configuration conf;
  // This is a map of providerId->StreamSlowMonitor
  private final Map<String, StreamSlowMonitor> streamSlowMonitors = new ConcurrentHashMap<>(1);

  /**
   * Creates a manager whose exclude cache is sized and given its TTL from the supplied
   * configuration.
   * @param conf the configuration to read the cache settings from; it is also handed to every
   *             {@link StreamSlowMonitor} created by {@link #getStreamSlowMonitor(String)}
   */
  public ExcludeDatanodeManager(Configuration conf) {
    this.conf = conf;
    this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
      DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
    this.excludeDNsCache = buildExcludeDNsCache(conf, this.maxExcludeDNCount);
  }

  /**
   * Builds an empty exclude cache.
   * @param conf              the configuration to read the entry TTL (in hours) from
   * @param maxExcludeDNCount the maximum number of entries the cache may hold
   * @return a new, empty cache
   */
  private static Cache<DatanodeInfo, Pair<String, Long>> buildExcludeDNsCache(Configuration conf,
    int maxExcludeDNCount) {
    return CacheBuilder.newBuilder()
      .expireAfterWrite(
        conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
        TimeUnit.HOURS)
      .maximumSize(maxExcludeDNCount).build();
  }

  /**
   * Try to add a datanode to the regionserver excluding cache
   * @param datanodeInfo the datanode to be added to the excluded cache
   * @param cause        the reason why the datanode should be excluded
   * @return True if the datanode is added to the regionserver excluding cache, false if it was
   *         already present
   */
  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
    // Read the volatile field once so that the insert and the logging below all refer to the same
    // cache instance even if onConfigurationChange swaps the field concurrently.
    Cache<DatanodeInfo, Pair<String, Long>> cache = excludeDNsCache;
    // putIfAbsent makes the "is it already excluded?" check and the insert a single atomic step.
    Pair<String, Long> previous = cache.asMap().putIfAbsent(datanodeInfo,
      new Pair<>(cause, EnvironmentEdgeManager.currentTime()));
    if (previous == null) {
      LOG.info(
        "Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
        datanodeInfo, cause, cache.size());
      return true;
    }
    LOG.debug("Try add datanode {} to exclude cache by [{}] failed, current exclude DNs are {}",
      datanodeInfo, cause, cache.asMap().keySet());
    return false;
  }

  /**
   * Returns the {@link StreamSlowMonitor} registered under the given name, creating it on first
   * use.
   * @param name the monitor name; {@code null} or empty maps to {@code "defaultMonitorName"}
   * @return the monitor for the (possibly defaulted) name
   */
  public StreamSlowMonitor getStreamSlowMonitor(String name) {
    String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
    return streamSlowMonitors.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
  }

  /**
   * Enumerates the reasons for excluding a Datanode from WAL writes. Each enum constant represents
   * a specific cause leading to exclusion.
   */
  public enum ExcludeCause {
    CONNECT_ERROR("connect error"),
    SLOW_PACKET_ACK("slow packet ack");

    private final String cause;

    ExcludeCause(String cause) {
      this.cause = cause;
    }

    public String getCause() {
      return cause;
    }

    @Override
    public String toString() {
      return cause;
    }
  }

  /**
   * Returns the map view of the current exclude cache.
   * @return a map from excluded datanode to its (cause, exclusion time) pair
   */
  public Map<DatanodeInfo, Pair<String, Long>> getExcludeDNs() {
    return excludeDNsCache.asMap();
  }

  /**
   * Propagates the new configuration to every registered {@link StreamSlowMonitor} and then
   * replaces the exclude cache with a new, empty one built from the new configuration. Entries
   * held by the previous cache are not carried over.
   * @param conf the updated configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
      monitor.onConfigurationChange(conf);
    }
    // Read the settings from the configuration we were handed rather than the one captured at
    // construction time, so the rebuilt cache reflects the reloaded values.
    this.excludeDNsCache = buildExcludeDNsCache(conf, conf.getInt(
      WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT));
  }
}