1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentNavigableMap;
34  import java.util.concurrent.ConcurrentSkipListMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ClockOutOfSyncException;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.NotServingRegionException;
44  import org.apache.hadoop.hbase.RegionLoad;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.ServerLoad;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.YouAreDeadException;
49  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.client.ClusterConnection;
52  import org.apache.hadoop.hbase.client.ConnectionFactory;
53  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
54  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
55  import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
56  import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
57  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
58  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
59  import org.apache.hadoop.hbase.protobuf.RequestConverter;
60  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
61  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
62  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
63  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
64  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
65  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
66  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
67  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
68  import org.apache.hadoop.hbase.regionserver.HRegionServer;
69  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.hadoop.hbase.util.Pair;
72  import org.apache.hadoop.hbase.util.RetryCounter;
73  import org.apache.hadoop.hbase.util.RetryCounterFactory;
74  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
75  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
76  import org.apache.zookeeper.KeeperException;
77  
78  import com.google.common.annotations.VisibleForTesting;
79  import com.google.protobuf.ByteString;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * The ServerManager class manages info about region servers.
84   * <p>
85   * Maintains lists of online and dead servers.  Processes the startups,
86   * shutdowns, and deaths of region servers.
87   * <p>
88   * Servers are distinguished in two different ways.  A given server has a
89   * location, specified by hostname and port, and of which there can only be one
90   * online at any given time.  A server instance is specified by the location
91   * (hostname and port) as well as the startcode (timestamp from when the server
92   * was started).  This is used to differentiate a restarted instance of a given
93   * server from the original instance.
94   * <p>
95   * If a server is known not to be running any more, it is called dead. A dead
96   * server needs to be handled by a ServerShutdownHandler.  If the handler is not
97   * enabled yet, the server can't be handled right away, so it is queued up.
98   * After the handler is enabled, the server is submitted to the handler for handling.
99   * However, the handler may be only partially enabled.  If so,
100  * the server cannot be fully processed and is queued up for further processing.
101  * A server is fully processed only after the handler is fully enabled
102  * and has completed the handling.
103  */
104 @InterfaceAudience.Private
105 public class ServerManager {
106   public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
107       "hbase.master.wait.on.regionservers.maxtostart";
108 
109   public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
110       "hbase.master.wait.on.regionservers.mintostart";
111 
112   public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
113       "hbase.master.wait.on.regionservers.timeout";
114 
115   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
116       "hbase.master.wait.on.regionservers.interval";
117 
118   private static final Log LOG = LogFactory.getLog(ServerManager.class);
119 
120   // Set if we are to shutdown the cluster.
121   private volatile boolean clusterShutdown = false;
122 
123   private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
124     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
125 
126   private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
127     storeFlushedSequenceIdsByRegion =
128     new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
129 
130   /** Map of registered servers to their current load */
131   private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
132     new ConcurrentHashMap<ServerName, ServerLoad>();
133 
134   /**
135    * Map of admin interfaces per registered regionserver; we use these interfaces to
136    * control regionservers out on the cluster
137    */
138   private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
139     new HashMap<ServerName, AdminService.BlockingInterface>();
140 
141   /**
142    * List of region servers that should not get any more new
143    * regions.
144    */
145   private final ArrayList<ServerName> drainingServers =
146     new ArrayList<ServerName>();
147 
148   private final Server master;
149   private final MasterServices services;
150   private final ClusterConnection connection;
151 
152   private final DeadServer deadservers = new DeadServer();
153 
154   private final long maxSkew;
155   private final long warningSkew;
156 
157   private final RetryCounterFactory pingRetryCounterFactory;
158 
159   /**
160    * Set of region servers which are dead but not processed immediately. If a
161    * server dies before the master enables the ServerShutdownHandler, the server is
162    * added to this set and is later processed when the master calls
163    * {@link ServerManager#processQueuedDeadServers()}.
164    * <p>
165    * A dead server is a server instance known to be dead, no longer listed in the
166    * /hbase/rs znode. It may not have been submitted to ServerShutdownHandler yet
167    * because the handler is not enabled.
168    * <p>
169    * A dead server that is submitted to ServerShutdownHandler while the
170    * handler is not enabled is queued up here.
171    * <p>
172    * So this is the set of region servers known to be dead but not yet submitted to
173    * ServerShutdownHandler for processing.
174    */
175   private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
176 
177   /**
178    * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
179    * fully processed immediately.
180    * <p>
181    * If a server dies before the assignment manager has finished the failover cleanup, the
182    * server is added to this set and is processed when the assignment manager calls
183    * {@link ServerManager#processQueuedDeadServers()}.
184    * <p>
185    * The Boolean value indicates whether log splitting is needed inside ServerShutdownHandler.
186    * <p>
187    * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
188    * enabled. It may not be able to complete the processing because meta is not yet online or master
189    * is currently in startup mode. In this case, the dead server will be parked in this set
190    * temporarily.
191    */
192   private Map<ServerName, Boolean> requeuedDeadServers
193     = new ConcurrentHashMap<ServerName, Boolean>();
194 
195   /** Listeners that are called on server events. */
196   private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
197 
198   /**
199    * Constructor.
200    * @param master
201    * @param services
202    * @throws ZooKeeperConnectionException
203    */
204   public ServerManager(final Server master, final MasterServices services)
205       throws IOException {
206     this(master, services, true);
207   }
208 
209   ServerManager(final Server master, final MasterServices services,
210       final boolean connect) throws IOException {
211     this.master = master;
212     this.services = services;
213     Configuration c = master.getConfiguration();
214     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
215     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
216     this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
217     int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
218       "hbase.master.maximum.ping.server.attempts", 10));
219     int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
220       "hbase.master.ping.server.retry.sleep.interval", 100));
221     this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
222   }
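
  // Illustrative sketch (not part of the original source): the tunables read by the
  // constructor above, shown with the defaults it falls back to. The values set here are
  // examples only, assuming a caller that builds its own Configuration.
  //
  //   Configuration c = HBaseConfiguration.create();
  //   c.setLong("hbase.master.maxclockskew", 30000);                  // reject skew > 30s
  //   c.setLong("hbase.master.warningclockskew", 10000);              // warn on skew > 10s
  //   c.setInt("hbase.master.maximum.ping.server.attempts", 10);      // retries in isServerReachable()
  //   c.setInt("hbase.master.ping.server.retry.sleep.interval", 100); // ms between ping retries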
223 
224   /**
225    * Add the listener to the notification list.
226    * @param listener The ServerListener to register
227    */
228   public void registerListener(final ServerListener listener) {
229     this.listeners.add(listener);
230   }
231 
232   /**
233    * Remove the listener from the notification list.
234    * @param listener The ServerListener to unregister
235    */
236   public boolean unregisterListener(final ServerListener listener) {
237     return this.listeners.remove(listener);
238   }
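
  // Illustrative sketch (assumption, not part of the original source): registering for the
  // server lifecycle notifications fired by this class. It assumes ServerListener declares
  // only the serverAdded/serverRemoved callbacks invoked below; a real implementation must
  // cover whatever the interface actually defines.
  //
  //   serverManager.registerListener(new ServerListener() {
  //     @Override
  //     public void serverAdded(ServerName serverName) {
  //       LOG.info("Region server checked in: " + serverName);
  //     }
  //     @Override
  //     public void serverRemoved(ServerName serverName) {
  //       LOG.info("Region server expired: " + serverName);
  //     }
  //   });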
239 
240   /**
241    * Let the server manager know a new regionserver has come online
242    * @param ia The remote address
243    * @param port The remote port
244    * @param serverStartcode
245    * @param serverCurrentTime The current time of the region server in ms
246    * @return The ServerName we know this server as.
247    * @throws IOException
248    */
249   ServerName regionServerStartup(final InetAddress ia, final int port,
250     final long serverStartcode, long serverCurrentTime)
251   throws IOException {
252     // Test for case where we get a region startup message from a regionserver
253     // that has been quickly restarted but whose znode expiration handler has
254     // not yet run, or from a server whose fail we are currently processing.
255     // Test its host+port combo is present in serverAddresstoServerInfo.  If it
256     // is, reject the server and trigger its expiration. The next time it comes
257     // in, it should have been removed from serverAddressToServerInfo and queued
258     // for processing by ProcessServerShutdown.
259     ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
260     checkClockSkew(sn, serverCurrentTime);
261     checkIsDead(sn, "STARTUP");
262     if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
263       LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
264         + " could not record the server: " + sn);
265     }
266     return sn;
267   }
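
  // Illustrative sketch (not part of the original source): how the startcode distinguishes a
  // restarted instance from the original one on the same host and port, which is what
  // checkAndRecordNewServer() below relies on. Host and port are example values.
  //
  //   ServerName original  = ServerName.valueOf("rs1.example.com", 16020, 1000L);
  //   ServerName restarted = ServerName.valueOf("rs1.example.com", 16020, 2000L);
  //   ServerName.isSameHostnameAndPort(original, restarted); // true  -> same location
  //   original.equals(restarted);                            // false -> different instance
  //   restarted.getStartcode() > original.getStartcode();    // true  -> original looks stale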
268 
269   private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
270     byte[] regionName) {
271     ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
272         storeFlushedSequenceIdsByRegion.get(regionName);
273     if (storeFlushedSequenceId != null) {
274       return storeFlushedSequenceId;
275     }
276     storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
277     ConcurrentNavigableMap<byte[], Long> alreadyPut =
278         storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
279     return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
280   }
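
  // Note (not part of the original source): two concurrent region server reports can race to
  // create the per-region map above, so the code relies on putIfAbsent() and keeps whichever
  // map actually won the race. A minimal sketch of the same idiom, where byRegion and
  // regionKey are assumed names:
  //
  //   ConcurrentMap<String, Long> candidate = new ConcurrentSkipListMap<String, Long>();
  //   ConcurrentMap<String, Long> prior = byRegion.putIfAbsent(regionKey, candidate);
  //   ConcurrentMap<String, Long> toUse = (prior == null) ? candidate : prior; // loser discards its map
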
281   /**
282    * Updates the last flushed sequence ids for the regions on server sn.
283    * @param sn
284    * @param hsl
285    */
286   private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
287     Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
288     for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
289       byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
290       Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
291       long l = entry.getValue().getCompleteSequenceId();
292       // Don't let smaller sequence ids override greater sequence ids.
293       if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
294         flushedSequenceIdByRegion.put(encodedRegionName, l);
295       } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
296         LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
297             + l + ") that is less than the previous last flushed sequence id ("
298             + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
299       }
300       ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
301           getOrCreateStoreFlushedSequenceId(encodedRegionName);
302       for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
303         byte[] family = storeSeqId.getFamilyName().toByteArray();
304         existingValue = storeFlushedSequenceId.get(family);
305         l = storeSeqId.getSequenceId();
306         // Don't let smaller sequence ids override greater sequence ids.
307         if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
308           storeFlushedSequenceId.put(family, l);
309         }
310       }
311     }
312   }
313 
314   void regionServerReport(ServerName sn,
315       ServerLoad sl) throws YouAreDeadException {
316     checkIsDead(sn, "REPORT");
317     if (null == this.onlineServers.replace(sn, sl)) {
318       // Already have this host+port combo and it's just a different start code?
319       // Just let the server in. Presume master joining a running cluster.
320       // recordNewServer is what happens at the end of reportServerStartup.
321       // The only thing we are skipping is passing back to the regionserver
322       // the ServerName to use. Here we presume a master has already done
323       // that so we'll press on with whatever it gave us for ServerName.
324       if (!checkAndRecordNewServer(sn, sl)) {
325         LOG.info("RegionServerReport ignored, could not record the server: " + sn);
326         return; // Not recorded, so no need to move on
327       }
328     }
329     updateLastFlushedSequenceIds(sn, sl);
330   }
331 
332   /**
333    * Checks if a server with the same host and port already exists;
334    * if not, or if the existing one has a smaller start code, record the new server.
335    *
336    * @param serverName the server to check and record
337    * @param sl the server load on the server
338    * @return true if the server is recorded, otherwise, false
339    */
340   boolean checkAndRecordNewServer(
341       final ServerName serverName, final ServerLoad sl) {
342     ServerName existingServer = null;
343     synchronized (this.onlineServers) {
344       existingServer = findServerWithSameHostnamePortWithLock(serverName);
345       if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
346         LOG.info("Server serverName=" + serverName + " rejected; we already have "
347             + existingServer.toString() + " registered with same hostname and port");
348         return false;
349       }
350       recordNewServerWithLock(serverName, sl);
351     }
352 
353     // Tell our listeners that a server was added
354     if (!this.listeners.isEmpty()) {
355       for (ServerListener listener : this.listeners) {
356         listener.serverAdded(serverName);
357       }
358     }
359 
360     // Note that we assume that same ts means same server, and don't expire in that case.
361     //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
362     if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
363       LOG.info("Triggering server recovery; existingServer " +
364           existingServer + " looks stale, new server:" + serverName);
365       expireServer(existingServer);
366     }
367     return true;
368   }
369 
370   /**
371    * Checks the clock skew between the server and the master. If the clock skew exceeds the
372    * configured max, it will throw an exception; if it exceeds the configured warning threshold,
373    * it will log a warning but start normally.
374    * @param serverName Incoming server's name
375    * @param serverCurrentTime
376    * @throws ClockOutOfSyncException if the skew exceeds the configured max value
377    */
378   private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
379   throws ClockOutOfSyncException {
380     long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
381     if (skew > maxSkew) {
382       String message = "Server " + serverName + " has been " +
383         "rejected; Reported time is too far out of sync with master.  " +
384         "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
385       LOG.warn(message);
386       throw new ClockOutOfSyncException(message);
387     } else if (skew > warningSkew){
388       String message = "Reported time for server " + serverName + " is out of sync with master " +
389         "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
390         "error threshold is " + maxSkew + "ms)";
391       LOG.warn(message);
392     }
393   }
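
  // Worked example (not part of the original source), using the defaults read in the
  // constructor (warningSkew = 10000 ms, maxSkew = 30000 ms):
  //
  //   master clock = 12:00:00.000, reported server clock = 12:00:25.000
  //   skew = |System.currentTimeMillis() - serverCurrentTime| = 25000 ms
  //   25000 > 10000 (warning threshold) -> a warning is logged
  //   25000 <= 30000 (max allowed)      -> no ClockOutOfSyncException, startup proceeds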
394 
395   /**
396    * If this server is on the dead list, reject it with a YouAreDeadException.
397    * If it was dead but came back with a new start code, remove the old entry
398    * from the dead list.
399    * @param serverName
400    * @param what START or REPORT
401    * @throws org.apache.hadoop.hbase.YouAreDeadException
402    */
403   private void checkIsDead(final ServerName serverName, final String what)
404       throws YouAreDeadException {
405     if (this.deadservers.isDeadServer(serverName)) {
406       // host name, port and start code all match with existing one of the
407       // dead servers. So, this server must be dead.
408       String message = "Server " + what + " rejected; currently processing " +
409           serverName + " as dead server";
410       LOG.debug(message);
411       throw new YouAreDeadException(message);
412     }
413     // Remove dead server with same hostname and port as the newly checking-in rs after master
414     // initialization. See HBASE-5916 for more information.
415     if ((this.services == null || ((HMaster) this.services).isInitialized())
416         && this.deadservers.cleanPreviousInstance(serverName)) {
417       // This server has now become alive after we marked it as dead.
418       // We removed its previous entry from the dead list to reflect it.
419       LOG.debug(what + ":" + " Server " + serverName + " came back up," +
420           " removed it from the dead servers list");
421     }
422   }
423 
424   /**
425    * Assumes onlineServers is locked.
426    * @return ServerName with matching hostname and port.
427    */
428   private ServerName findServerWithSameHostnamePortWithLock(
429       final ServerName serverName) {
430     for (ServerName sn: this.onlineServers.keySet()) {
431       if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
432     }
433     return null;
434   }
435 
436   /**
437    * Adds to the onlineServers list. onlineServers should be locked.
438    * @param serverName The remote server's name.
439    * @param sl the current load of the server
440    * @see #checkAndRecordNewServer(ServerName, ServerLoad)
441    */
442   @VisibleForTesting
443   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
444     LOG.info("Registering server=" + serverName);
445     this.onlineServers.put(serverName, sl);
446     this.rsAdmins.remove(serverName);
447   }
448 
449   public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
450     RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
451     Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
452     builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
453     Map<byte[], Long> storeFlushedSequenceId =
454         storeFlushedSequenceIdsByRegion.get(encodedRegionName);
455     if (storeFlushedSequenceId != null) {
456       for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
457         builder.addStoreSequenceId(StoreSequenceId.newBuilder()
458             .setFamilyName(ByteString.copyFrom(entry.getKey()))
459             .setSequenceId(entry.getValue().longValue()).build());
460       }
461     }
462     return builder.build();
463   }
464 
465   /**
466    * @param serverName
467    * @return ServerLoad if serverName is known else null
468    */
469   public ServerLoad getLoad(final ServerName serverName) {
470     return this.onlineServers.get(serverName);
471   }
472 
473   /**
474    * Compute the average load across all region servers.
475    * Currently, this uses a very naive computation - just uses the number of
476    * regions being served, ignoring stats about number of requests.
477    * @return the average load
478    */
479   public double getAverageLoad() {
480     int totalLoad = 0;
481     int numServers = 0;
482     for (ServerLoad sl: this.onlineServers.values()) {
483         numServers++;
484         totalLoad += sl.getNumberOfRegions();
485     }
486     return numServers == 0 ? 0 :
487       (double)totalLoad / (double)numServers;
488   }
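
  // Worked example (not part of the original source): with three online servers carrying
  // 10, 20 and 30 regions, totalLoad = 60 and numServers = 3, so getAverageLoad() returns
  // 60 / 3.0 = 20.0; with no servers online it returns 0.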
489 
490   /** @return the count of active regionservers */
491   public int countOfRegionServers() {
492     // Presumes onlineServers is a concurrent map
493     return this.onlineServers.size();
494   }
495 
496   /**
497    * @return Read-only map of servers to serverinfo
498    * @return Read-only map of online servers to their current load
499   public Map<ServerName, ServerLoad> getOnlineServers() {
500     // Presumption is that iterating the returned Map is OK.
501     synchronized (this.onlineServers) {
502       return Collections.unmodifiableMap(this.onlineServers);
503     }
504   }
505 
506 
507   public DeadServer getDeadServers() {
508     return this.deadservers;
509   }
510 
511   /**
512    * Checks if any dead servers are currently being processed.
513    * @return true if any RS are being processed as dead, false if not
514    */
515   public boolean areDeadServersInProgress() {
516     return this.deadservers.areDeadServersInProgress();
517   }
518 
519   void letRegionServersShutdown() {
520     long previousLogTime = 0;
521     ServerName sn = master.getServerName();
522     ZooKeeperWatcher zkw = master.getZooKeeper();
523     int onlineServersCt;
524     while ((onlineServersCt = onlineServers.size()) > 0){
525 
526       if (System.currentTimeMillis() > (previousLogTime + 1000)) {
527         Set<ServerName> remainingServers = onlineServers.keySet();
528         synchronized (onlineServers) {
529           if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
530             // Master will delete itself later.
531             return;
532           }
533         }
534         StringBuilder sb = new StringBuilder();
535         // It's ok here to not sync on onlineServers - merely logging
536         for (ServerName key : remainingServers) {
537           if (sb.length() > 0) {
538             sb.append(", ");
539           }
540           sb.append(key);
541         }
542         LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
543         previousLogTime = System.currentTimeMillis();
544       }
545 
546       try {
547         List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
548         if (servers == null || servers.size() == 0 || (servers.size() == 1
549             && servers.contains(sn.toString()))) {
550           LOG.info("ZK shows there is only the master self online, exiting now");
551           // Master could have lost some ZK events, no need to wait more.
552           break;
553         }
554       } catch (KeeperException ke) {
555         LOG.warn("Failed to list regionservers", ke);
556         // ZK is malfunctioning, don't hang here
557         break;
558       }
559       synchronized (onlineServers) {
560         try {
561           if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
562         } catch (InterruptedException ignored) {
563           // continue
564         }
565       }
566     }
567   }
568 
569   /*
570    * Expire the passed server.  Add it to the list of dead servers and queue
571    * shutdown processing.
572    */
573   public synchronized void expireServer(final ServerName serverName) {
574     if (serverName.equals(master.getServerName())) {
575       if (!(master.isAborted() || master.isStopped())) {
576         master.stop("We lost our znode?");
577       }
578       return;
579     }
580     if (!services.isServerShutdownHandlerEnabled()) {
581       LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
582           + "delay expiring server " + serverName);
583       this.queuedDeadServers.add(serverName);
584       return;
585     }
586     if (this.deadservers.isDeadServer(serverName)) {
587       // TODO: Can this happen?  It shouldn't be online in this case?
588       LOG.warn("Expiration of " + serverName +
589           " but server shutdown already in progress");
590       return;
591     }
592     synchronized (onlineServers) {
593       if (!this.onlineServers.containsKey(serverName)) {
594         LOG.warn("Expiration of " + serverName + " but server not online");
595       }
596       // Remove the server from the known servers lists and update load info BUT
597       // add to deadservers first; do this so it'll show in dead servers list if
598       // not in online servers list.
599       this.deadservers.add(serverName);
600       this.onlineServers.remove(serverName);
601       onlineServers.notifyAll();
602     }
603     this.rsAdmins.remove(serverName);
604     // If cluster is going down, yes, servers are going to be expiring; don't
605     // process as a dead server
606     if (this.clusterShutdown) {
607       LOG.info("Cluster shutdown set; " + serverName +
608         " expired; onlineServers=" + this.onlineServers.size());
609       if (this.onlineServers.isEmpty()) {
610         master.stop("Cluster shutdown set; onlineServer=0");
611       }
612       return;
613     }
614 
615     boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
616     if (carryingMeta) {
617       this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
618         this.services, this.deadservers, serverName));
619     } else {
620       this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
621         this.services, this.deadservers, serverName, true));
622     }
623     LOG.debug("Added=" + serverName +
624       " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
625 
626     // Tell our listeners that a server was removed
627     if (!this.listeners.isEmpty()) {
628       for (ServerListener listener : this.listeners) {
629         listener.serverRemoved(serverName);
630       }
631     }
632   }
633 
634   public synchronized void processDeadServer(final ServerName serverName) {
635     this.processDeadServer(serverName, false);
636   }
637 
638   public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
639     // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
640     // in-memory region states, region servers could be down. Meta table can and
641     // should be re-assigned, log splitting can be done too. However, it is better to
642     // wait till the cleanup is done before re-assigning user regions.
643     //
644     // We should not wait in the server shutdown handler thread since it can clog
645     // the handler threads and meta table could not be re-assigned in case
646     // the corresponding server is down. So we queue them up here instead.
647     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
648       requeuedDeadServers.put(serverName, shouldSplitWal);
649       return;
650     }
651 
652     this.deadservers.add(serverName);
653     this.services.getExecutorService().submit(
654       new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
655           shouldSplitWal));
656   }
657 
658   /**
659    * Processes the servers which died during the master's initialization. It will be
660    * called after HMaster#assignMeta and AssignmentManager#joinCluster.
661    */
662   synchronized void processQueuedDeadServers() {
663     if (!services.isServerShutdownHandlerEnabled()) {
664       LOG.info("Master hasn't enabled ServerShutdownHandler");
665     }
666     Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
667     while (serverIterator.hasNext()) {
668       ServerName tmpServerName = serverIterator.next();
669       expireServer(tmpServerName);
670       serverIterator.remove();
671       requeuedDeadServers.remove(tmpServerName);
672     }
673 
674     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
675       LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
676     }
677 
678     for(ServerName tmpServerName : requeuedDeadServers.keySet()){
679       processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
680     }
681     requeuedDeadServers.clear();
682   }
683 
684   /*
685    * Remove the server from the drain list.
686    */
687   public boolean removeServerFromDrainList(final ServerName sn) {
688     // Warn if the server (sn) is not online.  ServerName is of the form:
689     // <hostname> , <port> , <startcode>
690 
691     if (!this.isServerOnline(sn)) {
692       LOG.warn("Server " + sn + " is not currently online. " +
693                "Removing from draining list anyway, as requested.");
694     }
695     // Remove the server from the draining servers lists.
696     return this.drainingServers.remove(sn);
697   }
698 
699   /*
700    * Add the server to the drain list.
701    */
702   public boolean addServerToDrainList(final ServerName sn) {
703     // Warn if the server (sn) is not online.  ServerName is of the form:
704     // <hostname> , <port> , <startcode>
705 
706     if (!this.isServerOnline(sn)) {
707       LOG.warn("Server " + sn + " is not currently online. " +
708                "Ignoring request to add it to draining list.");
709       return false;
710     }
711     // Add the server to the draining servers lists, if it's not already in
712     // it.
713     if (this.drainingServers.contains(sn)) {
714       LOG.warn("Server " + sn + " is already in the draining server list. " +
715                "Ignoring request to add it again.");
716       return false;
717     }
718     return this.drainingServers.add(sn);
719   }
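
  // Illustrative sketch (not part of the original source): putting a server into drain mode so
  // it stops receiving new regions, then releasing it. The server name is an example value.
  //
  //   ServerName sn = ServerName.valueOf("rs1.example.com", 16020, 1000L);
  //   serverManager.addServerToDrainList(sn);      // sn no longer appears as an assignment target
  //   // ... move regions off sn, perform maintenance ...
  //   serverManager.removeServerFromDrainList(sn); // sn becomes a normal destination again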
720 
721   // RPC methods to region servers
722 
723   /**
724    * Sends an OPEN RPC to the specified server to open the specified region.
725    * <p>
726    * Open should not fail but can if server just crashed.
727    * <p>
728    * @param server server to open a region
729    * @param region region to open
730    * @param favoredNodes
731    */
732   public RegionOpeningState sendRegionOpen(final ServerName server,
733       HRegionInfo region, List<ServerName> favoredNodes)
734   throws IOException {
735     AdminService.BlockingInterface admin = getRsAdmin(server);
736     if (admin == null) {
737       throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
738         " failed because no RPC connection found to this server");
739     }
740     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
741       region, favoredNodes,
742       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
743     try {
744       OpenRegionResponse response = admin.openRegion(null, request);
745       return ResponseConverter.getRegionOpeningState(response);
746     } catch (ServiceException se) {
747       throw ProtobufUtil.getRemoteException(se);
748     }
749   }
750 
751   /**
752    * Sends an OPEN RPC to the specified server to open the specified region.
753    * <p>
754    * Open should not fail but can if server just crashed.
755    * <p>
756    * @param server server to open a region
757    * @param regionOpenInfos info of a list of regions to open
758    * @return a list of region opening states
759    */
760   public List<RegionOpeningState> sendRegionOpen(ServerName server,
761       List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos)
762   throws IOException {
763     AdminService.BlockingInterface admin = getRsAdmin(server);
764     if (admin == null) {
765       throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
766         " failed because no RPC connection found to this server");
767     }
768 
769     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
770       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
771     try {
772       OpenRegionResponse response = admin.openRegion(null, request);
773       return ResponseConverter.getRegionOpeningStateList(response);
774     } catch (ServiceException se) {
775       throw ProtobufUtil.getRemoteException(se);
776     }
777   }
778 
779   /**
780    * Sends a CLOSE RPC to the specified server to close the specified region.
781    * <p>
782    * A region server could reject the close request because it either does not
783    * have the specified region or the region is being split.
784    * @param server server to close the region on
785    * @param region region to close
786    * @param dest if the region is moved to another server, the destination server; null otherwise.
787    * @throws IOException
788    */
789   public boolean sendRegionClose(ServerName server, HRegionInfo region,
790       ServerName dest) throws IOException {
791     if (server == null) throw new NullPointerException("Passed server is null");
792     AdminService.BlockingInterface admin = getRsAdmin(server);
793     if (admin == null) {
794       throw new IOException("Attempting to send CLOSE RPC to server " +
795         server.toString() + " for region " +
796         region.getRegionNameAsString() +
797         " failed because no RPC connection found to this server");
798     }
799     return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
800       dest);
801   }
802 
803   public boolean sendRegionClose(ServerName server,
804       HRegionInfo region) throws IOException {
805     return sendRegionClose(server, region, null);
806   }
807 
808   /**
809    * Contacts a region server and waits up to timeout ms
810    * to close the region.  This bypasses the active hmaster.
811    */
812   public static void closeRegionSilentlyAndWait(ClusterConnection connection, 
813     ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
814     AdminService.BlockingInterface rs = connection.getAdmin(server);
815     try {
816       ProtobufUtil.closeRegion(rs, server, region.getRegionName());
817     } catch (IOException e) {
818       LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
819     }
820     long expiration = timeout + System.currentTimeMillis();
821     while (System.currentTimeMillis() < expiration) {
822       try {
823         HRegionInfo rsRegion =
824           ProtobufUtil.getRegionInfo(rs, region.getRegionName());
825         if (rsRegion == null) return;
826       } catch (IOException ioe) {
827         if (ioe instanceof NotServingRegionException) // no need to retry again
828           return;
829         LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(), ioe);
830       }
831       Thread.sleep(1000);
832     }
833     throw new IOException("Region " + region + " failed to close within"
834         + " timeout " + timeout);
835   }
836 
837   /**
838    * Sends a MERGE REGIONS RPC to the specified server to merge the specified
839    * regions.
840    * <p>
841    * A region server could reject the merge request because it does not
842    * have one of the specified regions.
843    * @param server server to merge regions
844    * @param region_a region to merge
845    * @param region_b region to merge
846    * @param forcible true to do a compulsory merge; otherwise we will only merge
847    *          two adjacent regions
848    * @throws IOException
849    */
850   public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
851       HRegionInfo region_b, boolean forcible) throws IOException {
852     if (server == null)
853       throw new NullPointerException("Passed server is null");
854     if (region_a == null || region_b == null)
855       throw new NullPointerException("Passed region is null");
856     AdminService.BlockingInterface admin = getRsAdmin(server);
857     if (admin == null) {
858       throw new IOException("Attempting to send MERGE REGIONS RPC to server "
859           + server.toString() + " for region "
860           + region_a.getRegionNameAsString() + ","
861           + region_b.getRegionNameAsString()
862           + " failed because no RPC connection found to this server");
863     }
864     ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
865   }
866 
867   /**
868    * Check if a region server is reachable and has the expected start code
869    */
870   public boolean isServerReachable(ServerName server) {
871     if (server == null) throw new NullPointerException("Passed server is null");
872 
873     RetryCounter retryCounter = pingRetryCounterFactory.create();
874     while (retryCounter.shouldRetry()) {
875       try {
876         AdminService.BlockingInterface admin = getRsAdmin(server);
877         if (admin != null) {
878           ServerInfo info = ProtobufUtil.getServerInfo(admin);
879           return info != null && info.hasServerName()
880             && server.getStartcode() == info.getServerName().getStartCode();
881         }
882       } catch (IOException ioe) {
883         LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes()
884           + " of " + retryCounter.getMaxAttempts(), ioe);
885         try {
886           retryCounter.sleepUntilNextRetry();
887         } catch(InterruptedException ie) {
888           Thread.currentThread().interrupt();
889         }
890       }
891     }
892     return false;
893   }
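
  // Note (not part of the original source): with the constructor defaults of 10 ping attempts
  // and a 100 ms sleep interval, the loop above gives a region server roughly one second of
  // retries before it is declared unreachable. A minimal sketch of the same RetryCounter
  // pattern, where doFlakyCall() is a hypothetical operation:
  //
  //   RetryCounter rc = new RetryCounterFactory(10, 100).create();
  //   while (rc.shouldRetry()) {
  //     try {
  //       return doFlakyCall();
  //     } catch (IOException ioe) {
  //       rc.sleepUntilNextRetry(); // may throw InterruptedException
  //     }
  //   }
  //   return false;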
894 
895   /**
896    * @param sn
897    * @return Admin interface for the remote regionserver named <code>sn</code>
898    * @throws IOException
899    * @throws RetriesExhaustedException wrapping a ConnectException if failed
900    */
901   private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
902   throws IOException {
903     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
904     if (admin == null) {
905       LOG.debug("New admin connection to " + sn.toString());
906       if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
907         // A master is also a region server now, see HBASE-10569 for details
908         admin = ((HRegionServer)master).getRSRpcServices();
909       } else {
910         admin = this.connection.getAdmin(sn);
911       }
912       this.rsAdmins.put(sn, admin);
913     }
914     return admin;
915   }
916 
917   /**
918    * Wait for the region servers to report in.
919    * We will wait until one of these conditions is met:
920    *  - the master is stopped
921    *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
922    *    region servers is reached
923    *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
924    *    there has been no new region server checking in for
925    *    'hbase.master.wait.on.regionservers.interval' time AND
926    *    the 'hbase.master.wait.on.regionservers.timeout' is reached
927    *
928    * @throws InterruptedException
929    */
930   public void waitForRegionServers(MonitoredTask status)
931   throws InterruptedException {
932     final long interval = this.master.getConfiguration().
933       getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
934     final long timeout = this.master.getConfiguration().
935       getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
936     int defaultMinToStart = 1;
937     if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
938       // If we assign regions to master, we'd like to start
939       // at least another region server so that we don't
940       // assign all regions to master if other region servers
941       // don't come up in time.
942       defaultMinToStart = 2;
943     }
944     int minToStart = this.master.getConfiguration().
945       getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
946     if (minToStart < 1) {
947       LOG.warn(String.format(
948         "The value of '%s' (%d) can not be less than 1, ignoring.",
949         WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
950       minToStart = 1;
951     }
952     int maxToStart = this.master.getConfiguration().
953       getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
954     if (maxToStart < minToStart) {
955         LOG.warn(String.format(
956             "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
957             WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
958             WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
959         maxToStart = Integer.MAX_VALUE;
960     }
961 
962     long now =  System.currentTimeMillis();
963     final long startTime = now;
964     long slept = 0;
965     long lastLogTime = 0;
966     long lastCountChange = startTime;
967     int count = countOfRegionServers();
968     int oldCount = 0;
969     while (!this.master.isStopped() && count < maxToStart
970         && (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
971       // Log some info at every interval time or if there is a change
972       if (oldCount != count || lastLogTime+interval < now){
973         lastLogTime = now;
974         String msg =
975           "Waiting for region servers count to settle; currently"+
976             " checked in " + count + ", slept for " + slept + " ms," +
977             " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
978             ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
979         LOG.info(msg);
980         status.setStatus(msg);
981       }
982 
983       // We sleep for some time
984       final long sleepTime = 50;
985       Thread.sleep(sleepTime);
986       now =  System.currentTimeMillis();
987       slept = now - startTime;
988 
989       oldCount = count;
990       count = countOfRegionServers();
991       if (count != oldCount) {
992         lastCountChange = now;
993       }
994     }
995 
996     LOG.info("Finished waiting for region servers count to settle;" +
997       " checked in " + count + ", slept for " + slept + " ms," +
998       " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
999       " master is "+ (this.master.isStopped() ? "stopped.": "running")
1000     );
1001   }
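
  // Illustrative sketch (not part of the original source): tuning the startup wait through the
  // keys declared at the top of this class; the defaults waitForRegionServers() falls back to
  // are noted in the comments. The values set here are examples only.
  //
  //   Configuration c = master.getConfiguration();
  //   c.setInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 3);   // default 1 (2 when regions may be on master)
  //   c.setInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, 50);  // default Integer.MAX_VALUE
  //   c.setLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 30000); // default 4500 ms
  //   c.setLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500); // default 1500 ms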
1002 
1003   /**
1004    * @return A copy of the internal list of online servers.
1005    */
1006   public List<ServerName> getOnlineServersList() {
1007     // TODO: optimize the load balancer call so we don't need to make a new list
1008     // TODO: FIX. THIS IS POPULAR CALL.
1009     return new ArrayList<ServerName>(this.onlineServers.keySet());
1010   }
1011 
1012   /**
1013    * @return A copy of the internal list of draining servers.
1014    */
1015   public List<ServerName> getDrainingServersList() {
1016     return new ArrayList<ServerName>(this.drainingServers);
1017   }
1018 
1019   /**
1020    * @return A copy of the internal set of deadNotExpired servers.
1021    */
1022   Set<ServerName> getDeadNotExpiredServers() {
1023     return new HashSet<ServerName>(this.queuedDeadServers);
1024   }
1025 
1026   /**
1027    * During startup, if we determine it is not a failover, i.e. there are
1028    * no more WAL files to split, we won't try to recover these dead servers.
1029    * So we just remove them from the queue. Use caution in calling this.
1030    */
1031   void removeRequeuedDeadServers() {
1032     requeuedDeadServers.clear();
1033   }
1034 
1035   /**
1036    * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
1037    *         splitlog need flag.
1038    */
1039   Map<ServerName, Boolean> getRequeuedDeadServers() {
1040     return Collections.unmodifiableMap(this.requeuedDeadServers);
1041   }
1042 
1043   public boolean isServerOnline(ServerName serverName) {
1044     return serverName != null && onlineServers.containsKey(serverName);
1045   }
1046 
1047   /**
1048    * Check if a server is known to be dead.  A server can be online,
1049    * or known to be dead, or unknown to this manager (i.e., not online,
1050    * not known to be dead either; it is simply not tracked by the
1051    * master any more, for example, a very old previous instance).
1052    */
1053   public synchronized boolean isServerDead(ServerName serverName) {
1054     return serverName == null || deadservers.isDeadServer(serverName)
1055       || queuedDeadServers.contains(serverName)
1056       || requeuedDeadServers.containsKey(serverName);
1057   }
1058 
1059   public void shutdownCluster() {
1060     this.clusterShutdown = true;
1061     this.master.stop("Cluster shutdown requested");
1062   }
1063 
1064   public boolean isClusterShutdown() {
1065     return this.clusterShutdown;
1066   }
1067 
1068   /**
1069    * Stop the ServerManager.  Currently closes the connection to the cluster.
1070    */
1071   public void stop() {
1072     if (connection != null) {
1073       try {
1074         connection.close();
1075       } catch (IOException e) {
1076         LOG.error("Attempt to close connection to master failed", e);
1077       }
1078     }
1079   }
1080 
1081   /**
1082    * Creates a list of possible destinations for a region. It contains the online servers, but not
1083    *  the draining or dying servers.
1084    *  @param serverToExclude can be null if there is no server to exclude
1085    */
1086   public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
1087     final List<ServerName> destServers = getOnlineServersList();
1088 
1089     if (serverToExclude != null){
1090       destServers.remove(serverToExclude);
1091     }
1092 
1093     // Loop through the draining server list and remove them from the server list
1094     final List<ServerName> drainingServersCopy = getDrainingServersList();
1095     if (!drainingServersCopy.isEmpty()) {
1096       for (final ServerName server: drainingServersCopy) {
1097         destServers.remove(server);
1098       }
1099     }
1100 
1101     // Remove the deadNotExpired servers from the server list.
1102     removeDeadNotExpiredServers(destServers);
1103     return destServers;
1104   }
1105 
1106   /**
1107    * Calls {@link #createDestinationServersList} without server to exclude.
1108    */
1109   public List<ServerName> createDestinationServersList(){
1110     return createDestinationServersList(null);
1111   }
1112 
1113   /**
1114    * Loop through the deadNotExpired server list and remove them from the
1115    * servers.
1116    * This function should be used carefully outside of this class. You should use a high level
1117    * method such as {@link #createDestinationServersList()} instead of managing your own list.
1118    */
1119   void removeDeadNotExpiredServers(List<ServerName> servers) {
1120     Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1121     if (!deadNotExpiredServersCopy.isEmpty()) {
1122       for (ServerName server : deadNotExpiredServersCopy) {
1123         LOG.debug("Removing dead but not expired server: " + server
1124           + " from eligible server pool.");
1125         servers.remove(server);
1126       }
1127     }
1128   }
1129 
1130   /**
1131    * Clears any dead server that has the same host name and port as an online server.
1132    */
1133   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1134     for (ServerName serverName : getOnlineServersList()) {
1135       deadservers.cleanAllPreviousInstances(serverName);
1136     }
1137   }
1138 }