/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server can't be handled right away, so it is queued up.
 * After the handler is enabled, the server will be submitted to a handler to handle.
 * However, the handler may be only partially enabled.  If so,
 * the server cannot be fully processed and is queued up for further processing.
 * A server is fully processed only after the handler is fully enabled
 * and has completed the handling.
 */
@InterfaceAudience.Private
public class ServerManager {
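  // Configuration keys that control how long the active master waits for region servers to
  // check in at startup; they are read in waitForRegionServers(MonitoredTask).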
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which request is received
   * @return The ServerName we know this server as.
   * @throws IOException if the server must be rejected, e.g. it is on the dead servers list or
   *           its clock is too far out of sync with the master's
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence Ids for the regions on server sn
   * @param sn the server whose report is being processed
   * @param hsl the latest load (metrics) reported by that server
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  public void regionServerReport(ServerName sn,
    ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists;
   * if not, or if the existing one has a smaller start code, records the given server.
   *
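   * <p>
   * For example (illustrative): if {@code host1.example.org,16020,200} is already registered and
   * {@code host1.example.org,16020,100} checks in, the older start code is rejected; if
   * {@code host1.example.org,16020,300} checks in instead, it is recorded and the stale
   * {@code host1.example.org,16020,200} instance is expired.
   *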
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance
   * and the current master instance, and schedule SCPs for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
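   * <p>
   * For example (illustrative numbers): with the default "hbase.master.maxclockskew" of 30000 ms
   * and "hbase.master.warningclockskew" of 10000 ms, a server reporting a time 45 seconds off the
   * master's clock is rejected, while one that is 15 seconds off is admitted with a warning.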
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time, in ms, reported by the incoming server
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when a RegionServer first reports in for duty and thereafter each
   * time it heartbeats, to make sure it has not been marked as dead.
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking in rs after master
    // initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized()) &&
        this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

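    // onlineServers is sorted by ServerName (hostname, then port, then startcode), so the
    // greatest key strictly below <hostname, port, Long.MAX_VALUE> is the currently registered
    // instance at this address, if one exists.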
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

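  /**
   * @param encodedRegionName the encoded region name, as bytes
   * @return the last flushed sequence id recorded for the region, plus the per-store (column
   *         family) flushed sequence ids that have been reported for it; the region-level id is
   *         {@link HConstants#NO_SEQNUM} if nothing has been reported yet.
   */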
  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw)
  throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
   *         not (could happen for many reasons including the fact that it's this server that is
   *         going down or we already have queued an SCP for this server or SCP processing is
   *         currently disabled because we are in startup phase).
   */
  // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn("Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else will be there for ever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
    this.rsAdmins.remove(sn);
  }

  /*
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */
  public void sendRegionWarmup(ServerName server,
      RegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + " region: " + region +
        " exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   * Pass -1 as timeout if you do not want to wait on result.
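   * <p>
   * Example (illustrative; the variable names are placeholders):
   * <pre>{@code
   * // Ask the hosting region server to close the region and wait up to 30 seconds for it.
   * ServerManager.closeRegionSilentlyAndWait(clusterConnection, serverName, regionInfo, 30000);
   * }</pre>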
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException ||
          (ioe instanceof RemoteWithExtrasException &&
            ((RemoteWithExtrasException)ioe).unwrapRemoteException()
              instanceof NotServingRegionException)) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }

  /**
   * @param sn
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
  throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate the min necessary to start. This is not an absolute; it is just
   * a friction that will cause us to hang around a bit longer waiting on
   * RegionServers to check in.
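   * <p>
   * For example (illustrative): with "hbase.master.wait.on.regionservers.mintostart" unset (-1),
   * this resolves to 1, or to 2 when the master itself carries only system tables and a second
   * server is needed for user-space regions.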
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there has been no new region server checking in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
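   * <p>
   * For example (an illustrative walk-through of the defaults): with interval=1500 ms,
   * timeout=4500 ms and mintostart resolving to 1, the master keeps waiting while new servers are
   * still checking in within 1.5 s of each other, and stops waiting once no new server has checked
   * in for 1.5 s, at least 4.5 s have elapsed, and at least one server is up; it also stops as
   * soon as maxtostart servers have reported in.
   *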
   * @throws InterruptedException
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now =  System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited="+
                slept + "ms, expecting min=" + minToStart + " server(s), max="+ getStrForMax(maxToStart) +
                " server(s), " + "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now =  System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" +  getStrForMax(maxToStart) + " server(s),"+
        " master is "+ (this.master.isStopped() ? "stopped.": "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE? "NO_LIMIT": Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /**
   * @return whether the server is online, dead, or unknown.
   */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName) ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e, not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   *  the draining or dying servers.
   *  @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without server to exclude.
   */
  public List<ServerName> createDestinationServersList(){
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead server with the same host name and port as any online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}