/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server can't be handled right away so it is queued up.
 * After the handler is enabled, the server will be submitted to a handler to handle.
 * However, the handler may be just partially enabled.  If so,
 * the server cannot be fully processed, and is queued up for further processing.
 * A server is fully processed only after the handler is fully enabled
 * and has completed the handling.
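 * <p>
 * An illustrative sketch (the hostname and startcodes below are made up): two {@link ServerName}
 * instances with the same hostname and port but different startcodes share a location but denote
 * different server instances, which is how a restarted server is told apart from the original.
 * <pre>{@code
 * ServerName original  = ServerName.valueOf("rs1.example.org", 16020, 1600000000000L);
 * ServerName restarted = ServerName.valueOf("rs1.example.org", 16020, 1600000123456L);
 * assert ServerName.isSameAddress(original, restarted);      // same location
 * assert original.getStartcode() < restarted.getStartcode(); // different instances
 * }</pre>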
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
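   * <p>
   * An illustrative sketch only; it assumes {@code ServerListener} exposes the callbacks invoked
   * by this class ({@code serverAdded}, {@code serverRemoved} and {@code waiting}), and the
   * {@code serverManager} variable is hypothetical:
   * <pre>{@code
   * serverManager.registerListener(new ServerListener() {
   *   public void serverAdded(ServerName serverName) {
   *     // react to a region server checking in
   *   }
   *   public void serverRemoved(ServerName serverName) {
   *     // react to a region server being expired
   *   }
   *   public void waiting() {
   *     // master is waiting for region servers to check in
   *   }
   * });
   * }</pre>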
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which request is received
   * @return The ServerName we know this server as.
   * @throws IOException
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server sn
   * @param sn
   * @param hsl
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  @VisibleForTesting
  public void regionServerReport(ServerName sn,
    ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Check if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record it.
   *
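   * Illustrative sketch (names and startcodes are made up): an incoming report from an instance
   * whose start code is older than the one already registered at the same host and port is
   * rejected, while a newer start code causes the stale registration to be expired.
   * <pre>{@code
   * ServerName registered = ServerName.valueOf("rs1.example.org", 16020, 1600000005000L);
   * ServerName incoming   = ServerName.valueOf("rs1.example.org", 16020, 1600000000000L);
   * // incoming.getStartcode() < registered.getStartcode(), so checkAndRecordNewServer
   * // returns false and the incoming server is not recorded.
   * }</pre>
   *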
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance
   * and the current master instance, and schedule SCPs for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::add);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the configured
   * max, an exception is thrown; if it exceeds the configured warning threshold, a warning is
   * logged but the server is allowed to start normally.
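   * <p>
   * A minimal configuration sketch; these are the keys and default values read by the
   * constructor of this class:
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setLong("hbase.master.maxclockskew", 30000);     // reject when skew exceeds 30s
   * conf.setLong("hbase.master.warningclockskew", 10000); // warn when skew exceeds 10s
   * }</pre>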
   * @param serverName Incoming server's name
   * @param serverCurrentTime
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew){
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what STARTUP or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove a dead server with the same hostname and port as a newly checking-in rs after master
    // initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
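   * <p>
   * For example (numbers made up): with three online servers serving 10, 20 and 30 regions,
   * the average load is (10 + 20 + 30) / 3 = 20.0.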
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0){

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw)
  throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to the list of dead servers and queue a shutdown processing.
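   * <p>
   * Illustrative sketch (the caller and variable names are hypothetical):
   * <pre>{@code
   * long pid = serverManager.expireServer(deadServerName);
   * if (pid == Procedure.NO_PROC_ID) {
   *   // no ServerCrashProcedure was queued; see the return value description
   * }
   * }</pre>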
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
   *         not (could happen for many reasons including the fact that it's this server that is
   *         going down or we already have queued an SCP for this server or SCP processing is
   *         currently disabled because we are in startup phase).
   */
  @VisibleForTesting // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    return pid;
  }

  // Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
  @VisibleForTesting
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.add(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else will be there forever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
    this.rsAdmins.remove(sn);
  }

  /*
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */
  public void sendRegionWarmup(ServerName server,
      RegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " + server +
        " region: " + region +
        " exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) // no need to retry again
          return;
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }

  /**
   * @param sn
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
  throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate the minimum number of region servers necessary to start. This is not an absolute;
   * it is just a friction that will cause us to hang around a bit longer waiting on
   * RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there has been no new region server check-in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
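   * A minimal configuration sketch; the min/max values below are illustrative, while 1500 ms and
   * 4500 ms are the defaults this method reads for the interval and timeout:
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setInt("hbase.master.wait.on.regionservers.mintostart", 3);
   * conf.setInt("hbase.master.wait.on.regionservers.maxtostart", 10);
   * conf.setLong("hbase.master.wait.on.regionservers.interval", 1500);
   * conf.setLong("hbase.master.wait.on.regionservers.timeout", 4500);
   * }</pre>
   *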
   * @throws InterruptedException
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now =  System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited="+
                slept + "ms, expecting min=" + minToStart + " server(s), max="+ getStrForMax(maxToStart) +
                " server(s), " + "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now =  System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" +  getStrForMax(maxToStart) + " server(s),"+
        " master is "+ (this.master.isStopped() ? "stopped.": "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE? "NO_LIMIT": Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
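   * Illustrative sketch (variable names are hypothetical): select, among the given servers, only
   * those that are currently serving no regions.
   * <pre>{@code
   * List<ServerName> idle = serverManager.getOnlineServersListWithPredicator(
   *   serverManager.getOnlineServersList(),
   *   metrics -> metrics.getRegionMetrics().isEmpty());
   * }</pre>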
   * @param keys The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicate
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /**
   * @return whether the server is online, dead, or unknown.
   */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName) ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   *  the draining or dying servers.
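   * <p>
   * Illustrative sketch (variable names are hypothetical):
   * <pre>{@code
   * // Candidate targets for reassignment, skipping the server being decommissioned.
   * List<ServerName> excluded = Collections.singletonList(decommissioningServer);
   * List<ServerName> candidates = serverManager.createDestinationServersList(excluded);
   * }</pre>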
   *  @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} with no servers to exclude.
   */
  public List<ServerName> createDestinationServersList(){
    return createDestinationServersList(null);
  }

  /**
   * Clears any dead server with the same host name and port as any online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  @VisibleForTesting
  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}