
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.SortedMap;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentSkipListMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ClockOutOfSyncException;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.RegionLoad;
43  import org.apache.hadoop.hbase.Server;
44  import org.apache.hadoop.hbase.ServerLoad;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.YouAreDeadException;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.client.ClusterConnection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
51  import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
52  import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
53  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.protobuf.RequestConverter;
56  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
57  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
58  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
59  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
60  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
61  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62  import org.apache.hadoop.hbase.regionserver.HRegionServer;
63  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.util.Pair;
66  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
67  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68  import org.apache.zookeeper.KeeperException;
69  
70  import com.google.common.annotations.VisibleForTesting;
71  import com.google.protobuf.ServiceException;
72  
73  /**
74   * The ServerManager class manages info about region servers.
75   * <p>
76   * Maintains lists of online and dead servers.  Processes the startups,
77   * shutdowns, and deaths of region servers.
78   * <p>
79   * Servers are distinguished in two different ways.  A given server has a
80   * location, specified by hostname and port, of which there can be only one
81   * online at any given time.  A server instance is specified by the location
82   * (hostname and port) as well as the startcode (timestamp from when the server
83   * was started).  This is used to differentiate a restarted instance of a given
84   * server from the original instance.
85   * <p>
86   * If a server is known not to be running any more, it is called dead. The dead
87   * server needs to be handled by a ServerShutdownHandler.  If the handler is not
88   * enabled yet, the server can't be handled right away so it is queued up.
89   * After the handler is enabled, the server is submitted to a handler for processing.
90   * However, the handler may be only partially enabled.  If so,
91   * the server cannot be fully processed, and is queued up for further processing.
92   * A server is fully processed only after the handler is fully enabled
93   * and has completed the handling.
94   */
95  @InterfaceAudience.Private
96  @SuppressWarnings("deprecation")
97  public class ServerManager {
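  // Illustrative sketch, not part of the original source: how the "location" vs.
  // "instance" distinction from the class comment maps onto ServerName. The hostname,
  // port and startcode below are made-up example values.
  //
  //   ServerName original  = ServerName.valueOf("rs1.example.com", 16020, 1500000000000L);
  //   ServerName restarted = ServerName.valueOf("rs1.example.com", 16020, 1500000123456L);
  //   ServerName.isSameHostnameAndPort(original, restarted);  // true: same location
  //   original.equals(restarted);                             // false: different instance
  //                                                           // (startcodes differ)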
98    public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
99        "hbase.master.wait.on.regionservers.maxtostart";
100 
101   public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
102       "hbase.master.wait.on.regionservers.mintostart";
103 
104   public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
105       "hbase.master.wait.on.regionservers.timeout";
106 
107   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
108       "hbase.master.wait.on.regionservers.interval";
109 
110   private static final Log LOG = LogFactory.getLog(ServerManager.class);
111 
112   // Set if we are to shutdown the cluster.
113   private volatile boolean clusterShutdown = false;
114 
115   private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
116     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
117 
118   /** Map of registered servers to their current load */
119   private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
120     new ConcurrentHashMap<ServerName, ServerLoad>();
121 
122   /**
123    * Map of admin interfaces per registered regionserver; these interfaces we use to control
124    * regionservers out on the cluster
125    */
126   private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
127     new HashMap<ServerName, AdminService.BlockingInterface>();
128 
129   /**
130    * List of region servers ({@link ServerName}) that should not get any more new
131    * regions.
132    */
133   private final ArrayList<ServerName> drainingServers =
134     new ArrayList<ServerName>();
135 
136   private final Server master;
137   private final MasterServices services;
138   private final ClusterConnection connection;
139 
140   private final DeadServer deadservers = new DeadServer();
141 
142   private final long maxSkew;
143   private final long warningSkew;
144 
145   /**
146    * Set of region servers which are dead but not processed immediately. If a
147    * server died before the master enabled ServerShutdownHandler, the server is
148    * added to this set and processed when the master calls
149    * {@link ServerManager#processQueuedDeadServers()}.
150    * <p>
151    * A dead server is a server instance known to be dead, not listed in the /hbase/rs
152    * znode any more. It may not have been submitted to ServerShutdownHandler yet
153    * because the handler is not enabled.
154    * <p>
155    * A dead server, which has been submitted to ServerShutdownHandler while the
156    * handler is not enabled, is queued up.
157    * <p>
158    * So this is a set of region servers known to be dead but not submitted to
159    * ServerShutdownHandler for processing yet.
160    */
161   private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
162 
163   /**
164    * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
165    * fully processed immediately.
166    * <p>
167    * If a server died before the assignment manager finished the failover cleanup, the server is
168    * added to this set and processed when the assignment manager calls
169    * {@link ServerManager#processQueuedDeadServers()}.
170    * <p>
171    * The Boolean value indicates whether log splitting is needed inside ServerShutdownHandler.
172    * <p>
173    * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
174    * enabled. It may not be able to complete the processing because meta is not yet online or master
175    * is currently in startup mode. In this case, the dead server will be parked in this set
176    * temporarily.
177    */
178   private Map<ServerName, Boolean> requeuedDeadServers
179     = new ConcurrentHashMap<ServerName, Boolean>();
180 
181   /** Listeners that are called on server events. */
182   private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
183 
184   /**
185    * Constructor.
186    * @param master
187    * @param services
188    * @throws ZooKeeperConnectionException
189    */
190   public ServerManager(final Server master, final MasterServices services)
191       throws IOException {
192     this(master, services, true);
193   }
194 
195   ServerManager(final Server master, final MasterServices services,
196       final boolean connect) throws IOException {
197     this.master = master;
198     this.services = services;
199     Configuration c = master.getConfiguration();
200     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
201     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
202     this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
203   }
204 
205   /**
206    * Add the listener to the notification list.
207    * @param listener The ServerListener to register
208    */
209   public void registerListener(final ServerListener listener) {
210     this.listeners.add(listener);
211   }
212 
213   /**
214    * Remove the listener from the notification list.
215    * @param listener The ServerListener to unregister
216    */
217   public boolean unregisterListener(final ServerListener listener) {
218     return this.listeners.remove(listener);
219   }
220 
221   /**
222    * Let the server manager know a new regionserver has come online
223    * @param ia The remote address
224    * @param port The remote port
225    * @param serverStartcode
226    * @param serverCurrentTime The current time of the region server in ms
227    * @return The ServerName we know this server as.
228    * @throws IOException
229    */
230   ServerName regionServerStartup(final InetAddress ia, final int port,
231     final long serverStartcode, long serverCurrentTime)
232   throws IOException {
233     // Test for case where we get a region startup message from a regionserver
234     // that has been quickly restarted but whose znode expiration handler has
235     // not yet run, or from a server whose failure we are currently processing.
236     // Test whether its host+port combo is present in serverAddressToServerInfo.  If it
237     // is, reject the server and trigger its expiration. The next time it comes
238     // in, it should have been removed from serverAddressToServerInfo and queued
239     // for processing by ProcessServerShutdown.
240     ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
241     checkClockSkew(sn, serverCurrentTime);
242     checkIsDead(sn, "STARTUP");
243     if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
244       LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
245         + " could not record the server: " + sn);
246     }
247     return sn;
248   }
249 
250   /**
251    * Updates last flushed sequence Ids for the regions on server sn
252    * @param sn
253    * @param hsl
254    */
255   private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
256     Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
257     for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
258       Long existingValue = flushedSequenceIdByRegion.get(entry.getKey());
259       long l = entry.getValue().getCompleteSequenceId();
260       if (existingValue != null) {
261         if (l != -1 && l < existingValue) {
262           LOG.warn("RegionServer " + sn +
263               " indicates a last flushed sequence id (" + l +
264               ") that is less than the previous last flushed sequence id (" +
265               existingValue + ") for region " +
266               Bytes.toString(entry.getKey()) + ", ignoring.");
267 
268           continue; // Don't let smaller sequence ids override greater
269           // sequence ids.
270         }
271       }
272       flushedSequenceIdByRegion.put(entry.getKey(), l);
273     }
274   }
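  // Worked example, not from the original source (hypothetical sequence ids): if region
  // 'r1' last reported a complete sequence id of 150, a later report carrying 120 for
  // 'r1' (e.g. a stale or reordered report) is skipped by the guard above and the map
  // keeps 150, while a report carrying 180 replaces it.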
275 
276   void regionServerReport(ServerName sn,
277       ServerLoad sl) throws YouAreDeadException {
278     checkIsDead(sn, "REPORT");
279     if (null == this.onlineServers.replace(sn, sl)) {
280       // Already have this host+port combo and it's just a different start code?
281       // Just let the server in. Presume master joining a running cluster.
282       // recordNewServer is what happens at the end of reportServerStartup.
283       // The only thing we are skipping is passing back to the regionserver
284       // the ServerName to use. Here we presume a master has already done
285       // that so we'll press on with whatever it gave us for ServerName.
286       if (!checkAndRecordNewServer(sn, sl)) {
287         LOG.info("RegionServerReport ignored, could not record the server: " + sn);
288         return; // Not recorded, so no need to move on
289       }
290     }
291     updateLastFlushedSequenceIds(sn, sl);
292   }
293 
294   /**
295    * Checks if a server with the same host and port already exists;
296    * if not, or if the existing one has a smaller start code, record the new server.
297    *
298    * @param serverName the server to check and record
299    * @param sl the server load on the server
300    * @return true if the server is recorded, otherwise, false
301    */
302   boolean checkAndRecordNewServer(
303       final ServerName serverName, final ServerLoad sl) {
304     ServerName existingServer = null;
305     synchronized (this.onlineServers) {
306       existingServer = findServerWithSameHostnamePortWithLock(serverName);
307       if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
308         LOG.info("Server serverName=" + serverName + " rejected; we already have "
309             + existingServer.toString() + " registered with same hostname and port");
310         return false;
311       }
312       recordNewServerWithLock(serverName, sl);
313     }
314 
315     // Tell our listeners that a server was added
316     if (!this.listeners.isEmpty()) {
317       for (ServerListener listener : this.listeners) {
318         listener.serverAdded(serverName);
319       }
320     }
321 
322     // Note that we assume that same ts means same server, and don't expire in that case.
323     //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
324     if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
325       LOG.info("Triggering server recovery; existingServer " +
326           existingServer + " looks stale, new server:" + serverName);
327       expireServer(existingServer);
328     }
329     return true;
330   }
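  // Illustrative summary of the startcode comparison above, with hypothetical values,
  // for a host+port that is already registered:
  //   existing startcode 2000, incoming 1000 -> incoming rejected, method returns false
  //   existing startcode 1000, incoming 2000 -> incoming recorded, stale server expired
  //   existing startcode 1000, incoming 1000 -> treated as the same server; recorded,
  //                                             nothing expired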
331 
332   /**
333    * Checks the clock skew between the server and the master. If the clock skew exceeds the
334    * configured max, it will throw an exception; if it exceeds the configured warning threshold,
335    * it will log a warning but start normally.
336    * @param serverName Incoming server's name
337    * @param serverCurrentTime
338    * @throws ClockOutOfSyncException if the skew exceeds the configured max value
339    */
340   private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
341   throws ClockOutOfSyncException {
342     long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
343     if (skew > maxSkew) {
344       String message = "Server " + serverName + " has been " +
345         "rejected; Reported time is too far out of sync with master.  " +
346         "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
347       LOG.warn(message);
348       throw new ClockOutOfSyncException(message);
349     } else if (skew > warningSkew){
350       String message = "Reported time for server " + serverName + " is out of sync with master " +
351         "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
352         "error threshold is " + maxSkew + "ms)";
353       LOG.warn(message);
354     }
355   }
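  // Worked example with the default thresholds read in the constructor
  // (hbase.master.maxclockskew=30000 ms, hbase.master.warningclockskew=10000 ms):
  // a server reporting a time 12 seconds off the master's clock (skew = 12000 ms)
  // only triggers the warning branch, while one 45 seconds off (skew = 45000 ms)
  // is rejected with ClockOutOfSyncException.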
356 
357   /**
358    * If this server is on the dead list, reject it with a YouAreDeadException.
359    * If it was dead but came back with a new start code, remove the old entry
360    * from the dead list.
361    * @param serverName
362    * @param what START or REPORT
363    * @throws org.apache.hadoop.hbase.YouAreDeadException
364    */
365   private void checkIsDead(final ServerName serverName, final String what)
366       throws YouAreDeadException {
367     if (this.deadservers.isDeadServer(serverName)) {
368       // host name, port and start code all match an existing entry in the
369       // dead servers list. So, this server must be dead.
370       String message = "Server " + what + " rejected; currently processing " +
371           serverName + " as dead server";
372       LOG.debug(message);
373       throw new YouAreDeadException(message);
374     }
375     // Remove dead server with the same hostname and port as the newly checking-in RS after
376     // master initialization. See HBASE-5916 for more information.
377     if ((this.services == null || ((HMaster) this.services).isInitialized())
378         && this.deadservers.cleanPreviousInstance(serverName)) {
379       // This server has now become alive after we marked it as dead.
380       // We removed its previous entry from the dead list to reflect it.
381       LOG.debug(what + ":" + " Server " + serverName + " came back up," +
382           " removed it from the dead servers list");
383     }
384   }
385 
386   /**
387    * Assumes onlineServers is locked.
388    * @return ServerName with matching hostname and port.
389    */
390   private ServerName findServerWithSameHostnamePortWithLock(
391       final ServerName serverName) {
392     for (ServerName sn: this.onlineServers.keySet()) {
393       if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
394     }
395     return null;
396   }
397 
398   /**
399    * Adds to the onlineServers list. onlineServers should be locked.
400    * @param serverName The remote server's name.
401    * @param sl
403    */
404   @VisibleForTesting
405   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
406     LOG.info("Registering server=" + serverName);
407     this.onlineServers.put(serverName, sl);
408     this.rsAdmins.remove(serverName);
409   }
410 
411   public long getLastFlushedSequenceId(byte[] regionName) {
412     long seqId = -1;
413     if (flushedSequenceIdByRegion.containsKey(regionName)) {
414       seqId = flushedSequenceIdByRegion.get(regionName);
415     }
416     return seqId;
417   }
418 
419   /**
420    * @param serverName
421    * @return ServerLoad if serverName is known else null
422    */
423   public ServerLoad getLoad(final ServerName serverName) {
424     return this.onlineServers.get(serverName);
425   }
426 
427   /**
428    * Compute the average load across all region servers.
429    * Currently, this uses a very naive computation - just uses the number of
430    * regions being served, ignoring stats about number of requests.
431    * @return the average load
432    */
433   public double getAverageLoad() {
434     int totalLoad = 0;
435     int numServers = 0;
436     for (ServerLoad sl: this.onlineServers.values()) {
437       numServers++;
438       totalLoad += sl.getNumberOfRegions();
439     }
440     return numServers == 0 ? 0 :
441       (double)totalLoad / (double)numServers;
442   }
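  // Worked example with hypothetical numbers: three online servers carrying 10, 25 and
  // 40 regions give an average load of (10 + 25 + 40) / 3 = 25.0; with no servers
  // online the method returns 0 instead of dividing by zero.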
443 
444   /** @return the count of active regionservers */
445   public int countOfRegionServers() {
446     // Presumes onlineServers is a concurrent map
447     return this.onlineServers.size();
448   }
449 
450   /**
451    * @return Read-only map of servers to serverinfo
452    */
453   public Map<ServerName, ServerLoad> getOnlineServers() {
454     // Presumption is that iterating the returned Map is OK.
455     synchronized (this.onlineServers) {
456       return Collections.unmodifiableMap(this.onlineServers);
457     }
458   }
459 
460 
461   public DeadServer getDeadServers() {
462     return this.deadservers;
463   }
464 
465   /**
466    * Checks if any dead servers are currently being processed.
467    * @return true if any RS are being processed as dead, false if not
468    */
469   public boolean areDeadServersInProgress() {
470     return this.deadservers.areDeadServersInProgress();
471   }
472 
473   void letRegionServersShutdown() {
474     long previousLogTime = 0;
475     ServerName sn = master.getServerName();
476     ZooKeeperWatcher zkw = master.getZooKeeper();
477     int onlineServersCt;
478     while ((onlineServersCt = onlineServers.size()) > 0){
479 
480       if (System.currentTimeMillis() > (previousLogTime + 1000)) {
481         Set<ServerName> remainingServers = onlineServers.keySet();
482         synchronized (onlineServers) {
483           if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
484             // Master will delete itself later.
485             return;
486           }
487         }
488         StringBuilder sb = new StringBuilder();
489         // It's ok here to not sync on onlineServers - merely logging
490         for (ServerName key : remainingServers) {
491           if (sb.length() > 0) {
492             sb.append(", ");
493           }
494           sb.append(key);
495         }
496         LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
497         previousLogTime = System.currentTimeMillis();
498       }
499 
500       try {
501         List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
502         if (servers == null || servers.size() == 0 || (servers.size() == 1
503             && servers.contains(sn.toString()))) {
504           LOG.info("ZK shows there is only the master self online, exiting now");
505           // Master could have lost some ZK events, no need to wait more.
506           break;
507         }
508       } catch (KeeperException ke) {
509         LOG.warn("Failed to list regionservers", ke);
510         // ZK is malfunctioning, don't hang here
511         break;
512       }
513       synchronized (onlineServers) {
514         try {
515           if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
516         } catch (InterruptedException ignored) {
517           // continue
518         }
519       }
520     }
521   }
522 
523   /*
524    * Expire the passed server.  Add it to the list of dead servers and queue
525    * shutdown processing.
526    */
527   public synchronized void expireServer(final ServerName serverName) {
528     if (serverName.equals(master.getServerName())) {
529       if (!(master.isAborted() || master.isStopped())) {
530         master.stop("We lost our znode?");
531       }
532       return;
533     }
534     if (!services.isServerShutdownHandlerEnabled()) {
535       LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
536           + "delay expiring server " + serverName);
537       this.queuedDeadServers.add(serverName);
538       return;
539     }
540     if (this.deadservers.isDeadServer(serverName)) {
541       // TODO: Can this happen?  It shouldn't be online in this case?
542       LOG.warn("Expiration of " + serverName +
543           " but server shutdown already in progress");
544       return;
545     }
546     synchronized (onlineServers) {
547       if (!this.onlineServers.containsKey(serverName)) {
548         LOG.warn("Expiration of " + serverName + " but server not online");
549       }
550       // Remove the server from the known servers lists and update load info BUT
551       // add to deadservers first; do this so it'll show in dead servers list if
552       // not in online servers list.
553       this.deadservers.add(serverName);
554       this.onlineServers.remove(serverName);
555       onlineServers.notifyAll();
556     }
557     this.rsAdmins.remove(serverName);
558     // If cluster is going down, yes, servers are going to be expiring; don't
559     // process as a dead server
560     if (this.clusterShutdown) {
561       LOG.info("Cluster shutdown set; " + serverName +
562         " expired; onlineServers=" + this.onlineServers.size());
563       if (this.onlineServers.isEmpty()) {
564         master.stop("Cluster shutdown set; onlineServer=0");
565       }
566       return;
567     }
568 
569     boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
570     if (carryingMeta) {
571       this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
572         this.services, this.deadservers, serverName));
573     } else {
574       this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
575         this.services, this.deadservers, serverName, true));
576     }
577     LOG.debug("Added=" + serverName +
578       " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
579 
580     // Tell our listeners that a server was removed
581     if (!this.listeners.isEmpty()) {
582       for (ServerListener listener : this.listeners) {
583         listener.serverRemoved(serverName);
584       }
585     }
586   }
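  // Illustrative summary, not from the original source, of how expireServer handles the
  // different cases above:
  //   the master's own ServerName          -> stop the master, nothing queued
  //   ServerShutdownHandler not enabled    -> parked in queuedDeadServers for later
  //   already known dead                   -> warning logged, nothing submitted
  //   cluster shutdown in progress         -> marked dead and removed from the online
  //                                           list, but no handler submitted
  //   otherwise                            -> MetaServerShutdownHandler (if carrying meta)
  //                                           or ServerShutdownHandler is submitted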
587 
588   public synchronized void processDeadServer(final ServerName serverName) {
589     this.processDeadServer(serverName, false);
590   }
591 
592   public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
593     // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
594     // in-memory region states, region servers could be down. Meta table can and
595     // should be re-assigned, log splitting can be done too. However, it is better to
596     // wait till the cleanup is done before re-assigning user regions.
597     //
598     // We should not wait in the server shutdown handler thread since it can clog
599     // the handler threads and meta table could not be re-assigned in case
600     // the corresponding server is down. So we queue them up here instead.
601     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
602       requeuedDeadServers.put(serverName, shouldSplitWal);
603       return;
604     }
605 
606     this.deadservers.add(serverName);
607     this.services.getExecutorService().submit(
608       new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
609           shouldSplitWal));
610   }
611 
612   /**
613    * Process the servers which died during master's initialization. It will be
614    * called after HMaster#assignMeta and AssignmentManager#joinCluster.
615    */
616   synchronized void processQueuedDeadServers() {
617     if (!services.isServerShutdownHandlerEnabled()) {
618       LOG.info("Master hasn't enabled ServerShutdownHandler");
619     }
620     Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
621     while (serverIterator.hasNext()) {
622       ServerName tmpServerName = serverIterator.next();
623       expireServer(tmpServerName);
624       serverIterator.remove();
625       requeuedDeadServers.remove(tmpServerName);
626     }
627 
628     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
629       LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
630     }
631 
632     for(ServerName tmpServerName : requeuedDeadServers.keySet()){
633       processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
634     }
635     requeuedDeadServers.clear();
636   }
637 
638   /*
639    * Remove the server from the drain list.
640    */
641   public boolean removeServerFromDrainList(final ServerName sn) {
642     // Warn if the server (sn) is not online.  ServerName is of the form:
643     // <hostname> , <port> , <startcode>
644 
645     if (!this.isServerOnline(sn)) {
646       LOG.warn("Server " + sn + " is not currently online. " +
647                "Removing from draining list anyway, as requested.");
648     }
649     // Remove the server from the draining servers lists.
650     return this.drainingServers.remove(sn);
651   }
652 
653   /*
654    * Add the server to the drain list.
655    */
656   public boolean addServerToDrainList(final ServerName sn) {
657     // Warn if the server (sn) is not online.  ServerName is of the form:
658     // <hostname> , <port> , <startcode>
659 
660     if (!this.isServerOnline(sn)) {
661       LOG.warn("Server " + sn + " is not currently online. " +
662                "Ignoring request to add it to draining list.");
663       return false;
664     }
665     // Add the server to the draining servers lists, if it's not already in
666     // it.
667     if (this.drainingServers.contains(sn)) {
668       LOG.warn("Server " + sn + " is already in the draining server list." +
669                "Ignoring request to add it again.");
670       return false;
671     }
672     return this.drainingServers.add(sn);
673   }
674 
675   // RPC methods to region servers
676 
677   /**
678    * Sends an OPEN RPC to the specified server to open the specified region.
679    * <p>
680    * Open should not fail but can if server just crashed.
681    * <p>
682    * @param server server to open a region
683    * @param region region to open
684    * @param favoredNodes
685    */
686   public RegionOpeningState sendRegionOpen(final ServerName server,
687       HRegionInfo region, List<ServerName> favoredNodes)
688   throws IOException {
689     AdminService.BlockingInterface admin = getRsAdmin(server);
690     if (admin == null) {
691       throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
692         " failed because no RPC connection found to this server");
693     }
694     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
695       region, favoredNodes,
696       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
697     try {
698       OpenRegionResponse response = admin.openRegion(null, request);
699       return ResponseConverter.getRegionOpeningState(response);
700     } catch (ServiceException se) {
701       throw ProtobufUtil.getRemoteException(se);
702     }
703   }
704 
705   /**
706    * Sends an OPEN RPC to the specified server to open the specified region.
707    * <p>
708    * Open should not fail but can if server just crashed.
709    * <p>
710    * @param server server to open a region
711    * @param regionOpenInfos info of a list of regions to open
712    * @return a list of region opening states
713    */
714   public List<RegionOpeningState> sendRegionOpen(ServerName server,
715       List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos)
716   throws IOException {
717     AdminService.BlockingInterface admin = getRsAdmin(server);
718     if (admin == null) {
719       throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
720         " failed because no RPC connection found to this server");
721     }
722 
723     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
724       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
725     try {
726       OpenRegionResponse response = admin.openRegion(null, request);
727       return ResponseConverter.getRegionOpeningStateList(response);
728     } catch (ServiceException se) {
729       throw ProtobufUtil.getRemoteException(se);
730     }
731   }
732 
733   /**
734    * Sends a CLOSE RPC to the specified server to close the specified region.
735    * <p>
736    * A region server could reject the close request because it either does not
737    * have the specified region or the region is being split.
738    * @param server server to close the region on
739    * @param region region to close
740    * @param dest - if the region is moved to another server, the destination server. null otherwise.
741    * @throws IOException
742    */
743   public boolean sendRegionClose(ServerName server, HRegionInfo region,
744       ServerName dest) throws IOException {
745     if (server == null) throw new NullPointerException("Passed server is null");
746     AdminService.BlockingInterface admin = getRsAdmin(server);
747     if (admin == null) {
748       throw new IOException("Attempting to send CLOSE RPC to server " +
749         server.toString() + " for region " +
750         region.getRegionNameAsString() +
751         " failed because no RPC connection found to this server");
752     }
753     return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
754       dest);
755   }
756 
757   public boolean sendRegionClose(ServerName server,
758       HRegionInfo region) throws IOException {
759     return sendRegionClose(server, region, null);
760   }
761 
762   /**
763    * Sends a MERGE REGIONS RPC to the specified server to merge the specified
764    * regions.
765    * <p>
766    * A region server could reject the merge request if it does not have
767    * either of the specified regions.
768    * @param server server to merge regions
769    * @param region_a region to merge
770    * @param region_b region to merge
771    * @param forcible true if do a compulsory merge, otherwise we will only merge
772    *          two adjacent regions
773    * @throws IOException
774    */
775   public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
776       HRegionInfo region_b, boolean forcible) throws IOException {
777     if (server == null)
778       throw new NullPointerException("Passed server is null");
779     if (region_a == null || region_b == null)
780       throw new NullPointerException("Passed region is null");
781     AdminService.BlockingInterface admin = getRsAdmin(server);
782     if (admin == null) {
783       throw new IOException("Attempting to send MERGE REGIONS RPC to server "
784           + server.toString() + " for region "
785           + region_a.getRegionNameAsString() + ","
786           + region_b.getRegionNameAsString()
787           + " failed because no RPC connection found to this server");
788     }
789     ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
790   }
791 
792   /**
793    * Check if a region server is reachable and has the expected start code
794    */
795   public boolean isServerReachable(ServerName server) {
796     if (server == null) throw new NullPointerException("Passed server is null");
797     int maximumAttempts = Math.max(1, master.getConfiguration().getInt(
798       "hbase.master.maximum.ping.server.attempts", 10));
799     for (int i = 0; i < maximumAttempts; i++) {
800       try {
801         AdminService.BlockingInterface admin = getRsAdmin(server);
802         if (admin != null) {
803           ServerInfo info = ProtobufUtil.getServerInfo(admin);
804           return info != null && info.hasServerName()
805             && server.getStartcode() == info.getServerName().getStartCode();
806         }
807       } catch (IOException ioe) {
808         LOG.debug("Couldn't reach " + server + ", try=" + i
809           + " of " + maximumAttempts, ioe);
810       }
811     }
812     return false;
813   }
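  // Illustrative note with a hypothetical setting: with
  // hbase.master.maximum.ping.server.attempts=3 the loop above issues at most three
  // getServerInfo RPCs; the server counts as reachable only if a response carries a
  // ServerName whose startcode matches the expected one.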
814 
815   /**
816    * @param sn
817    * @return Admin interface for the remote regionserver named <code>sn</code>
818    * @throws IOException
819    * @throws RetriesExhaustedException wrapping a ConnectException if failed
820    */
821   private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
822   throws IOException {
823     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
824     if (admin == null) {
825       LOG.debug("New admin connection to " + sn.toString());
826       if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
827         // A master is also a region server now, see HBASE-10569 for details
828         admin = ((HRegionServer)master).getRSRpcServices();
829       } else {
830         admin = this.connection.getAdmin(sn);
831       }
832       this.rsAdmins.put(sn, admin);
833     }
834     return admin;
835   }
836 
837   /**
838    * Wait for the region servers to report in.
839    * We will wait until one of these conditions is met:
840    *  - the master is stopped
841    *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
842    *    region servers is reached
843    *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
844    *    no new region server has checked in for
845    *    'hbase.master.wait.on.regionservers.interval' time AND
846    *    the 'hbase.master.wait.on.regionservers.timeout' is reached
847    *
848    * @throws InterruptedException
849    */
850   public void waitForRegionServers(MonitoredTask status)
851   throws InterruptedException {
852     final long interval = this.master.getConfiguration().
853       getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
854     final long timeout = this.master.getConfiguration().
855       getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
856     int defaultMinToStart = 1;
857     if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
858       // If we assign regions to master, we'd like to start
859       // at least another region server so that we don't
860       // assign all regions to master if other region servers
861       // don't come up in time.
862       defaultMinToStart = 2;
863     }
864     int minToStart = this.master.getConfiguration().
865       getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
866     if (minToStart < 1) {
867       LOG.warn(String.format(
868         "The value of '%s' (%d) can not be less than 1, ignoring.",
869         WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
870       minToStart = 1;
871     }
872     int maxToStart = this.master.getConfiguration().
873       getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
874     if (maxToStart < minToStart) {
875         LOG.warn(String.format(
876             "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
877             WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
878             WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
879         maxToStart = Integer.MAX_VALUE;
880     }
881 
882     long now =  System.currentTimeMillis();
883     final long startTime = now;
884     long slept = 0;
885     long lastLogTime = 0;
886     long lastCountChange = startTime;
887     int count = countOfRegionServers();
888     int oldCount = 0;
889     while (!this.master.isStopped() && count < maxToStart
890         && (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
891       // Log some info at every interval time or if there is a change
892       if (oldCount != count || lastLogTime+interval < now){
893         lastLogTime = now;
894         String msg =
895           "Waiting for region servers count to settle; currently"+
896             " checked in " + count + ", slept for " + slept + " ms," +
897             " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
898             ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
899         LOG.info(msg);
900         status.setStatus(msg);
901       }
902 
903       // We sleep for some time
904       final long sleepTime = 50;
905       Thread.sleep(sleepTime);
906       now =  System.currentTimeMillis();
907       slept = now - startTime;
908 
909       oldCount = count;
910       count = countOfRegionServers();
911       if (count != oldCount) {
912         lastCountChange = now;
913       }
914     }
915 
916     LOG.info("Finished waiting for region servers count to settle;" +
917       " checked in " + count + ", slept for " + slept + " ms," +
918       " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
919       " master is "+ (this.master.isStopped() ? "stopped.": "running")
920     );
921   }
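  // Worked example with hypothetical settings: with mintostart=2, maxtostart unset
  // (Integer.MAX_VALUE), timeout=4500 ms and interval=1500 ms, the loop above keeps
  // waiting while fewer than 2 region servers have checked in. Once at least 2 are in,
  // it still waits until no new server has checked in for 1500 ms AND at least 4500 ms
  // have elapsed (or the master is stopped). Setting maxtostart=2 instead makes it
  // return as soon as the second server checks in.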
922 
923   /**
924    * @return A copy of the internal list of online servers.
925    */
926   public List<ServerName> getOnlineServersList() {
927     // TODO: optimize the load balancer call so we don't need to make a new list
928     // TODO: FIX. THIS IS POPULAR CALL.
929     return new ArrayList<ServerName>(this.onlineServers.keySet());
930   }
931 
932   /**
933    * @return A copy of the internal list of draining servers.
934    */
935   public List<ServerName> getDrainingServersList() {
936     return new ArrayList<ServerName>(this.drainingServers);
937   }
938 
939   /**
940    * @return A copy of the internal set of deadNotExpired servers.
941    */
942   Set<ServerName> getDeadNotExpiredServers() {
943     return new HashSet<ServerName>(this.queuedDeadServers);
944   }
945 
946   /**
947    * During startup, if we figure it is not a failover, i.e. there are
948    * no more WAL files to split, we won't try to recover these dead servers.
949    * So we just remove them from the queue. Use caution in calling this.
950    */
951   void removeRequeuedDeadServers() {
952     requeuedDeadServers.clear();
953   }
954 
955   /**
956    * @return An unmodifiable view of the internal map of requeuedDeadServers and their
957    *         corresponding splitlog need flag.
958    */
959   Map<ServerName, Boolean> getRequeuedDeadServers() {
960     return Collections.unmodifiableMap(this.requeuedDeadServers);
961   }
962 
963   public boolean isServerOnline(ServerName serverName) {
964     return serverName != null && onlineServers.containsKey(serverName);
965   }
966 
967   /**
968    * Check if a server is known to be dead.  A server can be online,
969    * or known to be dead, or unknown to this manager (i.e., not online,
970    * not known to be dead either; it is simply not tracked by the
971    * master any more, for example, a very old previous instance).
972    */
973   public synchronized boolean isServerDead(ServerName serverName) {
974     return serverName == null || deadservers.isDeadServer(serverName)
975       || queuedDeadServers.contains(serverName)
976       || requeuedDeadServers.containsKey(serverName);
977   }
978 
979   public void shutdownCluster() {
980     this.clusterShutdown = true;
981     this.master.stop("Cluster shutdown requested");
982   }
983 
984   public boolean isClusterShutdown() {
985     return this.clusterShutdown;
986   }
987 
988   /**
989    * Stop the ServerManager.  Currently closes the cluster connection.
990    */
991   public void stop() {
992     if (connection != null) {
993       try {
994         connection.close();
995       } catch (IOException e) {
996         LOG.error("Attempt to close connection to master failed", e);
997       }
998     }
999   }
1000 
1001   /**
1002    * Creates a list of possible destinations for a region. It contains the online servers, but not
1003    *  the draining or dying servers.
1004    *  @param serverToExclude can be null if there is no server to exclude
1005    */
1006   public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
1007     final List<ServerName> destServers = getOnlineServersList();
1008 
1009     if (serverToExclude != null){
1010       destServers.remove(serverToExclude);
1011     }
1012 
1013     // Loop through the draining server list and remove them from the server list
1014     final List<ServerName> drainingServersCopy = getDrainingServersList();
1015     if (!drainingServersCopy.isEmpty()) {
1016       for (final ServerName server: drainingServersCopy) {
1017         destServers.remove(server);
1018       }
1019     }
1020 
1021     // Remove the deadNotExpired servers from the server list.
1022     removeDeadNotExpiredServers(destServers);
1023     return destServers;
1024   }
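  // Illustrative sketch with hypothetical servers: if rs1..rs4 are online, rs2 is passed
  // as serverToExclude, rs3 is draining and rs4 is dead-but-not-yet-expired, the returned
  // destination list contains only rs1.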
1025 
1026   /**
1027    * Calls {@link #createDestinationServersList} without server to exclude.
1028    */
1029   public List<ServerName> createDestinationServersList(){
1030     return createDestinationServersList(null);
1031   }
1032 
1033   /**
1034    * Loop through the deadNotExpired server list and remove them from the
1035    * servers.
1036    * This function should be used carefully outside of this class. You should use a high level
1037    * method such as {@link #createDestinationServersList()} instead of managing your own list.
1038    */
1039   void removeDeadNotExpiredServers(List<ServerName> servers) {
1040     Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1041     if (!deadNotExpiredServersCopy.isEmpty()) {
1042       for (ServerName server : deadNotExpiredServersCopy) {
1043         LOG.debug("Removing dead but not expired server: " + server
1044           + " from eligible server pool.");
1045         servers.remove(server);
1046       }
1047     }
1048   }
1049 
1050   /**
1051    * Clears any dead server whose host name and port match those of an online server.
1052    */
1053   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1054     for (ServerName serverName : getOnlineServersList()) {
1055       deadservers.cleanAllPreviousInstances(serverName);
1056     }
1057   }
1058 }