/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs.monitor;

import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * Class for monitoring the WAL file flush performance. Each active WAL file has its own
 * StreamSlowMonitor.
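 * <p>
 * A minimal usage sketch; the variable names below are illustrative only, not part of this class:
 *
 * <pre>
 * StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, walName);
 * // call for every packet ack received from a datanode
 * monitor.checkProcessTimeAndSpeed(datanodeInfo, packetDataLen, processTimeMs, lastAckTimestamp,
 *   unfinishedReplicas);
 * </pre>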
 */
@InterfaceAudience.Private
public class StreamSlowMonitor implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);

  /**
   * Configuration key for the minimum number of slow detections for a datanode. Once a datanode
   * has been detected as slow this many times, it will be added to the exclude datanode cache of
   * this regionserver via {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)}.
   */
  private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY =
    "hbase.regionserver.async.wal.min.slow.detect.count";
  private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;

  /**
   * Configuration key for the TTL (in ms) of the slow packet records kept for a datanode that has
   * been detected as slow.
   */
  private static final String WAL_SLOW_DETECT_DATA_TTL_KEY =
    "hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
  private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms

  /**
   * Configuration key for the minimum packet data length used for the speed check. Packets whose
   * data length is smaller than this value are checked for slowness by processing time, while
   * packets whose data length is larger than this value are checked by flush speed.
   */
  private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY =
    "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
  // 64KB
  private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH = 64 * 1024;

  /**
   * Configuration key for the slow packet process time, i.e. the duration from send to ACK. This
   * processing time check applies to packets whose data length is smaller than
   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}.
   */
  public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY =
    "hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
  // 6s in ms
  private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000;

  /**
   * Configuration key for the minimum flush speed used to check large packets (those larger than
   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}). For example,
   * 20KB/s here means that a 64KB packet should be processed in less than 3.2s.
   */
  private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY =
    "hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
  // 20KB/s
  private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20;

  private final String name;
  // this is a map of datanodeInfo->queued slow PacketAckData
  private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
  private final ExcludeDatanodeManager excludeDatanodeManager;

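  // The settings below are (re)loaded by setConf, including on dynamic configuration change.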
  private int minSlowDetectCount;
  private long slowDataTtl;
  private long slowPacketAckMs;
  private double minPacketFlushSpeedKBs;
  private long minLengthForSpeedCheck;

  public StreamSlowMonitor(Configuration conf, String name,
    ExcludeDatanodeManager excludeDatanodeManager) {
    setConf(conf);
    this.name = name;
    this.excludeDatanodeManager = excludeDatanodeManager;
    this.datanodeSlowDataQueue = CacheBuilder.newBuilder()
      .maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
      .expireAfterWrite(
        conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
        TimeUnit.HOURS)
      .build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() {
        @Override
        public Deque<PacketAckData> load(DatanodeInfo key) throws Exception {
          return new ConcurrentLinkedDeque<>();
        }
      });
    LOG.info("New stream slow monitor {}", this.name);
  }

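  /**
   * Create a StreamSlowMonitor backed by a new {@link ExcludeDatanodeManager} built from the given
   * configuration.
   */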
  public static StreamSlowMonitor create(Configuration conf, String name) {
    return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf));
  }

  /**
   * Check whether the packet process time indicates that the relevant datanode is a slow node.
   * @param datanodeInfo     the datanode that processed the packet
   * @param packetDataLen    the data length of the packet (in bytes)
   * @param processTimeMs    the process time (in ms) of the packet on the datanode
   * @param lastAckTimestamp the timestamp when the packet was last acked by another datanode
   * @param unfinished       the number of datanode replicas that have not yet acked the packet
   */
  public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen,
    long processTimeMs, long lastAckTimestamp, int unfinished) {
    long current = EnvironmentEdgeManager.currentTime();
    // There are two conditions used to determine whether a datanode is slow:
    // 1. For a small packet, we just apply a simple time limit, without considering
    // the size of the packet.
    // 2. For a large packet, we calculate the flush speed and check whether it is too slow.
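    // For example, with the default settings a 32KB packet is considered slow if its ack takes
    // more than 6000ms, while a 128KB packet is considered slow if it flushes at less than 20KB/s,
    // i.e. takes roughly more than 6.5s.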
    boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs)
      || (packetDataLen > minLengthForSpeedCheck
        && (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs);
    if (slow) {
      // Check whether there is a large gap between the ack timestamps of the replicas,
      // to avoid misjudgments caused by GC stop-the-world pauses.
      if (
        (lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2)
          || (lastAckTimestamp <= 0 && unfinished == 0)
      ) {
        LOG.info(
          "Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, "
            + "lastAckTimestamp={}, monitor name: {}",
          datanodeInfo, packetDataLen, processTimeMs, unfinished, lastAckTimestamp, this.name);
        if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) {
          excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
        }
      }
    }
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    setConf(conf);
  }

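  /**
   * Record one slow packet ack for the given datanode, first dropping queued records that are
   * older than the slow data TTL or that would push the queue past the detect count.
   * @return true if the number of retained slow records has reached the configured minimum slow
   *         detect count, meaning the datanode should be considered for exclusion.
   */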
  private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) {
    Deque<PacketAckData> slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo);
    long current = EnvironmentEdgeManager.currentTime();
    while (
      !slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl
        || slowDNQueue.size() >= minSlowDetectCount)
    ) {
      slowDNQueue.removeFirst();
    }
    slowDNQueue.addLast(new PacketAckData(dataLength, processTime));
    return slowDNQueue.size() >= minSlowDetectCount;
  }

  private void setConf(Configuration conf) {
    this.minSlowDetectCount =
      conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY, DEFAULT_WAL_SLOW_DETECT_MIN_COUNT);
    this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL);
    this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY,
      DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
    this.minLengthForSpeedCheck =
      conf.getLong(DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY,
        DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH);
    this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY,
      DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
  }

  public ExcludeDatanodeManager getExcludeDatanodeManager() {
    return excludeDatanodeManager;
  }

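  /**
   * Holder for a single slow packet ack record: the packet data length, its process time and the
   * timestamp at which the record was created.
   */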
  private static class PacketAckData {
    private final long dataLength;
    private final long processTime;
    private final long timestamp;

    public PacketAckData(long dataLength, long processTime) {
      this.dataLength = dataLength;
      this.processTime = processTime;
      this.timestamp = EnvironmentEdgeManager.currentTime();
    }

    public long getDataLength() {
      return dataLength;
    }

    public long getProcessTime() {
      return processTime;
    }

    public long getTimestamp() {
      return timestamp;
    }
  }
}