/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server cannot be handled right away, so it is queued up.
 * After the handler is enabled, the server will be submitted to the handler for
 * processing.  However, the handler may be only partially enabled.  If so, the
 * server cannot be fully processed, and is queued up for further processing.
 * A server is fully processed only after the handler is fully enabled
 * and has completed the handling.
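 * <p>
 * A hypothetical usage sketch (illustrative only; the wiring shown is assumed,
 * not taken from this class's callers):
 * <pre>{@code
 * ServerManager sm = ...; // typically constructed by the active master
 * sm.registerListener(myListener); // observe serverAdded/serverRemoved events
 * List<ServerName> online = sm.getOnlineServersList();
 * double avgLoad = sm.getAverageLoad(); // naive regions-per-server average
 * }</pre>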
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shut down the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory = this.connection == null ? null : connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Lets the server manager know a new regionserver has come online.
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver; could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   * @throws IOException
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server sn.
   * @param sn the server from which the metrics report came
   * @param hsl the reported server metrics
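   * <p>
   * A minimal sketch of the monotonic rule applied below (values are hypothetical):
   * <pre>{@code
   * // stored=120, reported=150 -> store 150 (greater sequence ids win)
   * // stored=150, reported=120 -> keep 150 and log a WARN
   * // reported == HConstants.NO_SEQNUM -> never overwrites a stored value
   * }</pre>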
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  @VisibleForTesting
  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing
   * one has a smaller start code, records the new server.
   *
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
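   * <p>
   * A hypothetical sketch of the startcode comparison (names and values invented):
   * <pre>{@code
   * ServerName old = ServerName.valueOf("rs1.example.org", 16020, 100L);
   * ServerName fresh = ServerName.valueOf("rs1.example.org", 16020, 200L);
   * // With old registered, checkAndRecordNewServer(fresh, metrics) records fresh,
   * // returns true, and expires old; with the roles reversed it returns false.
   * }</pre>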
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance
   * and the current master instance and schedule SCP for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time reported by the incoming server
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
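   * <p>
   * A minimal sketch, assuming the defaults read in the constructor
   * (hbase.master.maxclockskew=30000, hbase.master.warningclockskew=10000):
   * <pre>{@code
   * // skew = 12000 ms -> above the warning threshold: WARN logged, server admitted
   * // skew = 45000 ms -> above the max: ClockOutOfSyncException thrown
   * }</pre>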
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when a RegionServer first reports in for duty and thereafter each
   * time it heartbeats, to make sure it has not been figured for dead.
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking in rs after master
    // initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized()) &&
        this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
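   * <p>
   * A sketch of why {@code lowerKey} works below, assuming ServerName's natural
   * ordering compares hostname, then port, then startcode:
   * <pre>{@code
   * // end = ("rs1.example.org", 16020, Long.MAX_VALUE) sorts after every real
   * // startcode for that host:port, so lowerKey(end) yields the registered
   * // instance with the highest startcode, if one exists.
   * }</pre>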
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
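    // A hypothetical caller-side sketch (the variable names are assumed):
    //   RegionStoreSequenceIds ids = serverManager.getLastFlushedSequenceId(encodedName);
    //   long regionSeqId = ids.getLastFlushedSequenceId(); // NO_SEQNUM when nothing is tracked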
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) {
            onlineServers.wait(100);
          }
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw)
      throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
   *         not (could happen for many reasons including the fact that it's this server that is
   *         going down or we already have queued an SCP for this server or SCP processing is
   *         currently disabled because we are in startup phase).
   */
  @VisibleForTesting // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  @VisibleForTesting
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else will be there for ever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
    this.rsAdmins.remove(sn);
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn the server to add to the drain list
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */
  public void sendRegionWarmup(ServerName server,
      RegionInfo region) {
    if (server == null) {
      return;
    }
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + ", region: " + region +
        ", exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   * Pass -1 as timeout if you do not want to wait on result.
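   * <p>
   * A hypothetical usage sketch (connection and names are assumed):
   * <pre>{@code
   * // Ask the server to close the region, then poll up to 30s for it to go away.
   * ServerManager.closeRegionSilentlyAndWait(conn, serverName, regionInfo, 30000);
   * // Fire-and-forget variant:
   * ServerManager.closeRegionSilentlyAndWait(conn, serverName, regionInfo, -1);
   * }</pre>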
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) {
          return;
        }
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }

  /**
   * @param sn the server to get an admin interface for
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate the minimum number of region servers necessary to start. This is not an
   * absolute; it is just a friction that will cause us to hang around a bit longer
   * waiting on RegionServers to check in.
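   * <p>
   * A minimal sketch of the computation, assuming default configuration:
   * <pre>{@code
   * // mintostart unset (-1), no tables on master:      max(-1, 1) == 1
   * // system tables only on master: the master is not a user-space server,
   * // so a second server is required:                  max(-1, 2) == 2
   * }</pre>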
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there have been no new region servers checking in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException
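   * <p>
   * A minimal sketch, assuming the defaults this method reads
   * (interval=1500 ms, timeout=4500 ms, maxtostart unset):
   * <pre>{@code
   * // With min=1: one regionserver has checked in, no further server registers
   * // for 1500 ms, and at least 4500 ms have elapsed -> the wait ends.
   * }</pre>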
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    //    checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited=" +
                slept + "ms, expecting min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart) +
                " server(s), timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s)," +
        " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
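   * <p>
   * A hypothetical usage sketch (the candidate list is assumed):
   * <pre>{@code
   * // Keep only the candidates that currently carry no regions.
   * List<ServerName> idle = serverManager.getOnlineServersListWithPredicator(
   *     candidates, metrics -> metrics.getRegionMetrics().isEmpty());
   * }</pre>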
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /**
   * @return whether the server is online, dead, or unknown.
   */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName) ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
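   * <p>
   * A hypothetical usage sketch (the source server is assumed):
   * <pre>{@code
   * // All online servers except the one a region is moving off of,
   * // with draining servers filtered out as well.
   * List<ServerName> candidates =
   *     serverManager.createDestinationServersList(Collections.singletonList(source));
   * }</pre>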
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without a server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clears any dead server with the same host name and port as any online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  @VisibleForTesting
  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}