/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server can't be handled right away, so it is queued up.
 * After the handler is enabled, the server will be submitted to it for processing.
 * However, the handler may be only partially enabled; if so, the server cannot be
 * fully processed yet and is queued up for further processing. A server is fully
 * processed only after the handler is fully enabled and has completed the handling.
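 * <p>
 * For illustration, a minimal sketch of how master-side code might query this class (the
 * {@code serverManager} reference is an assumption for the example):
 * <pre>{@code
 * // Snapshot the live servers and look at their reported load.
 * for (ServerName sn : serverManager.getOnlineServersList()) {
 *   ServerMetrics metrics = serverManager.getLoad(sn);
 *   int regionCount = metrics == null ? 0 : metrics.getRegionMetrics().size();
 * }
 * }</pre>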
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shut down the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();
  /**
   * Map of admin interfaces per registered regionserver; we use these interfaces to control
   * regionservers out on the cluster.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory = this.connection == null ? null : connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
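   * <p>
   * A minimal listener sketch; it assumes the only callbacks needed are the ones this class
   * invokes ({@code serverAdded}, {@code serverRemoved} and {@code waiting}):
   * <pre>{@code
   * serverManager.registerListener(new ServerListener() {
   *   public void serverAdded(ServerName serverName) {
   *     LOG.info("Region server joined: " + serverName);
   *   }
   *   public void serverRemoved(ServerName serverName) {
   *     LOG.info("Region server left: " + serverName);
   *   }
   *   public void waiting() {
   *     // Invoked while the master waits for region servers to check in.
   *   }
   * });
   * }</pre>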
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online.
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   * @throws IOException if the server is currently being processed as dead or its clock is too
   *           far out of sync with the master
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server {@code sn}.
   * @param sn the server whose report we are processing
   * @param hsl the reported metrics carrying the flushed sequence ids
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

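  /**
   * Called when a region server reports in with its load. If the server is not yet tracked,
   * records it first (e.g. a master joining an already-running cluster), then updates the last
   * flushed sequence ids for the regions it carries.
   * @param sn the reporting server
   * @param sl the reported metrics
   * @throws YouAreDeadException if the server is currently being processed as dead
   */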
  @VisibleForTesting
  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing
   * one has a smaller start code, records the new server.
   *
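   * <p>
   * A worked example (the host name and start codes are invented for illustration):
   * <pre>{@code
   * // Suppose host1.example.org,16020,1000 is already registered.
   * ServerName newer = ServerName.valueOf("host1.example.org", 16020, 2000);
   * checkAndRecordNewServer(newer, metrics); // true: newer start code wins, stale one is expired
   * ServerName older = ServerName.valueOf("host1.example.org", 16020, 500);
   * checkAndRecordNewServer(older, metrics); // false: existing start code is greater, rejected
   * }</pre>
   * Here {@code metrics} stands for whatever {@link ServerMetrics} the caller has at hand.
   *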
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance
   * and the current master instance, and schedule SCPs for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from the wal directory.
   */
  void findDeadServersAndProcess(Set<ServerCrashProcedure> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(scp -> deadservers.add(scp.getServerName(), !scp.isFinished()));
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
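   * <p>
   * Both thresholds come from configuration; a sketch of overriding the defaults (the values
   * shown are the shipped defaults, not recommendations):
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setLong("hbase.master.maxclockskew", 30000);     // reject beyond 30s of skew
   * conf.setLong("hbase.master.warningclockskew", 10000); // warn beyond 10s of skew
   * }</pre>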
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time, in ms, as reported by the server
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking-in rs after master
    // initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

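  /**
   * @param encodedRegionName the encoded name of the region to look up
   * @return the last flushed sequence id for the region plus, when known, the per-store flushed
   *         sequence ids
   */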
  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName the server to look up
   * @return ServerMetrics if serverName is known, else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

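  /**
   * Waits for the remaining region servers to go down during cluster shutdown, cross-checking
   * against ZooKeeper so we do not hang if ZK events were missed. Returns early if the master
   * itself is the only server left online.
   */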
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) {
            onlineServers.wait(100);
          }
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * @return True if we should expire <code>serverName</code>
   */
  boolean expire(ServerName serverName) {
    return this.onlineServers.containsKey(serverName) ||
        this.deadservers.isDeadServer(serverName) ||
        this.master.getAssignmentManager().getRegionStates().getServerNode(serverName) != null ||
        this.master.getMasterWalManager().isWALDirectoryNameWithWALs(serverName);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
   *         many reasons including the fact that it's this server that is going down or we
   *         already have queued an SCP for this server or SCP processing is currently disabled
   *         because we are in startup phase).
   */
  public synchronized boolean expireServer(final ServerName serverName) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return false;
    }
    // Check if we should bother running an expire!
    if (!expire(serverName)) {
      LOG.info("Skipping expire; {} is not online, not in deadservers, not in fs -- presuming " +
          "long gone server instance!", serverName);
      return false;
    }

    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but crash processing in progress, serverStateNode={}",
          serverName,
          this.master.getAssignmentManager().getRegionStates().getServerNode(serverName));
      return false;
    }

    if (!moveFromOnlineToDeadServers(serverName)) {
      LOG.info("Expiration called on {} but NOT online", serverName);
      // Continue.
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return false;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    master.getAssignmentManager().submitServerCrash(serverName, true);

    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
    return true;
  }

  /**
   * @return True if the server was online.
   */
  @VisibleForTesting
  public boolean moveFromOnlineToDeadServers(final ServerName sn) {
    boolean online = false;
    synchronized (onlineServers) {
      // Remove the server from the known servers lists and update load info BUT
      // add to deadservers first; do this so it'll show in dead servers list if
      // not in online servers list.
      this.deadservers.add(sn);
      if (this.onlineServers.remove(sn) != null) {
        online = true;
        onlineServers.notifyAll();
      }
    }
    this.rsAdmins.remove(sn);
    return online;
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
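   * <p>
   * A short usage sketch (assumes a {@code serverManager} reference and an online server
   * {@code sn}):
   * <pre>{@code
   * // Stop assigning new regions to the server, e.g. ahead of maintenance.
   * serverManager.addServerToDrainList(sn);
   * // ... later, let it take regions again.
   * serverManager.removeServerFromDrainList(sn);
   * }</pre>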
   * @param sn the server to add to the drain list
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warm up
   */
  public void sendRegionWarmup(ServerName server,
      RegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " + server +
        ", region: " + region +
        ", exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active HMaster.
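   * <p>
   * A hedged usage sketch (assumes a live {@code connection} plus the {@code serverName} and
   * {@code regionInfo} of the region to close are already in hand):
   * <pre>{@code
   * // Ask the hosting server directly to close the region; give up after 60 seconds.
   * ServerManager.closeRegionSilentlyAndWait(connection, serverName, regionInfo, 60000);
   * }</pre>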
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) {
          // No need to retry again.
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }

  /**
   * @param sn the regionserver to get an admin interface for
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException if establishing the connection fails
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate the min necessary to start. This is not an absolute. It is just
   * a friction that will cause us to hang around a bit longer waiting on
   * RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    no new region server has checked in for
   *      'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
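   * A sketch of tuning these thresholds programmatically (the values are arbitrary examples,
   * not recommendations):
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 3);   // don't proceed with fewer than 3
   * conf.setInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, 10);  // stop waiting once 10 check in
   * conf.setLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 10000); // overall wait budget, in ms
   * conf.setLong(WAIT_ON_REGIONSERVERS_INTERVAL, 2000); // quiet period between check-ins, in ms
   * }</pre>
   *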
   * @throws InterruptedException if interrupted while sleeping between checks
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    //    checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited=" +
                slept + "ms, expecting min=" + minToStart + " server(s), max=" +
                getStrForMax(maxToStart) + " server(s), timeout=" + timeout +
                "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart) +
        " server(s), master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server names
   * @param idleServerPredicator Evaluates a server based on its load
   * @return A copy of the internal list of online servers matched by the predicator
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
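   * <p>
   * A short sketch (assumes a {@code serverManager} reference and a server {@code excluded} we
   * want to steer regions away from):
   * <pre>{@code
   * List<ServerName> candidates =
   *     serverManager.createDestinationServersList(Collections.singletonList(excluded));
   * }</pre>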
   * @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} with no servers to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clears any dead server that has the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  @VisibleForTesting
  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that regions were removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}