/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs.monitor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
 * Manages the datanodes that the WALs on this regionserver should exclude.
 * <p>
 * Excluded datanodes are held in a size-bounded cache whose entries expire a fixed time after
 * they were written. The cache is rebuilt (empty) whenever the configuration changes.
 */
@InterfaceAudience.Private
public class ExcludeDatanodeManager implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);

  /**
   * Configuration key for the maximum number of datanodes that may be excluded at once.
   */
  public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
    "hbase.regionserver.async.wal.max.exclude.datanode.count";
  public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;

  /**
   * Configuration key for the TTL, in hours, of an excluded datanode entry.
   */
  public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
  public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours

  // Replaced wholesale on configuration change, hence volatile.
  private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
  // The configuration supplied at construction time; handed to newly created monitors.
  private final Configuration conf;
  // This is a map of providerId->StreamSlowMonitor
  private final Map<String, StreamSlowMonitor> streamSlowMonitors = new ConcurrentHashMap<>(1);

  /**
   * Creates a manager whose exclude cache is sized and aged according to {@code conf}.
   * @param conf the configuration to read {@link #WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY} and
   *             {@link #WAL_EXCLUDE_DATANODE_TTL_KEY} from
   */
  public ExcludeDatanodeManager(Configuration conf) {
    this.conf = conf;
    this.excludeDNsCache = buildExcludeDNsCache(conf);
  }

  /**
   * Builds an empty exclude cache using the TTL and maximum size found in the given configuration.
   */
  private static Cache<DatanodeInfo, Long> buildExcludeDNsCache(Configuration conf) {
    return CacheBuilder.newBuilder()
      .expireAfterWrite(
        conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
        TimeUnit.HOURS)
      .maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
      .build();
  }

  /**
   * Try to add a datanode to the regionserver excluding cache
   * @param datanodeInfo the datanode to be added to the excluded cache
   * @param cause        the reason the datanode should be excluded
   * @return True if the datanode is added to the regionserver excluding cache, false otherwise
   */
  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
    // Read the volatile field once so that the membership check, the insert and the logging all
    // operate on the same cache even if onConfigurationChange swaps it concurrently.
    Cache<DatanodeInfo, Long> cache = excludeDNsCache;
    if (cache.asMap().containsKey(datanodeInfo)) {
      LOG.debug("Try add datanode {} to exclude cache by [{}] failed, current exclude DNs are {}",
        datanodeInfo, cause, cache.asMap().keySet());
      return false;
    }
    cache.put(datanodeInfo, EnvironmentEdgeManager.currentTime());
    LOG.info(
      "Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
      datanodeInfo, cause, cache.size());
    return true;
  }

  /**
   * Returns the {@link StreamSlowMonitor} registered under the given name, creating it on first
   * use.
   * @param name the monitor name; {@code null} or empty maps to {@code "defaultMonitorName"}
   * @return the monitor associated with the (possibly defaulted) name
   */
  public StreamSlowMonitor getStreamSlowMonitor(String name) {
    String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
    return streamSlowMonitors.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
  }

  /**
   * Returns a map view of the current exclude cache, keyed by datanode; each value is the time
   * recorded when the datanode was added.
   */
  public Map<DatanodeInfo, Long> getExcludeDNs() {
    return excludeDNsCache.asMap();
  }

  /**
   * Forwards the new configuration to every registered monitor and replaces the exclude cache
   * with a fresh, empty one built from that configuration. Previously excluded datanodes are
   * discarded.
   * @param conf the updated configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
      monitor.onConfigurationChange(conf);
    }
    // Use the configuration passed in, not the one captured at construction time, so that
    // updated TTL / max-count values actually take effect.
    this.excludeDNsCache = buildExcludeDNsCache(conf);
  }
}