/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs.monitor;

import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * Class for monitoring the flush performance of a WAL file. Each active WAL file has its own
 * StreamSlowMonitor.
 */
@InterfaceAudience.Private
public class StreamSlowMonitor implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);

  /**
   * Configures the minimum number of times a datanode must be detected as slow before it is added
   * to the exclude datanode cache of this regionserver by
   * {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)}.
   */
  private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY =
    "hbase.regionserver.async.wal.min.slow.detect.count";
  private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;

  /**
   * Configures the TTL of the records kept for a datanode that was detected as slow.
   */
  private static final String WAL_SLOW_DETECT_DATA_TTL_KEY =
    "hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
  private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms

  /**
   * Configures the minimum packet data length for the speed check. Packets whose data length is
   * smaller than this value are checked for slowness by processing time, while packets whose data
   * length is larger than this value are checked by flush speed.
   */
  private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY =
    "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
  // 64KB
  private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH = 64 * 1024;

  /**
   * Configures the slow packet process time, i.e. the duration from send to ACK. The processing
   * time check applies to packets whose data length is smaller than
   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}.
   */
  public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY =
    "hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
  // 6s in ms
  private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000;

  /**
   * Configures the flush speed check for large packets (those above
   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}). E.g. if the
   * configured slow packet process time is smaller than 10s, then 20KB/s here means 64KB should be
   * processed in less than 3.2s.
   */
  private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY =
    "hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
  // 20KB/s
  private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20;

  private final String name;
  // a map of datanodeInfo -> queued slow PacketAckData
  private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
  private final ExcludeDatanodeManager excludeDatanodeManager;

  private int minSlowDetectCount;
  private long slowDataTtl;
  private long slowPacketAckMs;
  private double minPacketFlushSpeedKBs;
  private long minLengthForSpeedCheck;

  public StreamSlowMonitor(Configuration conf, String name,
    ExcludeDatanodeManager excludeDatanodeManager) {
    setConf(conf);
    this.name = name;
    this.excludeDatanodeManager = excludeDatanodeManager;
    this.datanodeSlowDataQueue = CacheBuilder.newBuilder()
      .maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
      .expireAfterWrite(
        conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
        TimeUnit.HOURS)
      .build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() {
        @Override
        public Deque<PacketAckData> load(DatanodeInfo key) throws Exception {
          return new ConcurrentLinkedDeque<>();
        }
      });
    LOG.info("New stream slow monitor {}", this.name);
  }

  public static StreamSlowMonitor create(Configuration conf, String name) {
    return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf));
  }

  /**
   * Check if the packet process time shows that the relevant datanode is a slow node.
   * @param datanodeInfo     the datanode that processed the packet
   * @param packetDataLen    the data length of the packet (in bytes)
   * @param processTimeMs    the process time (in ms) of the packet on the datanode
   * @param lastAckTimestamp the last acked timestamp of the packet on another datanode
   * @param unfinished       the number of datanode replicas that have not yet finished flushing
   *                         the packet
   */
  public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen,
    long processTimeMs, long lastAckTimestamp, int unfinished) {
    long current = EnvironmentEdgeManager.currentTime();
    // Two conditions are used to determine whether a datanode is slow:
    // 1. For a small packet, we just apply a simple time limit, without considering
    // the size of the packet.
    // 2. For a large packet, we calculate the flush speed and check whether it is too slow.
    boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs)
      || (packetDataLen > minLengthForSpeedCheck
        && (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs);
    if (slow) {
      // Check if there is a large difference between the ack timestamps of the replicas,
      // to avoid misjudgments caused by a GC STW pause.
      if (
        (lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2)
          || (lastAckTimestamp <= 0 && unfinished == 0)
      ) {
        LOG.info(
          "Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, "
            + "lastAckTimestamp={}, monitor name: {}",
          datanodeInfo, packetDataLen, processTimeMs, unfinished, lastAckTimestamp, this.name);
        if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) {
          excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
        }
      }
    }
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    setConf(conf);
  }

  private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) {
    Deque<PacketAckData> slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo);
    long current = EnvironmentEdgeManager.currentTime();
    while (
      !slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl
        || slowDNQueue.size() >= minSlowDetectCount)
    ) {
      slowDNQueue.removeFirst();
    }
    slowDNQueue.addLast(new PacketAckData(dataLength, processTime));
    return slowDNQueue.size() >= minSlowDetectCount;
  }

  private void setConf(Configuration conf) {
    this.minSlowDetectCount =
      conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY, DEFAULT_WAL_SLOW_DETECT_MIN_COUNT);
    this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL);
    this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY,
      DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
    this.minLengthForSpeedCheck =
      conf.getLong(DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY,
        DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH);
    this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY,
      DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
  }

  public ExcludeDatanodeManager getExcludeDatanodeManager() {
    return excludeDatanodeManager;
  }
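  // Putting the defaults above together, as a purely illustrative worked example (the thresholds
  // come from the constants in this class and can all be overridden in the configuration):
  //   - a 1 KB packet is considered slow once its send-to-ack time exceeds 6000 ms;
  //   - a 128 KB (131072 byte) packet is considered slow once 131072 / processTimeMs < 20,
  //     i.e. once it takes longer than roughly 6.5 seconds;
  //   - after 3 such detections for the same datanode within the 10 minute slowDataTtl window
  //     (and subject to the replica ack-timestamp guard in checkProcessTimeAndSpeed), the
  //     datanode is handed to the ExcludeDatanodeManager for exclusion.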
  private static class PacketAckData {
    private final long dataLength;
    private final long processTime;
    private final long timestamp;

    public PacketAckData(long dataLength, long processTime) {
      this.dataLength = dataLength;
      this.processTime = processTime;
      this.timestamp = EnvironmentEdgeManager.currentTime();
    }

    public long getDataLength() {
      return dataLength;
    }

    public long getProcessTime() {
      return processTime;
    }

    public long getTimestamp() {
      return timestamp;
    }
  }
}
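// A minimal usage sketch, for illustration only. In HBase the async WAL output pipeline creates
// one monitor per WAL file and reports packet acks as they arrive; the variables dnInfo,
// packetLen, sendToAckMs, lastAckTs and unfinishedReplicas below are hypothetical stand-ins for
// values taken from that pipeline:
//
//   StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "wal-stream-1");
//   // called for every packet ack received from a datanode:
//   monitor.checkProcessTimeAndSpeed(dnInfo, packetLen, sendToAckMs, lastAckTs,
//     unfinishedReplicas);
//   // repeatedly slow datanodes are tracked by the exclude manager:
//   ExcludeDatanodeManager excludes = monitor.getExcludeDatanodeManager();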