/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server can't be handled right away so it is queued up.
 * After the handler is enabled, the server will be submitted to a handler to handle.
 * However, the handler may be just partially enabled.  If so,
 * the server cannot be fully processed, and is queued up for further processing.
 * A server is fully processed only after the handler is fully enabled
 * and has completed the handling.
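 * <p>
 * A minimal usage sketch (illustrative only; in practice the active master constructs and
 * drives this class, and {@code myListener} and {@code someServer} are hypothetical
 * placeholders):
 * <pre>{@code
 *   ServerManager serverManager = ...; // obtained from the master
 *   serverManager.registerListener(myListener);      // be notified of server add/remove
 *   List<ServerName> online = serverManager.getOnlineServersList();
 *   if (!serverManager.isServerOnline(someServer)) {
 *     serverManager.expireServer(someServer);        // queue crash processing if warranted
 *   }
 * }</pre>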
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /**
   * Map of admin interfaces per registered regionserver; these are the interfaces we use to
   * control regionservers out on the cluster.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RpcControllerFactory rpcControllerFactory;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = master.getClusterConnection();
    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request the startup request
   * @param versionNumber the version of the new regionserver
   * @param ia the InetAddress from which request is received
   * @return The ServerName we know this server as.
   * @throws IOException
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo.  If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname = request.hasUseThisHostnameInstead() ?
        request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(),
      request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber))) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server {@code sn}.
   * @param sn the server whose report is being processed
   * @param hsl the latest metrics reported for that server, including per-region and per-store
   *          flushed sequence ids
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

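  /**
   * Process a report from a regionserver. Rejects reports from servers on the dead list, records
   * servers this master has not yet seen (e.g. a master joining an already-running cluster), and
   * then updates the last flushed sequence ids reported for the server's regions.
   * @throws YouAreDeadException if the reporting server is currently listed as dead
   */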
  @VisibleForTesting
  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists;
   * if not, or if the existing one has a smaller start code, record the new server.
   *
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server: " + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers crashed between the crash of the previous master instance and the
   * current master instance and schedule SCP for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issue.
   * @param deadServersFromPE the region servers which already have SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findOutDeadServersAndProcess(Set<ServerName> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::add);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
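   * <p>
   * The maximum allowed skew and the warning threshold are read in the constructor from
   * {@code hbase.master.maxclockskew} (default 30000 ms) and
   * {@code hbase.master.warningclockskew} (default 10000 ms) respectively.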
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time reported by the server, in milliseconds
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what STARTUP or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove a dead server with the same hostname and port as the newly checking-in rs after
    // master initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

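    // onlineServers is sorted by ServerName (hostname, then port, then startcode), so the entry
    // strictly below (hostname, port, Long.MAX_VALUE) is the registered instance with the
    // largest startcode for this hostname:port, if any such instance exists.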
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

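  /**
   * @param encodedRegionName the encoded name of the region to look up
   * @return the last flushed sequence id reported for the region ({@link HConstants#NO_SEQNUM} if
   *         none has been reported) along with any per-store flushed sequence ids recorded for it
   */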
  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

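  /**
   * Wait on the online region servers to go down as part of cluster shutdown. Returns early if
   * the master itself is the only server still registered, or if ZooKeeper shows no other live
   * regionserver znodes (the master could have missed some ZK events), or if ZooKeeper cannot be
   * queried.
   */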
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw)
  throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.znodePaths.rsZNode);
  }

  /**
   * @return True if we should expire <code>serverName</code>: it is online, on the dead servers
   *         list, still has a server node in the assignment manager, or still has WALs in the
   *         WAL directory.
   */
  boolean expire(ServerName serverName) {
    return this.onlineServers.containsKey(serverName) ||
        this.deadservers.isDeadServer(serverName) ||
        this.master.getAssignmentManager().getRegionStates().getServerNode(serverName) != null ||
        this.master.getMasterWalManager().isWALDirectoryNameWithWALs(serverName);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
   *         many reasons including the fact that it's this server that is going down or we already
   *         have queued an SCP for this server or SCP processing is currently disabled because we
   *         are in startup phase).
   */
  public synchronized boolean expireServer(final ServerName serverName) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return false;
    }
    // Check if we should bother running an expire!
    if (!expire(serverName)) {
      LOG.info("Skipping expire; {} is not online, not in deadservers, not in fs -- presuming " +
          "long gone server instance!", serverName);
      return false;
    }

    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but crash processing in progress, serverStateNode={}",
          serverName,
          this.master.getAssignmentManager().getRegionStates().getServerNode(serverName));
      return false;
    }

    if (!moveFromOnlineToDeadServers(serverName)) {
      LOG.info("Expiration called on {} but NOT online", serverName);
      // Continue.
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return false;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    master.getAssignmentManager().submitServerCrash(serverName, true);

    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
    return true;
  }

  /**
   * @return True if the server was online.
   */
  @VisibleForTesting
  public boolean moveFromOnlineToDeadServers(final ServerName sn) {
    boolean online = false;
    synchronized (onlineServers) {
      // Remove the server from the known servers lists and update load info BUT
      // add to deadservers first; do this so it'll show in dead servers list if
      // not in online servers list.
      this.deadservers.add(sn);
      if (this.onlineServers.remove(sn) != null) {
        online = true;
        onlineServers.notifyAll();
      }
    }
    this.rsAdmins.remove(sn);
    return online;
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  // RPC methods to region servers

  private HBaseRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the warmup request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */
  public void sendRegionWarmup(ServerName server,
      RegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      HBaseRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + ", region: " + region +
        ", exception: " + e);
    }
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      controller.reset();
      try {
        RegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) // no need to retry again
          return;
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }

  /**
   * @param sn
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
  throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Calculate the minimum number of region servers necessary to start. This is not an absolute;
   * it is just a friction that will cause us to hang around a bit longer waiting on
   * RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    no new region server has checked in for
   *      'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
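   * <p>
   * For example, a deployment could set the following (values are illustrative; the code
   * defaults are interval=1500 ms, timeout=4500 ms, no upper bound on maxtostart, and mintostart
   * bounded below by getMinToStart()):
   * <pre>
   *   hbase.master.wait.on.regionservers.mintostart = 3
   *   hbase.master.wait.on.regionservers.maxtostart = 10
   *   hbase.master.wait.on.regionservers.interval   = 1500
   *   hbase.master.wait.on.regionservers.timeout    = 4500
   * </pre>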
   *
   * @throws InterruptedException
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now =  System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited="+
                slept + "ms, expecting min=" + minToStart + " server(s), max="+ getStrForMax(maxToStart) +
                " server(s), " + "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now =  System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" +  getStrForMax(maxToStart) + " server(s),"+
        " master is "+ (this.master.isStopped() ? "stopped.": "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE? "NO_LIMIT": Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicate
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    // Nothing to do.
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   *  the draining or dying servers.
   *  @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead servers that have the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  @VisibleForTesting
  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getServerVersion(final ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }
}