/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
 * region servers.
 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, of which there can be only one online at any given time. A server instance
 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
 * the server was started). This is used to differentiate a restarted instance of a given server
 * from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away so it is queued up. After the handler is enabled, the server will be submitted
 * to a handler to handle. However, the handler may be only partially enabled. If so, the server
 * cannot be fully processed, and is queued up for further processing. A server is fully processed
 * only after the handler is fully enabled and has completed the handling.
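 * <p>
 * A minimal sketch of the identity rules (hostname, port and startcodes here are illustrative):
 * <pre>{@code
 * ServerName a = ServerName.valueOf("rs.example.org", 16020, 100L);
 * ServerName b = ServerName.valueOf("rs.example.org", 16020, 200L); // a restarted instance
 * ServerName.isSameAddress(a, b); // true: same location (hostname and port)
 * a.equals(b);                    // false: different startcode, hence a different instance
 * }</pre>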
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
    "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
    "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
    "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
    "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[],
    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;
  private final RegionServerList storage;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master, RegionServerList storage) {
    this.master = master;
    this.storage = storage;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory =
      this.connection == null ? null : connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request       the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia            the InetAddress from which request is received
   * @return The ServerName we know this server as.
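   * <p>
   * In outline: the clock skew is checked first, then the startup is rejected if this exact
   * ServerName is still on the dead list, and finally the server is recorded, expiring any stale
   * instance with the same hostname and port but an older startcode.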
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
    String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    storage.started(sn);
    return sn;
  }

  /**
   * Updates last flushed sequence Ids for the regions on server sn
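   * <p>
   * Sequence ids only move forward here: e.g. if a region's recorded value is 120, a later report
   * of 100 is ignored with a warning, while a report of 150 advances the recorded value.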
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
          + ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
          + ") that is less than the previous last flushed sequence id (" + existingValue
          + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, records the new server.
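   * <p>
   * A sketch of the startcode comparison (server names illustrative):
   * <pre>{@code
   * // existing: rs.example.org,16020,200   incoming: rs.example.org,16020,100
   * //   -> incoming rejected, method returns false
   * // existing: rs.example.org,16020,100   incoming: rs.example.org,16020,200
   * //   -> incoming recorded, stale existing instance expired, method returns true
   * }</pre>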
   * @param serverName the server to check and record
   * @param sl         the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " + existingServer
        + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers crashed between the crash of the previous master instance and the
   * current master instance and schedule SCP for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issue.
   * @param deadServersFromPE     the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
    Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold, it
   * will log a warning but start normally.
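   * <p>
   * Both thresholds are read in the ServerManager constructor; a configuration sketch (the values
   * shown are the in-code defaults, in milliseconds):
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setLong("hbase.master.maxclockskew", 30000);     // reject beyond this skew
   * conf.setLong("hbase.master.warningclockskew", 10000); // only warn beyond this skew
   * }</pre>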
   * @param serverName        Incoming server's name
   * @param serverCurrentTime the current time of the region server, in milliseconds
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
    throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been "
        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
        + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master "
        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
        + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when RegionServer first reports in for duty and thereafter each time it heartbeats to
   * make sure it has not been figured for dead. If this server is on the dead list, reject it
   * with a YouAreDeadException. If it was dead but came back with a new start code, remove the old
   * entry from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
    throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message =
        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking in rs after master
    // initialization. See HBASE-5916 for more information.
    if (
      (this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)
    ) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
    ServerName end =
      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

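    // ServerName sorts by hostname, then port, then startcode, so the greatest key strictly
    // below this Long.MAX_VALUE upper bound is the highest-startcode instance registered for
    // the same hostname and port, if one exists.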
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName the server name to look up
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers. Currently, this uses a very naive
   * computation - just uses the number of regions being served, ignoring stats about number of
   * requests.
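   * <p>
   * For example, three online servers carrying 10, 20 and 30 regions average to 20.0.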
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
  }

  /** Returns the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /** Returns Read-only map of servers to serverinfo */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = EnvironmentEdgeManager.currentTime();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (
          servers == null || servers.isEmpty()
            || (servers.size() == 1 && servers.contains(sn.toString()))
        ) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
   *         down or we already have queued an SCP for this server or SCP processing is currently
   *         disabled because we are in startup phase).
   */
  // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn(
          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
        + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    storage.expired(serverName);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else it will be there forever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
    this.rsAdmins.remove(sn);
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. "
        + "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn the server to add
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. "
        + "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. "
        + "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not have the specified
   * region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */
  public void sendRegionWarmup(ServerName server, RegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " + server + ", region: " + region
        + ", exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
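   * <p>
   * A usage sketch (the connection, server and region are assumed to be in hand):
   * <pre>{@code
   * // Ask the hosting regionserver directly to close the region, waiting up to 60s.
   * ServerManager.closeRegionSilentlyAndWait(connection, server, region, 60000);
   * }</pre>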
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server,
    RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion = ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (
          ioe instanceof NotServingRegionException
            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
              .unwrapRemoteException() instanceof NotServingRegionException)
        ) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
          ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
  }

  /**
   * @param sn the name of the remote regionserver
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException such as a RetriesExhaustedException wrapping a ConnectException if the
   *           connection attempt failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer) master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate min necessary to start. This is not an absolute. It is just a friction that will
   * cause us to hang around a bit longer waiting on RegionServers to check-in.
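   * <p>
   * For example, when the master carries only system tables, the floor is 2, so a configured
   * 'hbase.master.wait.on.regionservers.mintostart' of 1 is raised to 2.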
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (
      LoadBalancer.isTablesOnMaster(master.getConfiguration())
        && LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())
    ) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there have been no new
   * region servers checking in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
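   * <p>
   * A configuration sketch (keys are the WAIT_ON_REGIONSERVERS_* constants above; the values here
   * are illustrative, the in-code defaults being interval=1500ms and timeout=4500ms):
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setInt("hbase.master.wait.on.regionservers.mintostart", 3);
   * conf.setInt("hbase.master.wait.on.regionservers.maxtostart", 10);
   * conf.setLong("hbase.master.wait.on.regionservers.interval", 1500); // ms
   * conf.setLong("hbase.master.wait.on.regionservers.timeout", 30000); // ms
   * }</pre>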
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart =
      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
        minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = EnvironmentEdgeManager.currentTime();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    // checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener : this.listeners) {
      listener.waiting();
    }
    while (
      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
    ) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = EnvironmentEdgeManager.currentTime();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /** Returns A copy of the internal list of online servers. */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys                 The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
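   * <p>
   * For example, to keep only the online servers that currently carry no regions (an illustrative
   * predicate):
   * <pre>{@code
   * List<ServerName> idle = serverManager.getOnlineServersListWithPredicator(
   *   serverManager.getOnlineServersList(), metrics -> metrics.getRegionMetrics().isEmpty());
   * }</pre>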
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /** Returns A copy of the internal list of draining servers. */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /** Returns whether the server is online, dead, or unknown. */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName)
      ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  /**
   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
   * any more, for example, a very old previous instance).
   */
  public boolean isServerUnknown(ServerName serverName) {
    return serverName == null
      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
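   * <p>
   * A usage sketch (picking a candidate pool for a region while excluding its current host;
   * {@code currentHost} is assumed):
   * <pre>{@code
   * List<ServerName> candidates =
   *   serverManager.createDestinationServersList(Collections.singletonList(currentHost));
   * }</pre>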
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    Set<ServerName> destServers = new HashSet<>();
    onlineServers.forEach((sn, sm) -> {
      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, then let's include
        // this server for region assignment. This is an optimization to avoid assigning regions to
        // an uninitialized server. See HBASE-25032 for more details.
        destServers.add(sn);
      }
    });

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return new ArrayList<>(destServers);
  }

  /**
   * Calls {@link #createDestinationServersList} without server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clears any dead server that has the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
      || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri : regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}