1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.security.PrivilegedExceptionAction;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Collections;
34  import java.util.Comparator;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.Map;
40  import java.util.Map.Entry;
41  import java.util.Set;
42  import java.util.SortedMap;
43  import java.util.TreeMap;
44  import java.util.TreeSet;
45  import java.util.concurrent.ConcurrentHashMap;
46  import java.util.concurrent.ConcurrentMap;
47  import java.util.concurrent.ConcurrentSkipListMap;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.atomic.AtomicReference;
50  import java.util.concurrent.locks.ReentrantReadWriteLock;
51  
52  import javax.management.MalformedObjectNameException;
53  import javax.management.ObjectName;
54  import javax.servlet.http.HttpServlet;
55  
56  import org.apache.commons.lang.SystemUtils;
57  import org.apache.commons.lang.math.RandomUtils;
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  import org.apache.hadoop.conf.Configuration;
61  import org.apache.hadoop.fs.FileSystem;
62  import org.apache.hadoop.fs.Path;
63  import org.apache.hadoop.hbase.ChoreService;
64  import org.apache.hadoop.hbase.ClockOutOfSyncException;
65  import org.apache.hadoop.hbase.CoordinatedStateManager;
66  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
67  import org.apache.hadoop.hbase.HBaseConfiguration;
68  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
69  import org.apache.hadoop.hbase.HConstants;
70  import org.apache.hadoop.hbase.HRegionInfo;
71  import org.apache.hadoop.hbase.HealthCheckChore;
72  import org.apache.hadoop.hbase.MetaTableAccessor;
73  import org.apache.hadoop.hbase.NotServingRegionException;
74  import org.apache.hadoop.hbase.RemoteExceptionHandler;
75  import org.apache.hadoop.hbase.ScheduledChore;
76  import org.apache.hadoop.hbase.ServerName;
77  import org.apache.hadoop.hbase.Stoppable;
78  import org.apache.hadoop.hbase.TableDescriptors;
79  import org.apache.hadoop.hbase.TableName;
80  import org.apache.hadoop.hbase.YouAreDeadException;
81  import org.apache.hadoop.hbase.ZNodeClearer;
82  import org.apache.hadoop.hbase.classification.InterfaceAudience;
83  import org.apache.hadoop.hbase.client.ClusterConnection;
84  import org.apache.hadoop.hbase.client.ConnectionUtils;
85  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
86  import org.apache.hadoop.hbase.conf.ConfigurationManager;
87  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
88  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
89  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
90  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
91  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
92  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
93  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
94  import org.apache.hadoop.hbase.executor.ExecutorService;
95  import org.apache.hadoop.hbase.executor.ExecutorType;
96  import org.apache.hadoop.hbase.fs.HFileSystem;
97  import org.apache.hadoop.hbase.http.InfoServer;
98  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
99  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
100 import org.apache.hadoop.hbase.ipc.RpcClient;
101 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
102 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
103 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
104 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105 import org.apache.hadoop.hbase.ipc.ServerRpcController;
106 import org.apache.hadoop.hbase.master.HMaster;
107 import org.apache.hadoop.hbase.master.RegionState.State;
108 import org.apache.hadoop.hbase.master.TableLockManager;
109 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
110 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
111 import org.apache.hadoop.hbase.protobuf.RequestConverter;
112 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
113 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
114 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
115 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
118 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
119 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
124 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
125 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
137 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
138 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
139 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
140 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
141 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
142 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
143 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
144 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
145 import org.apache.hadoop.hbase.security.Superusers;
146 import org.apache.hadoop.hbase.security.User;
147 import org.apache.hadoop.hbase.security.UserProvider;
148 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
149 import org.apache.hadoop.hbase.util.Addressing;
150 import org.apache.hadoop.hbase.util.ByteStringer;
151 import org.apache.hadoop.hbase.util.Bytes;
152 import org.apache.hadoop.hbase.util.CompressionTest;
153 import org.apache.hadoop.hbase.util.ConfigUtil;
154 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
155 import org.apache.hadoop.hbase.util.FSTableDescriptors;
156 import org.apache.hadoop.hbase.util.FSUtils;
157 import org.apache.hadoop.hbase.util.HasThread;
158 import org.apache.hadoop.hbase.util.JSONBean;
159 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
160 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
161 import org.apache.hadoop.hbase.util.Sleeper;
162 import org.apache.hadoop.hbase.util.Threads;
163 import org.apache.hadoop.hbase.util.VersionInfo;
164 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
165 import org.apache.hadoop.hbase.wal.WAL;
166 import org.apache.hadoop.hbase.wal.WALFactory;
167 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
168 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
169 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
170 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
171 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
172 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
173 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
174 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
175 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
176 import org.apache.hadoop.ipc.RemoteException;
177 import org.apache.hadoop.metrics.util.MBeanUtil;
178 import org.apache.hadoop.util.ReflectionUtils;
179 import org.apache.hadoop.util.StringUtils;
180 import org.apache.zookeeper.KeeperException;
181 import org.apache.zookeeper.KeeperException.NoNodeException;
182 import org.apache.zookeeper.data.Stat;
183 
184 import com.google.common.annotations.VisibleForTesting;
185 import com.google.common.base.Preconditions;
186 import com.google.common.collect.Maps;
187 import com.google.protobuf.BlockingRpcChannel;
188 import com.google.protobuf.Descriptors;
189 import com.google.protobuf.Message;
190 import com.google.protobuf.RpcCallback;
191 import com.google.protobuf.RpcController;
192 import com.google.protobuf.Service;
193 import com.google.protobuf.ServiceException;
194 import sun.misc.Signal;
195 import sun.misc.SignalHandler;
196 
197 /**
198  * HRegionServer makes a set of HRegions available to clients. It checks in with
199  * the HMaster. There are many HRegionServers in a single HBase deployment.
200  */
201 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
202 @SuppressWarnings("deprecation")
203 public class HRegionServer extends HasThread implements
204     RegionServerServices, LastSequenceId {
205 
206   private static final Log LOG = LogFactory.getLog(HRegionServer.class);
207 
208   /*
209    * Strings to be used in forming the exception message for
210    * RegionsAlreadyInTransitionException.
211    */
212   protected static final String OPEN = "OPEN";
213   protected static final String CLOSE = "CLOSE";
214 
215   // RegionName vs. current action in progress:
216   //   true  - an open-region action is in progress
217   //   false - a close-region action is in progress
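  // The map is keyed on region-name bytes, so Bytes.BYTES_COMPARATOR is supplied explicitly
  // because byte[] has no natural ordering usable by the ConcurrentSkipListMap.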
218   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
219     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
220 
221   // Cache flushing
222   protected MemStoreFlusher cacheFlusher;
223 
224   protected HeapMemoryManager hMemManager;
225 
226   /**
227    * Cluster connection to be shared by services.
228    * Initialized at server startup and closed when server shuts down.
229    * Clients must never close it explicitly.
230    */
231   protected ClusterConnection clusterConnection;
232 
233   /*
234    * Long-living meta table locator, which is created when the server is started and stopped
235    * when the server shuts down. References to this locator shall be used to perform the
236    * corresponding operations in EventHandlers. The primary reason for this decision is to
237    * make it mockable for tests.
238    */
239   protected MetaTableLocator metaTableLocator;
240 
241   // Watch if a region is out of recovering state from ZooKeeper
242   @SuppressWarnings("unused")
243   private RecoveringRegionWatcher recoveringRegionWatcher;
244 
245   /**
246    * Go here to get table descriptors.
247    */
248   protected TableDescriptors tableDescriptors;
249 
250   // Replication services. If no replication, this handler will be null.
251   protected ReplicationSourceService replicationSourceHandler;
252   protected ReplicationSinkService replicationSinkHandler;
253 
254   // Compactions
255   public CompactSplitThread compactSplitThread;
256 
257   /**
258    * Map of regions currently being served by this region server. Key is the
259    * encoded region name.  All access should be synchronized.
260    */
261   protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
262 
263   /**
264    * Map of encoded region names to the DataNode locations they should be hosted on.
265    * We store the value as InetSocketAddress since this is used only in HDFS
266    * API (create() that takes favored nodes as hints for placing file blocks).
267    * We could have used ServerName here as the value class, but we'd need to
268    * convert it to InetSocketAddress at some point before the HDFS API call, and
269    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
270    * and here we really mean DataNode locations.
271    */
272   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
273       new ConcurrentHashMap<String, InetSocketAddress[]>();
274 
275   /**
276    * Set of regions currently in recovering state, which means they can accept writes (edits from a
277    * previously failed region server) but not reads. A recovering region is also an online region.
278    */
279   protected final Map<String, Region> recoveringRegions = Collections
280       .synchronizedMap(new HashMap<String, Region>());
281 
282   // Leases
283   protected Leases leases;
284 
285   // Instance of the hbase executor service.
286   protected ExecutorService service;
287 
288   // If false, the file system has become unavailable
289   protected volatile boolean fsOk;
290   protected HFileSystem fs;
291 
292   // Set when a report to the master comes back with a message asking us to
293   // shutdown. Also set by call to stop when debugging or running unit tests
294   // of HRegionServer in isolation.
295   private volatile boolean stopped = false;
296 
297   // Go down hard. Used if file system becomes unavailable and also in
298   // debugging and unit tests.
299   private volatile boolean abortRequested;
300 
301   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
302 
303   // A state before we go into stopped state.  At this stage we're closing user
304   // space regions.
305   private boolean stopping = false;
306 
307   private volatile boolean killed = false;
308 
309   protected final Configuration conf;
310 
311   private Path rootDir;
312 
313   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
314 
315   final int numRetries;
316   protected final int threadWakeFrequency;
317   protected final int msgInterval;
318 
319   protected final int numRegionsToReport;
320 
321   // Stub to do region server status calls against the master.
322   private volatile RegionServerStatusService.BlockingInterface rssStub;
323   // RPC client. Used to make the stub above that does region server status checking.
324   RpcClient rpcClient;
325 
326   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
327   private RpcControllerFactory rpcControllerFactory;
328 
329   private UncaughtExceptionHandler uncaughtExceptionHandler;
330 
331   // Info server. Default access so can be used by unit tests. REGIONSERVER
332   // is the name of the webapp and the attribute name used when stuffing this instance
333   // into the web context.
334   protected InfoServer infoServer;
335   private JvmPauseMonitor pauseMonitor;
336 
337   /** region server process name */
338   public static final String REGIONSERVER = "regionserver";
339 
340   MetricsRegionServer metricsRegionServer;
341   private SpanReceiverHost spanReceiverHost;
342 
343   /**
344    * ChoreService used to schedule tasks that we want to run periodically
345    */
346   private final ChoreService choreService;
347 
348   /*
349    * Check for compactions requests.
350    */
351   ScheduledChore compactionChecker;
352 
353   /*
354    * Check for flushes
355    */
356   ScheduledChore periodicFlusher;
357 
358   protected volatile WALFactory walFactory;
359 
360   // WAL roller. log is protected rather than private to avoid
361   // eclipse warning when accessed by inner classes
362   final LogRoller walRoller;
363   // Lazily initialized if this RegionServer hosts a meta table.
364   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
365 
366   // flag set after we're done setting up server threads
367   final AtomicBoolean online = new AtomicBoolean(false);
368 
369   // zookeeper connection and watcher
370   protected ZooKeeperWatcher zooKeeper;
371 
372   // master address tracker
373   private MasterAddressTracker masterAddressTracker;
374 
375   // Cluster Status Tracker
376   protected ClusterStatusTracker clusterStatusTracker;
377 
378   // Log Splitting Worker
379   private SplitLogWorker splitLogWorker;
380 
381   // A sleeper that sleeps for msgInterval.
382   protected final Sleeper sleeper;
383 
384   private final int operationTimeout;
385   private final int shortOperationTimeout;
386 
387   private final RegionServerAccounting regionServerAccounting;
388 
389   // Cache configuration and block cache reference
390   protected CacheConfig cacheConfig;
391 
392   /** The health check chore. */
393   private HealthCheckChore healthCheckChore;
394 
395   /** The nonce manager chore. */
396   private ScheduledChore nonceManagerChore;
397 
398   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
399 
400   /**
401    * The server name the Master sees us as.  It's made from the hostname the
402    * master passes us, port, and server startcode. Gets set after registration
403    * against the Master.
404    */
405   protected ServerName serverName;
406 
407   /*
408    * hostname specified by hostname config
409    */
410   protected String useThisHostnameInstead;
411 
412   // Key to the config parameter for the server hostname.
413   // Specifying the server hostname is optional. The hostname should be resolvable from
414   // both the master and the region server.
415   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
416   final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
417   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
418   protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
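  // Illustrative example (values assumed, not from this file): setting
  // hbase.regionserver.hostname=rs1.example.com in hbase-site.xml makes this server register
  // under that name instead of the hostname of its RPC bind address.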
419 
420   /**
421    * This server's startcode.
422    */
423   protected final long startcode;
424 
425   /**
426    * Unique identifier for the cluster we are a part of.
427    */
428   private String clusterId;
429 
430   /**
431    * MX Bean for RegionServerInfo
432    */
433   private ObjectName mxBean = null;
434 
435   /**
436    * Chore to periodically clean the moved-regions list.
437    */
438   private MovedRegionsCleaner movedRegionsCleaner;
439 
440   // chore for refreshing store files for secondary regions
441   private StorefileRefresherChore storefileRefresher;
442 
443   private RegionServerCoprocessorHost rsHost;
444 
445   private RegionServerProcedureManagerHost rspmHost;
446   
447   private RegionServerQuotaManager rsQuotaManager;
448 
449   // Table-level lock manager for locking region operations
450   protected TableLockManager tableLockManager;
451 
452   /**
453    * Nonce manager. Nonces are used to make operations like increment and append idempotent
454    * in the case where client doesn't receive the response from a successful operation and
455    * retries. We track the successful ops for some time via a nonce sent by client and handle
456    * duplicate operations (currently, by failing them; in future we might use MVCC to return
457    * result). Nonces are also recovered from the WAL during recovery; however, the caveats (from
458    * HBASE-3787) are:
459    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
460    *   of past records. If we don't read the records, we don't read and recover the nonces.
461    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
462    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
463    * We could have a separate, additional "Nonce WAL". It will just contain a bunch of numbers and
464    * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
465    * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
466    * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
467    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
468    * latest nonce in it expired. It can also be recovered during move.
469    */
470   final ServerNonceManager nonceManager;
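  // Note: nonce handling can be disabled via HConstants.HBASE_RS_NONCES_ENABLED (checked in the
  // constructor below); when disabled this field stays null.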
471 
472   private UserProvider userProvider;
473 
474   protected final RSRpcServices rpcServices;
475 
476   protected BaseCoordinatedStateManager csm;
477 
478   private final boolean useZKForAssignment;
479 
480   /**
481    * Configuration manager is used to register/deregister and notify the configuration observers
482    * when the regionserver is notified that there was a change in the on disk configs.
483    */
484   protected final ConfigurationManager configurationManager;
485 
486   /**
487    * Starts a HRegionServer at the default location.
488    */
489   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
490     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
491   }
492 
493   /**
494    * Starts an HRegionServer at the default location.
495    * @param csm implementation of CoordinatedStateManager to be used
496    */
497   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
498       throws IOException, InterruptedException {
499     super("RegionServer");  // thread name
500     this.fsOk = true;
501     this.conf = conf;
502     checkCodecs(this.conf);
503     this.userProvider = UserProvider.instantiate(conf);
504     FSUtils.setupShortCircuitRead(this.conf);
505     // Disable usage of meta replicas in the regionserver
506     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
507 
508     // Config'ed params
509     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
510         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
511     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
512     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
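    // msgInterval drives how often the main run() loop reports to the master via
    // tryRegionServerReport(); the Sleeper created below sleeps for the same period.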
513 
514     this.sleeper = new Sleeper(this.msgInterval, this);
515 
516     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
517     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
518 
519     this.numRegionsToReport = conf.getInt(
520       "hbase.regionserver.numregionstoreport", 10);
521 
522     this.operationTimeout = conf.getInt(
523       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
524       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
525 
526     this.shortOperationTimeout = conf.getInt(
527       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
528       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
529 
530     this.abortRequested = false;
531     this.stopped = false;
532 
533     rpcServices = createRpcServices();
534     this.startcode = System.currentTimeMillis();
535     if (this instanceof HMaster) {
536       useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
537     } else {
538       useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
539     }
540     String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
541       rpcServices.isa.getHostName();
542     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
543 
544     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
545     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
546 
547     // login the zookeeper client principal (if using security)
548     ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
549       HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
550     // login the server principal (if using secure Hadoop)
551     login(userProvider, hostName);
552     // init superusers and add the server principal (if using security)
553     // or process owner as default super user.
554     Superusers.initialize(conf);
555 
556     regionServerAccounting = new RegionServerAccounting();
557     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
558       @Override
559       public void uncaughtException(Thread t, Throwable e) {
560         abort("Uncaught exception in service thread " + t.getName(), e);
561       }
562     };
563 
564     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
565 
566     initializeFileSystem();
567 
568     service = new ExecutorService(getServerName().toShortString());
569     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
570 
571     // Some unit tests don't need a cluster, so no zookeeper at all
572     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
573       // Open connection to zookeeper and set primary watcher
574       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
575         rpcServices.isa.getPort(), this, canCreateBaseZNode());
576 
577       this.csm = (BaseCoordinatedStateManager) csm;
578       this.csm.initialize(this);
579       this.csm.start();
580 
581       tableLockManager = TableLockManager.createTableLockManager(
582         conf, zooKeeper, serverName);
583 
584       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
585       masterAddressTracker.start();
586 
587       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
588       clusterStatusTracker.start();
589     }
590     this.configurationManager = new ConfigurationManager();
591 
592     rpcServices.start();
593     putUpWebUI();
594     this.walRoller = new LogRoller(this, this);
595     this.choreService = new ChoreService(getServerName().toString(), true);
596 
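    // On POSIX systems, sending SIGHUP (e.g. `kill -HUP <pid>`) reloads hbase-site.xml on the fly
    // and notifies all registered ConfigurationObservers of the changed values.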
597     if (!SystemUtils.IS_OS_WINDOWS) {
598       Signal.handle(new Signal("HUP"), new SignalHandler() {
599         public void handle(Signal signal) {
600           getConfiguration().reloadConfiguration();
601           configurationManager.notifyAllObservers(getConfiguration());
602         }
603       });
604     }
605   }
606 
607   private void initializeFileSystem() throws IOException {
608     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
609     // underlying hadoop hdfs accessors will be going against wrong filesystem
610     // (unless all is set to defaults).
611     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
612     // Get fs instance used by this RS.  Do we use checksum verification in HBase?  If HBase
613     // checksum verification is enabled, then automatically switch off HDFS checksum verification.
614     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
615     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
616     this.rootDir = FSUtils.getRootDir(this.conf);
617     this.tableDescriptors = new FSTableDescriptors(
618       this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
619   }
620 
621   /*
622    * Returns true if configured hostname should be used
623    */
624   protected boolean shouldUseThisHostnameInstead() {
625     return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
626   }
627 
628   protected void login(UserProvider user, String host) throws IOException {
629     user.login("hbase.regionserver.keytab.file",
630       "hbase.regionserver.kerberos.principal", host);
631   }
632 
633   protected void waitForMasterActive(){
634   }
635 
636   protected String getProcessName() {
637     return REGIONSERVER;
638   }
639 
640   protected boolean canCreateBaseZNode() {
641     return false;
642   }
643 
644   protected boolean canUpdateTableDescriptor() {
645     return false;
646   }
647 
648   protected RSRpcServices createRpcServices() throws IOException {
649     return new RSRpcServices(this);
650   }
651 
652   protected void configureInfoServer() {
653     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
654     infoServer.setAttribute(REGIONSERVER, this);
655   }
656 
657   protected Class<? extends HttpServlet> getDumpServlet() {
658     return RSDumpServlet.class;
659   }
660 
661   @Override
662   public boolean registerService(Service instance) {
663     /*
664      * No stacking of instances is allowed for a single service name
665      */
666     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
667     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
668       LOG.error("Coprocessor service " + serviceDesc.getFullName()
669           + " already registered, rejecting request from " + instance);
670       return false;
671     }
672 
673     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
674     if (LOG.isDebugEnabled()) {
675       LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
676     }
677     return true;
678   }
679 
680   /**
681    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
682    * the local server. Safe to use when going to a local or remote server.
683    * Creating this instance in a method allows it to be intercepted and mocked in tests.
684    * @throws IOException
685    */
686   @VisibleForTesting
687   protected ClusterConnection createClusterConnection() throws IOException {
688     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
689     // local server if the request is to the local server bypassing RPC. Can be used for both local
690     // and remote invocations.
691     return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
692       serverName, rpcServices, rpcServices);
693   }
694 
695   /**
696    * Run test on configured codecs to make sure supporting libs are in place.
697    * @param c
698    * @throws IOException
699    */
700   private static void checkCodecs(final Configuration c) throws IOException {
701     // check to see if the codec list is available:
702     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
703     if (codecs == null) return;
704     for (String codec : codecs) {
705       if (!CompressionTest.testCompression(codec)) {
706         throw new IOException("Compression codec " + codec +
707           " not supported, aborting RS construction");
708       }
709     }
710   }
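  // Example (illustrative values): setting hbase.regionserver.codecs to "snappy,lzo" in
  // hbase-site.xml makes the regionserver fail fast at construction when those codecs'
  // supporting libraries are missing.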
711 
712   public String getClusterId() {
713     return this.clusterId;
714   }
715 
716   /**
717    * Set up our cluster connection if not already initialized.
718    * @throws IOException
719    */
720   protected synchronized void setupClusterConnection() throws IOException {
721     if (clusterConnection == null) {
722       clusterConnection = createClusterConnection();
723       metaTableLocator = new MetaTableLocator();
724     }
725   }
726 
727   /**
728    * All initialization needed before we register with the Master.
729    *
730    * @throws IOException
731    * @throws InterruptedException
732    */
733   private void preRegistrationInitialization(){
734     try {
735       setupClusterConnection();
736 
737       // Health checker thread.
738       if (isHealthCheckerConfigured()) {
739         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
740           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
741         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
742       }
743       this.pauseMonitor = new JvmPauseMonitor(conf);
744       pauseMonitor.start();
745 
746       initializeZooKeeper();
747       if (!isStopped() && !isAborted()) {
748         initializeThreads();
749       }
750     } catch (Throwable t) {
751       // Call stop on error, or the process will stick around forever since the server
752       // puts up non-daemon threads.
753       this.rpcServices.stop();
754       abort("Initialization of RS failed.  Hence aborting RS.", t);
755     }
756   }
757 
758   /**
759    * Bring up the connection to the zk ensemble, then wait until a master is available for this
760    * cluster, and after that wait until the cluster 'up' flag has been set.
761    * This is the order in which the master does things.
762    * Finally open long-living server short-circuit connection.
763    * @throws IOException
764    * @throws InterruptedException
765    */
766   private void initializeZooKeeper() throws IOException, InterruptedException {
767     // Create the master address tracker, register with zk, and start it.  Then
768     // block until a master is available.  No point in starting up if no master
769     // running.
770     blockAndCheckIfStopped(this.masterAddressTracker);
771 
772     // Wait on cluster being up.  Master will set this flag up in zookeeper
773     // when ready.
774     blockAndCheckIfStopped(this.clusterStatusTracker);
775 
776     // Retrieve clusterId
777     // Since cluster status is now up
778     // ID should have already been set by HMaster
779     try {
780       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
781       if (clusterId == null) {
782         this.abort("Cluster ID has not been set");
783       }
784       LOG.info("ClusterId : "+clusterId);
785     } catch (KeeperException e) {
786       this.abort("Failed to retrieve Cluster ID",e);
787     }
788 
789     // In case of a colocated master, wait here till it's active.
790     // So backup masters won't start as regionservers.
791     // This is to avoid showing backup masters as regionservers
792     // in master web UI, or assigning any region to them.
793     waitForMasterActive();
794     if (isStopped() || isAborted()) {
795       return; // No need for further initialization
796     }
797 
798     // watch for snapshots and other procedures
799     try {
800       rspmHost = new RegionServerProcedureManagerHost();
801       rspmHost.loadProcedures(conf);
802       rspmHost.initialize(this);
803     } catch (KeeperException e) {
804       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
805     }
806     // register watcher for recovering regions
807     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
808   }
809 
810   /**
811    * Utility method to wait indefinitely for a znode to become available while checking
812    * whether the region server is shut down.
813    * @param tracker znode tracker to use
814    * @throws IOException any IO exception, plus if the RS is stopped
815    * @throws InterruptedException
816    */
817   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
818       throws IOException, InterruptedException {
819     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
820       if (this.stopped) {
821         throw new IOException("Received the shutdown message while waiting.");
822       }
823     }
824   }
825 
826   /**
827    * @return False if cluster shutdown in progress
828    */
829   private boolean isClusterUp() {
830     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
831   }
832 
833   private void initializeThreads() throws IOException {
834     // Cache flushing thread.
835     this.cacheFlusher = new MemStoreFlusher(conf, this);
836 
837     // Compaction thread
838     this.compactSplitThread = new CompactSplitThread(this);
839 
840     // Background thread to check for compactions; needed if region has not gotten updates
841     // in a while. It will take care of not checking too frequently on a store-by-store basis.
842     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
843     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
844     this.leases = new Leases(this.threadWakeFrequency);
845 
846     // Create the thread to clean the moved regions list
847     movedRegionsCleaner = MovedRegionsCleaner.create(this);
848 
849     if (this.nonceManager != null) {
850       // Create the scheduled chore that cleans up nonces.
851       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
852     }
853 
854     // Setup the Quota Manager
855     rsQuotaManager = new RegionServerQuotaManager(this);
856     
857     // Setup RPC client for master communication
858     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
859         rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
860 
861     boolean onlyMetaRefresh = false;
862     int storefileRefreshPeriod = conf.getInt(
863         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
864       , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
865     if (storefileRefreshPeriod == 0) {
866       storefileRefreshPeriod = conf.getInt(
867           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
868           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
869       onlyMetaRefresh = true;
870     }
871     if (storefileRefreshPeriod > 0) {
872       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
873           onlyMetaRefresh, this, this);
874     }
875     registerConfigurationObservers();
876   }
877 
878   private void registerConfigurationObservers() {
879     // Registering the compactSplitThread object with the ConfigurationManager.
880     configurationManager.registerObserver(this.compactSplitThread);
881     configurationManager.registerObserver(this.rpcServices);
882   }
883 
884   /**
885    * The HRegionServer sticks in this loop until closed.
886    */
887   @Override
888   public void run() {
889     try {
890       // Do pre-registration initializations; zookeeper, lease threads, etc.
891       preRegistrationInitialization();
892     } catch (Throwable e) {
893       abort("Fatal exception during initialization", e);
894     }
895 
896     try {
897       if (!isStopped() && !isAborted()) {
898         ShutdownHook.install(conf, fs, this, Thread.currentThread());
899         // Set our ephemeral znode up in zookeeper now we have a name.
900         createMyEphemeralNode();
901         // Initialize the RegionServerCoprocessorHost now that our ephemeral
902         // node was created, in case any coprocessors want to use ZooKeeper
903         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
904       }
905 
906       // Try and register with the Master; tell it we are here.  Break if
907       // server is stopped or the clusterup flag is down or hdfs went wacky.
908       while (keepLooping()) {
909         RegionServerStartupResponse w = reportForDuty();
910         if (w == null) {
911           LOG.warn("reportForDuty failed; sleeping and then retrying.");
912           this.sleeper.sleep();
913         } else {
914           handleReportForDutyResponse(w);
915           break;
916         }
917       }
918 
919       if (!isStopped() && isHealthy()){
920         // start the snapshot handler and other procedure handlers,
921         // since the server is ready to run
922         rspmHost.start();
923       }
924       
925       // Start the Quota Manager
926       if (this.rsQuotaManager != null) {
927         rsQuotaManager.start(getRpcServer().getScheduler());
928       }
929 
930       // We registered with the Master.  Go into run mode.
931       long lastMsg = System.currentTimeMillis();
932       long oldRequestCount = -1;
933       // The main run loop.
934       while (!isStopped() && isHealthy()) {
935         if (!isClusterUp()) {
936           if (isOnlineRegionsEmpty()) {
937             stop("Exiting; cluster shutdown set and not carrying any regions");
938           } else if (!this.stopping) {
939             this.stopping = true;
940             LOG.info("Closing user regions");
941             closeUserRegions(this.abortRequested);
942           } else if (this.stopping) {
943             boolean allUserRegionsOffline = areAllUserRegionsOffline();
944             if (allUserRegionsOffline) {
945               // Set stopped if no more write requests to meta tables
946               // since last time we went around the loop.  Any open
947               // meta regions will be closed on our way out.
948               if (oldRequestCount == getWriteRequestCount()) {
949                 stop("Stopped; only catalog regions remaining online");
950                 break;
951               }
952               oldRequestCount = getWriteRequestCount();
953             } else {
954               // Make sure all regions have been closed -- some regions may
955               // not have gotten the close because we were splitting at the time of
956               // the call to closeUserRegions.
957               closeUserRegions(this.abortRequested);
958             }
959             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
960           }
961         }
962         long now = System.currentTimeMillis();
963         if ((now - lastMsg) >= msgInterval) {
964           tryRegionServerReport(lastMsg, now);
965           lastMsg = System.currentTimeMillis();
966         }
967         if (!isStopped() && !isAborted()) {
968           this.sleeper.sleep();
969         }
970       } // while
971     } catch (Throwable t) {
972       if (!rpcServices.checkOOME(t)) {
973         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
974         abort(prefix + t.getMessage(), t);
975       }
976     }
977     // Run shutdown.
978     if (mxBean != null) {
979       MBeanUtil.unregisterMBean(mxBean);
980       mxBean = null;
981     }
982     if (this.leases != null) this.leases.closeAfterLeasesExpire();
983     if (this.splitLogWorker != null) {
984       splitLogWorker.stop();
985     }
986     if (this.infoServer != null) {
987       LOG.info("Stopping infoServer");
988       try {
989         this.infoServer.stop();
990       } catch (Exception e) {
991         LOG.error("Failed to stop infoServer", e);
992       }
993     }
994     // Send cache a shutdown.
995     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
996       cacheConfig.getBlockCache().shutdown();
997     }
998 
999     if (movedRegionsCleaner != null) {
1000       movedRegionsCleaner.stop("Region Server stopping");
1001     }
1002 
1003     // Send interrupts to wake up threads if sleeping so they notice shutdown.
1004     // TODO: Should we check they are alive? If OOME could have exited already
1005     if (this.hMemManager != null) this.hMemManager.stop();
1006     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1007     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1008     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1009     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1010     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1011     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1012     sendShutdownInterrupt();
1013 
1014     // Stop the quota manager
1015     if (rsQuotaManager != null) {
1016       rsQuotaManager.stop();
1017     }
1018     
1019     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1020     if (rspmHost != null) {
1021       rspmHost.stop(this.abortRequested || this.killed);
1022     }
1023 
1024     if (this.killed) {
1025       // Just skip out w/o closing regions.  Used when testing.
1026     } else if (abortRequested) {
1027       if (this.fsOk) {
1028         closeUserRegions(abortRequested); // Don't leave any open file handles
1029       }
1030       LOG.info("aborting server " + this.serverName);
1031     } else {
1032       closeUserRegions(abortRequested);
1033       LOG.info("stopping server " + this.serverName);
1034     }
1035 
1036     // so callers waiting for meta without timeout can stop
1037     if (this.metaTableLocator != null) this.metaTableLocator.stop();
1038     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1039       try {
1040         this.clusterConnection.close();
1041       } catch (IOException e) {
1042         // Although the {@link Closeable} interface throws an {@link
1043         // IOException}, in reality, the implementation would never do that.
1044         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1045       }
1046     }
1047 
1048     // Closing the compactSplit thread before closing meta regions
1049     if (!this.killed && containsMetaTableRegions()) {
1050       if (!abortRequested || this.fsOk) {
1051         if (this.compactSplitThread != null) {
1052           this.compactSplitThread.join();
1053           this.compactSplitThread = null;
1054         }
1055         closeMetaTableRegions(abortRequested);
1056       }
1057     }
1058 
1059     if (!this.killed && this.fsOk) {
1060       waitOnAllRegionsToClose(abortRequested);
1061       LOG.info("stopping server " + this.serverName +
1062         "; all regions closed.");
1063     }
1064 
1065     // fsOk flag may be changed when closing regions throws an exception.
1066     if (this.fsOk) {
1067       shutdownWAL(!abortRequested);
1068     }
1069 
1070     // Make sure the proxy is down.
1071     if (this.rssStub != null) {
1072       this.rssStub = null;
1073     }
1074     if (this.rpcClient != null) {
1075       this.rpcClient.close();
1076     }
1077     if (this.leases != null) {
1078       this.leases.close();
1079     }
1080     if (this.pauseMonitor != null) {
1081       this.pauseMonitor.stop();
1082     }
1083 
1084     if (!killed) {
1085       stopServiceThreads();
1086     }
1087 
1088     if (this.rpcServices != null) {
1089       this.rpcServices.stop();
1090     }
1091 
1092     try {
1093       deleteMyEphemeralNode();
1094     } catch (KeeperException.NoNodeException nn) {
1095     } catch (KeeperException e) {
1096       LOG.warn("Failed deleting my ephemeral node", e);
1097     }
1098     // We may have failed to delete the znode at the previous step, but
1099     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1100     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1101 
1102     if (this.zooKeeper != null) {
1103       this.zooKeeper.close();
1104     }
1105     LOG.info("stopping server " + this.serverName +
1106       "; zookeeper connection closed.");
1107 
1108     LOG.info(Thread.currentThread().getName() + " exiting");
1109   }
1110 
1111   private boolean containsMetaTableRegions() {
1112     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1113   }
1114 
1115   private boolean areAllUserRegionsOffline() {
1116     if (getNumberOfOnlineRegions() > 2) return false;
1117     boolean allUserRegionsOffline = true;
1118     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1119       if (!e.getValue().getRegionInfo().isMetaTable()) {
1120         allUserRegionsOffline = false;
1121         break;
1122       }
1123     }
1124     return allUserRegionsOffline;
1125   }
1126 
1127   /**
1128    * @return Current write count for all online regions.
1129    */
1130   private long getWriteRequestCount() {
1131     long writeCount = 0;
1132     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1133       writeCount += e.getValue().getWriteRequestsCount();
1134     }
1135     return writeCount;
1136   }
1137 
1138   @VisibleForTesting
1139   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1140   throws IOException {
1141     RegionServerStatusService.BlockingInterface rss = rssStub;
1142     if (rss == null) {
1143       // the current server could be stopping.
1144       return;
1145     }
1146     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1147     try {
1148       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1149       ServerName sn = ServerName.parseVersionedServerName(
1150         this.serverName.getVersionedBytes());
1151       request.setServer(ProtobufUtil.toServerName(sn));
1152       request.setLoad(sl);
1153       rss.regionServerReport(null, request.build());
1154     } catch (ServiceException se) {
1155       IOException ioe = ProtobufUtil.getRemoteException(se);
1156       if (ioe instanceof YouAreDeadException) {
1157         // This will be caught and handled as a fatal error in run()
1158         throw ioe;
1159       }
1160       if (rssStub == rss) {
1161         rssStub = null;
1162       }
1163       // Couldn't connect to the master, get location from zk and reconnect
1164       // Method blocks until new master is found or we are stopped
1165       createRegionServerStatusStub(true);
1166     }
1167   }
1168 
1169   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1170       throws IOException {
1171     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1172     // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
1173     // the wrapper to compute those numbers in one place.
1174     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1175     // Instead they should be stored in an HBase table so that external visibility into HBase is
1176     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1177     // history.
1178     MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1179     Collection<Region> regions = getOnlineRegionsLocalContext();
1180     long usedMemory = -1L;
1181     long maxMemory = -1L;
1182     final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
1183     if (usage != null) {
1184       usedMemory = usage.getUsed();
1185       maxMemory = usage.getMax();
1186     }
1187 
1188     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1189       ClusterStatusProtos.ServerLoad.newBuilder();
1190     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1191     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1192     serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1193     serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1194     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1195     Builder coprocessorBuilder = Coprocessor.newBuilder();
1196     for (String coprocessor : coprocessors) {
1197       serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1198     }
1199     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1200     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1201     for (Region region : regions) {
1202       if (region.getCoprocessorHost() != null) {
1203         Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1204         Iterator<String> iterator = regionCoprocessors.iterator();
1205         while (iterator.hasNext()) {
1206           serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1207         }
1208       }
1209       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1210       for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1211           .getCoprocessors()) {
1212         serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1213       }
1214     }
1215     serverLoad.setReportStartTime(reportStartTime);
1216     serverLoad.setReportEndTime(reportEndTime);
1217     if (this.infoServer != null) {
1218       serverLoad.setInfoServerPort(this.infoServer.getPort());
1219     } else {
1220       serverLoad.setInfoServerPort(-1);
1221     }
1222 
1223     // For the replicationLoad purpose, we only need to get it from one service;
1224     // either source or sink will give the same info.
1225     ReplicationSourceService rsources = getReplicationSourceService();
1226 
1227     if (rsources != null) {
1228       // always refresh first to get the latest value
1229       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1230       if (rLoad != null) {
1231         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1232         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1233           serverLoad.addReplLoadSource(rLS);
1234         }
1235       }
1236     }
1237 
1238     return serverLoad.build();
1239   }
1240 
1241   String getOnlineRegionsAsPrintableString() {
1242     StringBuilder sb = new StringBuilder();
1243     for (Region r: this.onlineRegions.values()) {
1244       if (sb.length() > 0) sb.append(", ");
1245       sb.append(r.getRegionInfo().getEncodedName());
1246     }
1247     return sb.toString();
1248   }
1249 
1250   /**
1251    * Wait on regions to close.
1252    */
1253   private void waitOnAllRegionsToClose(final boolean abort) {
1254     // Wait till all regions are closed before going out.
1255     int lastCount = -1;
1256     long previousLogTime = 0;
1257     Set<String> closedRegions = new HashSet<String>();
1258     boolean interrupted = false;
1259     try {
1260       while (!isOnlineRegionsEmpty()) {
1261         int count = getNumberOfOnlineRegions();
1262         // Only print a message if the count of regions has changed.
1263         if (count != lastCount) {
1264           // Log every second at most
1265           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1266             previousLogTime = System.currentTimeMillis();
1267             lastCount = count;
1268             LOG.info("Waiting on " + count + " regions to close");
1269             // Only print out regions still closing if there are only a few, else we will
1270             // swamp the log.
1271             if (count < 10 && LOG.isDebugEnabled()) {
1272               LOG.debug(this.onlineRegions);
1273             }
1274           }
1275         }
1276         // Ensure all user regions have been sent a close. Use this to
1277         // protect against the case where an open comes in after we start the
1278         // iterator of onlineRegions to close all user regions.
1279         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1280           HRegionInfo hri = e.getValue().getRegionInfo();
1281           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1282               && !closedRegions.contains(hri.getEncodedName())) {
1283             closedRegions.add(hri.getEncodedName());
1284             // Don't update zk with this close transition; pass false.
1285             closeRegionIgnoreErrors(hri, abort);
1286           }
1287         }
1288         // No regions in RIT, we could stop waiting now.
1289         if (this.regionsInTransitionInRS.isEmpty()) {
1290           if (!isOnlineRegionsEmpty()) {
1291             LOG.info("Exiting even though online regions are not empty," +
1292                 " because some regions failed to close");
1293           }
1294           break;
1295         }
1296         if (sleep(200)) {
1297           interrupted = true;
1298         }
1299       }
1300     } finally {
1301       if (interrupted) {
1302         Thread.currentThread().interrupt();
1303       }
1304     }
1305   }
1306 
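  // Note: sleep() below reports whether the nap was interrupted rather than re-interrupting
  // right away; callers such as waitOnAllRegionsToClose() and createRegionServerStatusStub()
  // accumulate that flag and restore the thread's interrupt status in their finally blocks.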
1307   private boolean sleep(long millis) {
1308     boolean interrupted = false;
1309     try {
1310       Thread.sleep(millis);
1311     } catch (InterruptedException e) {
1312       LOG.warn("Interrupted while sleeping");
1313       interrupted = true;
1314     }
1315     return interrupted;
1316   }
1317 
1318   private void shutdownWAL(final boolean close) {
1319     if (this.walFactory != null) {
1320       try {
1321         if (close) {
1322           walFactory.close();
1323         } else {
1324           walFactory.shutdown();
1325         }
1326       } catch (Throwable e) {
1327         e = RemoteExceptionHandler.checkThrowable(e);
1328         LOG.error("Shutdown / close of WAL failed: " + e);
1329         LOG.debug("Shutdown / close exception details:", e);
1330       }
1331     }
1332   }
1333 
1334   /*
1335    * Run init. Sets up the WAL and starts up all server threads.
1336    *
1337    * @param c Extra configuration.
1338    */
1339   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1340   throws IOException {
1341     try {
1342       boolean updateRootDir = false;
1343       for (NameStringPair e : c.getMapEntriesList()) {
1344         String key = e.getName();
1345         // The hostname the master sees us as.
1346         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1347           String hostnameFromMasterPOV = e.getValue();
1348           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1349             rpcServices.isa.getPort(), this.startcode);
1350           if (shouldUseThisHostnameInstead() &&
1351               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1352             String msg = "Master passed us a different hostname to use; was=" +
1353                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1354             LOG.error(msg);
1355             throw new IOException(msg);
1356           }
1357           if (!shouldUseThisHostnameInstead() &&
1358               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1359             String msg = "Master passed us a different hostname to use; was=" +
1360                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1361             LOG.error(msg);
1362           }
1363           continue;
1364         }
1365 
1366         String value = e.getValue();
1367         if (key.equals(HConstants.HBASE_DIR)) {
1368           if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1369             updateRootDir = true;
1370           }
1371         }
1372 
1373         if (LOG.isDebugEnabled()) {
1374           LOG.debug("Config from master: " + key + "=" + value);
1375         }
1376         this.conf.set(key, value);
1377       }
1378 
1379       if (updateRootDir) {
1380         // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1381         initializeFileSystem();
1382       }
1383 
1384       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1385       // config param for task trackers, but we can piggyback off of it.
1386       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1387         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1388           this.serverName.toString());
1389       }
1390 
1391       // Save it in a file; this will let us see later whether we crashed.
1392       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1393 
1394       this.cacheConfig = new CacheConfig(conf);
1395       this.walFactory = setupWALAndReplication();
1396       // Initialize here rather than in the constructor, after the thread name has been set.
1397       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1398 
1399       startServiceThreads();
1400       startHeapMemoryManager();
1401       LOG.info("Serving as " + this.serverName +
1402         ", RpcServer on " + rpcServices.isa +
1403         ", sessionid=0x" +
1404         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1405 
1406       // Wake up anyone waiting for this server to come online.
1407       synchronized (online) {
1408         online.set(true);
1409         online.notifyAll();
1410       }
1411     } catch (Throwable e) {
1412       stop("Failed initialization");
1413       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1414           "Region server startup failed");
1415     } finally {
1416       sleeper.skipSleepCycle();
1417     }
1418   }
1419 
1420   private void startHeapMemoryManager() {
1421     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1422         this, this.regionServerAccounting);
1423     if (this.hMemManager != null) {
1424       this.hMemManager.start(getChoreService());
1425     }
1426   }
1427 
1428   private void createMyEphemeralNode() throws KeeperException, IOException {
1429     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1430     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1431     rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1432     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1433     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1434       getMyEphemeralNodePath(), data);
1435   }
1436 
1437   private void deleteMyEphemeralNode() throws KeeperException {
1438     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1439   }
1440 
1441   @Override
1442   public RegionServerAccounting getRegionServerAccounting() {
1443     return regionServerAccounting;
1444   }
1445 
1446   @Override
1447   public TableLockManager getTableLockManager() {
1448     return tableLockManager;
1449   }
1450 
1451   /*
1452    * @param r Region to get RegionLoad for.
1453    * @param regionLoadBldr the RegionLoad.Builder, can be null
1454    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1455    * @return RegionLoad instance.
1456    *
1457    * @throws IOException
1458    */
1459   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1460       RegionSpecifier.Builder regionSpecifier) throws IOException {
1461     byte[] name = r.getRegionInfo().getRegionName();
1462     int stores = 0;
1463     int storefiles = 0;
1464     int storeUncompressedSizeMB = 0;
1465     int storefileSizeMB = 0;
1466     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1467     int storefileIndexSizeMB = 0;
1468     int rootIndexSizeKB = 0;
1469     int totalStaticIndexSizeKB = 0;
1470     int totalStaticBloomSizeKB = 0;
1471     long totalCompactingKVs = 0;
1472     long currentCompactedKVs = 0;
1473     List<Store> storeList = r.getStores();
1474     stores += storeList.size();
1475     for (Store store : storeList) {
1476       storefiles += store.getStorefilesCount();
1477       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1478       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1479       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1480       CompactionProgress progress = store.getCompactionProgress();
1481       if (progress != null) {
1482         totalCompactingKVs += progress.totalCompactingKVs;
1483         currentCompactedKVs += progress.currentCompactedKVs;
1484       }
1485       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1486       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1487       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1488     }
1489 
1490     float dataLocality =
1491         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1492     if (regionLoadBldr == null) {
1493       regionLoadBldr = RegionLoad.newBuilder();
1494     }
1495     if (regionSpecifier == null) {
1496       regionSpecifier = RegionSpecifier.newBuilder();
1497     }
1498     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1499     regionSpecifier.setValue(ByteStringer.wrap(name));
1500     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1501       .setStores(stores)
1502       .setStorefiles(storefiles)
1503       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1504       .setStorefileSizeMB(storefileSizeMB)
1505       .setMemstoreSizeMB(memstoreSizeMB)
1506       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1507       .setRootIndexSizeKB(rootIndexSizeKB)
1508       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1509       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1510       .setReadRequestsCount(r.getReadRequestsCount())
1511       .setWriteRequestsCount(r.getWriteRequestsCount())
1512       .setTotalCompactingKVs(totalCompactingKVs)
1513       .setCurrentCompactedKVs(currentCompactedKVs)
1514       .setDataLocality(dataLocality)
1515       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1516     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1517 
1518     return regionLoadBldr.build();
1519   }
1520 
1521   /**
1522    * @param encodedRegionName
1523    * @return An instance of RegionLoad.
1524    */
1525   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1526     Region r = onlineRegions.get(encodedRegionName);
1527     return r != null ? createRegionLoad(r, null, null) : null;
1528   }
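  // Usage sketch (illustrative only; the encoded region name below is hypothetical):
  //   RegionLoad load = regionServer.createRegionLoad("1588230740");
  //   if (load != null) { int files = load.getStorefiles(); }
  // A null return simply means no region with that encoded name is online on this server.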
1529 
1530   /*
1531    * Inner class that runs on a long period checking if regions need compaction.
1532    */
1533   private static class CompactionChecker extends ScheduledChore {
1534     private final HRegionServer instance;
1535     private final int majorCompactPriority;
1536     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1537     private long iteration = 0;
1538 
1539     CompactionChecker(final HRegionServer h, final int sleepTime,
1540         final Stoppable stopper) {
1541       super("CompactionChecker", stopper, sleepTime);
1542       this.instance = h;
1543       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1544 
1545       /* MajorCompactPriority is configurable.
1546        * If not set, the compaction will use default priority.
1547        */
1548       this.majorCompactPriority = this.instance.conf.
1549         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1550         DEFAULT_PRIORITY);
1551     }
1552 
1553     @Override
1554     protected void chore() {
1555       for (Region r : this.instance.onlineRegions.values()) {
1556         if (r == null)
1557           continue;
1558         for (Store s : r.getStores()) {
1559           try {
1560             long multiplier = s.getCompactionCheckMultiplier();
1561             assert multiplier > 0;
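            // A store can stretch its check interval via its compaction-check multiplier:
            // e.g. with a multiplier of 10, the store below is only evaluated on every
            // 10th run of this chore.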
1562             if (iteration % multiplier != 0) continue;
1563             if (s.needsCompaction()) {
1564               // Queue a compaction. Will recognize if major is needed.
1565               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1566                   + " requests compaction");
1567             } else if (s.isMajorCompaction()) {
1568               if (majorCompactPriority == DEFAULT_PRIORITY
1569                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1570                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1571                     + " requests major compaction; use default priority", null);
1572               } else {
1573                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1574                     + " requests major compaction; use configured priority",
1575                   this.majorCompactPriority, null, null);
1576               }
1577             }
1578           } catch (IOException e) {
1579             LOG.warn("Failed major compaction check on " + r, e);
1580           }
1581         }
1582       }
1583       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1584     }
1585   }
1586 
1587   static class PeriodicMemstoreFlusher extends ScheduledChore {
1588     final HRegionServer server;
1589     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1590     final static int MIN_DELAY_TIME = 0; // millisec
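    // Each flush requested by this chore is delayed by a random amount in
    // [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY), i.e. up to five minutes, so that
    // periodic flushes across many regions do not all hit the filesystem at the same time.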
1591     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1592       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1593       this.server = server;
1594     }
1595 
1596     @Override
1597     protected void chore() {
1598       final StringBuffer whyFlush = new StringBuffer();
1599       for (Region r : this.server.onlineRegions.values()) {
1600         if (r == null) continue;
1601         if (((HRegion)r).shouldFlush(whyFlush)) {
1602           FlushRequester requester = server.getFlushRequester();
1603           if (requester != null) {
1604             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1605             LOG.info(getName() + " requesting flush of " +
1606               r.getRegionInfo().getRegionNameAsString() + " because " +
1607               whyFlush.toString() +
1608               " after random delay " + randomDelay + "ms");
1609             // Throttle the flushes by adding a delay. If we don't throttle, and there
1610             // is a balanced write-load on the regions in a table, we might end up
1611             // overwhelming the filesystem with too many flushes at once.
1612             requester.requestDelayedFlush(r, randomDelay, false);
1613           }
1614         }
1615       }
1616     }
1617   }
1618 
1619   /**
1620    * Report the status of the server. A server is online once all the startup is
1621    * completed (setting up filesystem, starting service threads, etc.). This
1622    * method is designed mostly to be useful in tests.
1623    *
1624    * @return true if online, false if not.
1625    */
1626   public boolean isOnline() {
1627     return online.get();
1628   }
1629 
1630   /**
1631    * Set up the WAL and replication, if enabled.
1632    * Replication setup is done here because it needs to be hooked up to the WAL.
1633    * @return A WALFactory instance.
1634    * @throws IOException
1635    */
1636   private WALFactory setupWALAndReplication() throws IOException {
1637     // TODO Replication make assumptions here based on the default filesystem impl
1638     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1639     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1640 
1641     Path logdir = new Path(rootDir, logName);
1642     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1643     if (this.fs.exists(logdir)) {
1644       throw new RegionServerRunningException("Region server has already " +
1645         "created directory at " + this.serverName.toString());
1646     }
1647 
1648     // Instantiate replication manager if replication enabled.  Pass it the
1649     // log directories.
1650     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1651 
1652     // listeners the wal factory will add to wals it creates.
1653     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1654     listeners.add(new MetricsWAL());
1655     if (this.replicationSourceHandler != null &&
1656         this.replicationSourceHandler.getWALActionsListener() != null) {
1657       // Replication handler is an implementation of WALActionsListener.
1658       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1659     }
1660 
1661     return new WALFactory(conf, listeners, serverName.toString());
1662   }
1663 
1664   /**
1665    * We initialize the roller for the WAL that handles meta lazily,
1666    * since we don't know if this regionserver will handle it. All calls to
1667    * this method return a reference to that same roller. As newly referenced
1668    * meta regions are brought online, they will be offered to the roller for maintenance.
1669    * As a part of that registration process, the roller will add itself as a
1670    * listener on the wal.
1671    */
1672   protected LogRoller ensureMetaWALRoller() {
1673     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1674     // null
1675     LogRoller roller = metawalRoller.get();
1676     if (null == roller) {
1677       LogRoller tmpLogRoller = new LogRoller(this, this);
1678       String n = Thread.currentThread().getName();
1679       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1680           n + "-MetaLogRoller", uncaughtExceptionHandler);
1681       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1682         roller = tmpLogRoller;
1683       } else {
1684         // Another thread won the race to start the roller
1685         Threads.shutdown(tmpLogRoller.getThread());
1686         roller = metawalRoller.get();
1687       }
1688     }
1689     return roller;
1690   }
1691 
1692   public MetricsRegionServer getRegionServerMetrics() {
1693     return this.metricsRegionServer;
1694   }
1695 
1696   /**
1697    * @return Master address tracker instance.
1698    */
1699   public MasterAddressTracker getMasterAddressTracker() {
1700     return this.masterAddressTracker;
1701   }
1702 
1703   /*
1704    * Start maintenance threads: Server, Worker and lease checker threads.
1705    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1706    * get an unhandled exception. We cannot set the handler on all threads;
1707    * the Server's internal Listener thread is off limits. For the Server, on an OOME it
1708    * waits a while then retries. Meanwhile, a flush or a compaction that tries to
1709    * run should hit the same critical condition and the shutdown will run. On
1710    * its way out, this server will shut down the Server. Leases are somewhere
1711    * in between: it has an internal thread and, while it inherits from Chore, it
1712    * keeps its own internal stop mechanism, so it needs to be stopped by this
1713    * hosting server. The Worker logs the exception and exits.
1714    */
1715   private void startServiceThreads() throws IOException {
1716     // Start executor services
1717     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1718       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1719     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1720       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1721     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1722       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1723     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1724       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1725     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1726       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1727         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1728     }
1729     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1730        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1731 
1732     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1733       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1734         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1735           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1736     }
1737 
1738     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1739         uncaughtExceptionHandler);
1740     this.cacheFlusher.start(uncaughtExceptionHandler);
1741 
1742     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1743     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1744     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1745     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1746     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1747     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1748 
1749     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1750     // an unhandled exception, it will just exit.
1751     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1752       uncaughtExceptionHandler);
1753 
1754     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1755         this.replicationSourceHandler != null) {
1756       this.replicationSourceHandler.startReplicationService();
1757     } else {
1758       if (this.replicationSourceHandler != null) {
1759         this.replicationSourceHandler.startReplicationService();
1760       }
1761       if (this.replicationSinkHandler != null) {
1762         this.replicationSinkHandler.startReplicationService();
1763       }
1764     }
1765 
1766     // Create the log splitting worker and start it
1767     // Set a smaller retry count so we fail fast; otherwise the SplitLogWorker could be blocked
1768     // for quite a while inside the HConnection layer. The worker won't be available for other
1769     // tasks even after the current task is preempted when a split task times out.
1770     Configuration sinkConf = HBaseConfiguration.create(conf);
1771     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1772       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1773     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1774       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1775     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1776     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1777     splitLogWorker.start();
1778   }
1779 
1780   /**
1781    * Puts up the webui.
1782    * @return Returns final port -- maybe different from what we started with.
1783    * @throws IOException
1784    */
1785   private int putUpWebUI() throws IOException {
1786     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1787       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1788     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1789 
1790     if(this instanceof HMaster) {
1791       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1792           HConstants.DEFAULT_MASTER_INFOPORT);
1793       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1794     }
1795     // -1 is for disabling info server
1796     if (port < 0) return port;
1797 
1798     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1799       String msg =
1800           "Failed to start http info server. Address " + addr
1801               + " does not belong to this host. Correct configuration parameter: "
1802               + "hbase.regionserver.info.bindAddress";
1803       LOG.error(msg);
1804       throw new IOException(msg);
1805     }
1806     // check if auto port bind enabled
1807     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1808         false);
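    // With auto-binding enabled, the loop below walks upward from the configured port until a
    // bind succeeds; with it disabled, the first BindException is rethrown and startup fails.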
1809     while (true) {
1810       try {
1811         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1812         infoServer.addServlet("dump", "/dump", getDumpServlet());
1813         configureInfoServer();
1814         this.infoServer.start();
1815         break;
1816       } catch (BindException e) {
1817         if (!auto) {
1818           // auto bind disabled; rethrow the BindException
1819           LOG.error("Failed binding http info server to port: " + port);
1820           throw e;
1821         }
1822         // auto bind enabled, try to use another port
1823         LOG.info("Failed binding http info server to port: " + port);
1824         port++;
1825       }
1826     }
1827     port = this.infoServer.getPort();
1828     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1829     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1830       HConstants.DEFAULT_MASTER_INFOPORT);
1831     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1832     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1833     return port;
1834   }
1835 
1836   /*
1837    * Verify that server is healthy
1838    */
1839   private boolean isHealthy() {
1840     if (!fsOk) {
1841       // File system problem
1842       return false;
1843     }
1844     // Verify that all threads are alive
1845     if (!(leases.isAlive()
1846         && cacheFlusher.isAlive() && walRoller.isAlive()
1847         && this.compactionChecker.isScheduled()
1848         && this.periodicFlusher.isScheduled())) {
1849       stop("One or more threads are no longer alive -- stop");
1850       return false;
1851     }
1852     final LogRoller metawalRoller = this.metawalRoller.get();
1853     if (metawalRoller != null && !metawalRoller.isAlive()) {
1854       stop("Meta WAL roller thread is no longer alive -- stop");
1855       return false;
1856     }
1857     return true;
1858   }
1859 
1860   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1861 
1862   @Override
1863   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1864     WAL wal;
1865     LogRoller roller = walRoller;
1866     // The hbase:meta region (and historically -ROOT-) has a separate WAL.
1867     if (regionInfo != null && regionInfo.isMetaTable() &&
1868         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1869       roller = ensureMetaWALRoller();
1870       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1871     } else if (regionInfo == null) {
1872       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1873     } else {
1874       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1875     }
1876     roller.addWAL(wal);
1877     return wal;
1878   }
1879 
1880   @Override
1881   public ClusterConnection getConnection() {
1882     return this.clusterConnection;
1883   }
1884 
1885   @Override
1886   public MetaTableLocator getMetaTableLocator() {
1887     return this.metaTableLocator;
1888   }
1889 
1890   @Override
1891   public void stop(final String msg) {
1892     stop(msg, false);
1893   }
1894 
1895   /**
1896    * Stops the regionserver.
1897    * @param msg Status message
1898    * @param force True if this is a regionserver abort
1899    */
1900   public void stop(final String msg, final boolean force) {
1901     if (!this.stopped) {
1902       if (this.rsHost != null) {
1903         // When forced via abort, don't allow coprocessors (CPs) to veto the stop.
1904         try {
1905           this.rsHost.preStop(msg);
1906         } catch (IOException ioe) {
1907           if (!force) {
1908             LOG.warn("The region server did not stop", ioe);
1909             return;
1910           }
1911           LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
1912         }
1913       }
1914       this.stopped = true;
1915       LOG.info("STOPPED: " + msg);
1916       // Wakes run() if it is sleeping
1917       sleeper.skipSleepCycle();
1918     }
1919   }
1920 
1921   public void waitForServerOnline(){
1922     while (!isStopped() && !isOnline()) {
1923       synchronized (online) {
1924         try {
1925           online.wait(msgInterval);
1926         } catch (InterruptedException ie) {
1927           Thread.currentThread().interrupt();
1928           break;
1929         }
1930       }
1931     }
1932   }
1933 
1934   @Override
1935   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1936     postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1937   }
1938 
1939   @Override
1940   public void postOpenDeployTasks(final PostOpenDeployContext context)
1941       throws KeeperException, IOException {
1942     Region r = context.getRegion();
1943     long masterSystemTime = context.getMasterSystemTime();
1944     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1945     rpcServices.checkOpen();
1946     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1947     // Do checks to see if we need to compact (references or too many files)
1948     for (Store s : r.getStores()) {
1949       if (s.hasReferences() || s.needsCompaction()) {
1950        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1951       }
1952     }
1953     long openSeqNum = r.getOpenSeqNum();
1954     if (openSeqNum == HConstants.NO_SEQNUM) {
1955       // If we opened a region, we should have read some sequence number from it.
1956       LOG.error("No sequence number found when opening " +
1957         r.getRegionInfo().getRegionNameAsString());
1958       openSeqNum = 0;
1959     }
1960 
1961     // Update flushed sequence id of a recovering region in ZK
1962     updateRecoveringRegionLastFlushedSequenceId(r);
1963 
1964     // Update ZK, or META
1965     if (r.getRegionInfo().isMetaRegion()) {
1966       MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1967          State.OPEN);
1968     } else if (useZKForAssignment) {
1969       MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1970         this.serverName, openSeqNum, masterSystemTime);
1971     }
1972     if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1973         TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1974       throw new IOException("Failed to report opened region to master: "
1975         + r.getRegionInfo().getRegionNameAsString());
1976     }
1977 
1978     triggerFlushInPrimaryRegion((HRegion)r);
1979 
1980     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1981   }
1982 
1983   @Override
1984   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1985     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1986   }
1987 
1988   @Override
1989   public boolean reportRegionStateTransition(
1990       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1991     return reportRegionStateTransition(
1992       new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1993   }
1994 
1995   @Override
1996   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1997     TransitionCode code = context.getCode();
1998     long openSeqNum = context.getOpenSeqNum();
1999     HRegionInfo[] hris = context.getHris();
2000 
2001     ReportRegionStateTransitionRequest.Builder builder =
2002       ReportRegionStateTransitionRequest.newBuilder();
2003     builder.setServer(ProtobufUtil.toServerName(serverName));
2004     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2005     transition.setTransitionCode(code);
2006     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2007       transition.setOpenSeqNum(openSeqNum);
2008     }
2009     for (HRegionInfo hri: hris) {
2010       transition.addRegionInfo(HRegionInfo.convert(hri));
2011     }
2012     ReportRegionStateTransitionRequest request = builder.build();
2013     while (keepLooping()) {
2014       RegionServerStatusService.BlockingInterface rss = rssStub;
2015       try {
2016         if (rss == null) {
2017           createRegionServerStatusStub();
2018           continue;
2019         }
2020         ReportRegionStateTransitionResponse response =
2021           rss.reportRegionStateTransition(null, request);
2022         if (response.hasErrorMessage()) {
2023           LOG.info("Failed to transition " + hris[0]
2024             + " to " + code + ": " + response.getErrorMessage());
2025           return false;
2026         }
2027         return true;
2028       } catch (ServiceException se) {
2029         IOException ioe = ProtobufUtil.getRemoteException(se);
2030         LOG.info("Failed to report region transition, will retry", ioe);
2031         if (rssStub == rss) {
2032           rssStub = null;
2033         }
2034       }
2035     }
2036     return false;
2037   }
2038 
2039   /**
2040    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2041    * block this thread. See RegionReplicaFlushHandler for details.
2042    */
2043   void triggerFlushInPrimaryRegion(final HRegion region) {
2044     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2045       return;
2046     }
2047     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2048         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2049           region.conf)) {
2050       region.setReadsEnabled(true);
2051       return;
2052     }
2053 
2054     region.setReadsEnabled(false); // disable reads before marking the region as opened.
2055     // RegionReplicaFlushHandler might reset this.
2056 
2057     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2058     this.service.submit(
2059       new RegionReplicaFlushHandler(this, clusterConnection,
2060         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2061   }
2062 
2063   @Override
2064   public RpcServerInterface getRpcServer() {
2065     return rpcServices.rpcServer;
2066   }
2067 
2068   @VisibleForTesting
2069   public RSRpcServices getRSRpcServices() {
2070     return rpcServices;
2071   }
2072 
2073   /**
2074    * Cause the server to exit without closing the regions it is serving, without closing
2075    * the log it is using, and without notifying the master. Used in unit testing and on
2076    * catastrophic events such as HDFS being yanked out from under HBase, or an OOME.
2077    *
2078    * @param reason
2079    *          the reason we are aborting
2080    * @param cause
2081    *          the exception that caused the abort, or null
2082    */
2083   @Override
2084   public void abort(final String reason, Throwable cause) {
2085     String msg = "ABORTING region server " + this + ": " + reason;
2086     if (cause != null) {
2087       LOG.fatal(msg, cause);
2088     } else {
2089       LOG.fatal(msg);
2090     }
2091     this.abortRequested = true;
2092     // HBASE-4014: show list of coprocessors that were loaded to help debug
2093     // regionserver crashes. Note that we're implicitly using
2094     // java.util.HashSet's toString() method to print the coprocessor names.
2095     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2096         CoprocessorHost.getLoadedCoprocessors());
2097     // Try to dump metrics on abort -- it might give a clue as to how the fatal error came about.
2098     try {
2099       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2100     } catch (MalformedObjectNameException | IOException e) {
2101       LOG.warn("Failed dumping metrics", e);
2102     }
2103 
2104     // Do our best to report our abort to the master, but this may not work
2105     try {
2106       if (cause != null) {
2107         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2108       }
2109       // Report to the master but only if we have already registered with the master.
2110       if (rssStub != null && this.serverName != null) {
2111         ReportRSFatalErrorRequest.Builder builder =
2112           ReportRSFatalErrorRequest.newBuilder();
2113         ServerName sn =
2114           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2115         builder.setServer(ProtobufUtil.toServerName(sn));
2116         builder.setErrorMessage(msg);
2117         rssStub.reportRSFatalError(null, builder.build());
2118       }
2119     } catch (Throwable t) {
2120       LOG.warn("Unable to report fatal error to master", t);
2121     }
2122     // shutdown should be run as the internal user
2123     if (User.isHBaseSecurityEnabled(conf)) {
2124       try {
2125         User.runAsLoginUser(new PrivilegedExceptionAction<Object>() {
2126           @Override
2127           public Object run() throws Exception {
2128             stop(reason, true);
2129             return null;
2130           }
2131         });
2132       } catch (IOException neverThrown) {
2133       }
2134     } else {
2135       stop(reason, true);
2136     }
2137   }
2138 
2139   /**
2140    * @see HRegionServer#abort(String, Throwable)
2141    */
2142   public void abort(String reason) {
2143     abort(reason, null);
2144   }
2145 
2146   @Override
2147   public boolean isAborted() {
2148     return this.abortRequested;
2149   }
2150 
2151   /*
2152    * Simulate a kill -9 of this server. Exits without closing regions or cleaning up
2153    * logs, but it does close the socket in case we want to bring up a server on the old
2154    * hostname+port immediately.
2155    */
2156   protected void kill() {
2157     this.killed = true;
2158     abort("Simulated kill");
2159   }
2160 
2161   /**
2162    * Called on stop/abort before closing the cluster connection and meta locator.
2163    */
2164   protected void sendShutdownInterrupt() {
2165   }
2166 
2167   /**
2168    * Wait on all threads to finish. Presumption is that all closes and stops
2169    * have already been called.
2170    */
2171   protected void stopServiceThreads() {
2172     // clean up the scheduled chores
2173     if (this.choreService != null) choreService.shutdown();
2174     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2175     if (this.compactionChecker != null) compactionChecker.cancel(true);
2176     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2177     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2178     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2179     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2180 
2181     if (this.cacheFlusher != null) {
2182       this.cacheFlusher.join();
2183     }
2184 
2185     if (this.spanReceiverHost != null) {
2186       this.spanReceiverHost.closeReceivers();
2187     }
2188     if (this.walRoller != null) {
2189       Threads.shutdown(this.walRoller.getThread());
2190     }
2191     final LogRoller metawalRoller = this.metawalRoller.get();
2192     if (metawalRoller != null) {
2193       Threads.shutdown(metawalRoller.getThread());
2194     }
2195     if (this.compactSplitThread != null) {
2196       this.compactSplitThread.join();
2197     }
2198     if (this.service != null) this.service.shutdown();
2199     if (this.replicationSourceHandler != null &&
2200         this.replicationSourceHandler == this.replicationSinkHandler) {
2201       this.replicationSourceHandler.stopReplicationService();
2202     } else {
2203       if (this.replicationSourceHandler != null) {
2204         this.replicationSourceHandler.stopReplicationService();
2205       }
2206       if (this.replicationSinkHandler != null) {
2207         this.replicationSinkHandler.stopReplicationService();
2208       }
2209     }
2210   }
2211 
2212   /**
2213    * @return Return the object that implements the replication
2214    * source service.
2215    */
2216   @VisibleForTesting
2217   public ReplicationSourceService getReplicationSourceService() {
2218     return replicationSourceHandler;
2219   }
2220 
2221   /**
2222    * @return Return the object that implements the replication
2223    * sink service.
2224    */
2225   ReplicationSinkService getReplicationSinkService() {
2226     return replicationSinkHandler;
2227   }
2228 
2229   /**
2230    * Get the current master from ZooKeeper and open the RPC connection to it.
2231    * To get a fresh connection, the current rssStub must be null.
2232    * Method will block until a master is available. You can break from this
2233    * block by requesting the server stop.
2234    *
2235    * @return master + port, or null if server has been stopped
2236    */
2237   @VisibleForTesting
2238   protected synchronized ServerName createRegionServerStatusStub() {
2239     // Create RS stub without refreshing the master node from ZK, use cached data
2240     return createRegionServerStatusStub(false);
2241   }
2242 
2243   /**
2244    * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2245    * connection, the current rssStub must be null. Method will block until a master is available.
2246    * You can break from this block by requesting the server stop.
2247    * @param refresh If true then master address will be read from ZK, otherwise use cached data
2248    * @return master + port, or null if server has been stopped
2249    */
2250   @VisibleForTesting
2251   protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2252     if (rssStub != null) {
2253       return masterAddressTracker.getMasterAddress();
2254     }
2255     ServerName sn = null;
2256     long previousLogTime = 0;
2257     RegionServerStatusService.BlockingInterface intf = null;
2258     boolean interrupted = false;
2259     try {
2260       while (keepLooping()) {
2261         sn = this.masterAddressTracker.getMasterAddress(refresh);
2262         if (sn == null) {
2263           if (!keepLooping()) {
2264             // give up with no connection.
2265             LOG.debug("No master found and cluster is stopped; bailing out");
2266             return null;
2267           }
2268           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2269             LOG.debug("No master found; retry");
2270             previousLogTime = System.currentTimeMillis();
2271           }
2272           refresh = true; // let's try to pull it from ZK directly
2273           if (sleep(200)) {
2274             interrupted = true;
2275           }
2276           continue;
2277         }
2278 
2279         // If we are on the active master, use the shortcut
2280         if (this instanceof HMaster && sn.equals(getServerName())) {
2281           intf = ((HMaster)this).getMasterRpcServices();
2282           break;
2283         }
2284         try {
2285           BlockingRpcChannel channel =
2286             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2287               shortOperationTimeout);
2288           intf = RegionServerStatusService.newBlockingStub(channel);
2289           break;
2290         } catch (IOException e) {
2291           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2292             e = e instanceof RemoteException ?
2293               ((RemoteException)e).unwrapRemoteException() : e;
2294             if (e instanceof ServerNotRunningYetException) {
2295               LOG.info("Master isn't available yet, retrying");
2296             } else {
2297               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2298             }
2299             previousLogTime = System.currentTimeMillis();
2300           }
2301           if (sleep(200)) {
2302             interrupted = true;
2303           }
2304         }
2305       }
2306     } finally {
2307       if (interrupted) {
2308         Thread.currentThread().interrupt();
2309       }
2310     }
2311     rssStub = intf;
2312     return sn;
2313   }
2314 
2315   /**
2316    * @return True if we should break loop because cluster is going down or
2317    * this server has been stopped or hdfs has gone bad.
2318    */
2319   private boolean keepLooping() {
2320     return !this.stopped && isClusterUp();
2321   }
2322 
2323   /*
2324    * Let the master know we're here. Run initialization using parameters passed
2325    * to us by the master.
2326    * @return A map of key/value configurations we got from the master, else
2327    * null if we failed to register.
2328    * @throws IOException
2329    */
2330   private RegionServerStartupResponse reportForDuty() throws IOException {
2331     ServerName masterServerName = createRegionServerStatusStub(true);
2332     if (masterServerName == null) return null;
2333     RegionServerStartupResponse result = null;
2334     try {
2335       rpcServices.requestCount.set(0);
2336       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2337         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2338       long now = EnvironmentEdgeManager.currentTime();
2339       int port = rpcServices.isa.getPort();
2340       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2341       if (shouldUseThisHostnameInstead()) {
2342         request.setUseThisHostnameInstead(useThisHostnameInstead);
2343       }
2344       request.setPort(port);
2345       request.setServerStartCode(this.startcode);
2346       request.setServerCurrentTime(now);
2347       result = this.rssStub.regionServerStartup(null, request.build());
2348     } catch (ServiceException se) {
2349       IOException ioe = ProtobufUtil.getRemoteException(se);
2350       if (ioe instanceof ClockOutOfSyncException) {
2351         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2352         // Re-throw IOE will cause RS to abort
2353         throw ioe;
2354       } else if (ioe instanceof ServerNotRunningYetException) {
2355         LOG.debug("Master is not running yet");
2356       } else {
2357         LOG.warn("error telling master we are up", se);
2358       }
2359       rssStub = null;
2360     }
2361     return result;
2362   }
2363 
2364   @Override
2365   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2366     try {
2367       GetLastFlushedSequenceIdRequest req =
2368           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2369       RegionServerStatusService.BlockingInterface rss = rssStub;
2370       if (rss == null) { // Try to connect one more time
2371         createRegionServerStatusStub();
2372         rss = rssStub;
2373         if (rss == null) {
2374           // Still no luck, we tried
2375           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2376           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2377               .build();
2378         }
2379       }
2380       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2381       return RegionStoreSequenceIds.newBuilder()
2382           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2383           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2384     } catch (ServiceException e) {
2385       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2386       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2387           .build();
2388     }
2389   }
2390 
2391   /**
2392    * Closes all regions.  Called on our way out.
2393    * Assumes that it is not possible for new regions to be added to onlineRegions
2394    * while this method runs.
2395    */
2396   protected void closeAllRegions(final boolean abort) {
2397     closeUserRegions(abort);
2398     closeMetaTableRegions(abort);
2399   }
2400 
2401   /**
2402    * Close meta region if we carry it
2403    * @param abort Whether we're running an abort.
2404    */
2405   void closeMetaTableRegions(final boolean abort) {
2406     Region meta = null;
2407     this.lock.writeLock().lock();
2408     try {
2409       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2410         HRegionInfo hri = e.getValue().getRegionInfo();
2411         if (hri.isMetaRegion()) {
2412           meta = e.getValue();
2413         }
2414         if (meta != null) break;
2415       }
2416     } finally {
2417       this.lock.writeLock().unlock();
2418     }
2419     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2420   }
2421 
2422   /**
2423    * Schedule closes on all user regions.
2424    * Should be safe to call multiple times because it won't close regions
2425    * that are already closed or that are closing.
2426    * @param abort Whether we're running an abort.
2427    */
2428   void closeUserRegions(final boolean abort) {
2429     this.lock.writeLock().lock();
2430     try {
2431       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2432         Region r = e.getValue();
2433         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2434           // Don't update zk with this close transition; pass false.
2435           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2436         }
2437       }
2438     } finally {
2439       this.lock.writeLock().unlock();
2440     }
2441   }
2442 
2443   /** @return the info server */
2444   public InfoServer getInfoServer() {
2445     return infoServer;
2446   }
2447 
2448   /**
2449    * @return true if a stop has been requested.
2450    */
2451   @Override
2452   public boolean isStopped() {
2453     return this.stopped;
2454   }
2455 
2456   @Override
2457   public boolean isStopping() {
2458     return this.stopping;
2459   }
2460 
2461   @Override
2462   public Map<String, Region> getRecoveringRegions() {
2463     return this.recoveringRegions;
2464   }
2465 
2466   /**
2467    *
2468    * @return the configuration
2469    */
2470   @Override
2471   public Configuration getConfiguration() {
2472     return conf;
2473   }
2474 
2475   /** @return the write lock for the server */
2476   ReentrantReadWriteLock.WriteLock getWriteLock() {
2477     return lock.writeLock();
2478   }
2479 
2480   public int getNumberOfOnlineRegions() {
2481     return this.onlineRegions.size();
2482   }
2483 
2484   boolean isOnlineRegionsEmpty() {
2485     return this.onlineRegions.isEmpty();
2486   }
2487 
2488   /**
2489    * For tests, web ui and metrics.
2490    * This method will only work if HRegionServer is in the same JVM as client;
2491    * HRegion cannot be serialized to cross an rpc.
2492    */
2493   public Collection<Region> getOnlineRegionsLocalContext() {
2494     Collection<Region> regions = this.onlineRegions.values();
2495     return Collections.unmodifiableCollection(regions);
2496   }
2497 
2498   @Override
2499   public void addToOnlineRegions(Region region) {
2500     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2501     configurationManager.registerObserver(region);
2502   }
2503 
2504   /**
2505    * @return A new Map of online regions sorted by region size with the first entry being the
2506    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2507    * may NOT return all regions.
2508    */
2509   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2510     // we'll sort the regions in reverse
2511     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2512         new Comparator<Long>() {
2513           @Override
2514           public int compare(Long a, Long b) {
2515             return -1 * a.compareTo(b);
2516           }
2517         });
2518     // Copy over all regions. Regions are sorted by size with biggest first.
2519     for (Region region : this.onlineRegions.values()) {
2520       sortedRegions.put(region.getMemstoreSize(), region);
2521     }
2522     return sortedRegions;
2523   }
2524 
2525   /**
2526    * @return time stamp in millis of when this region server was started
2527    */
2528   public long getStartcode() {
2529     return this.startcode;
2530   }
2531 
2532   /** @return reference to FlushRequester */
2533   @Override
2534   public FlushRequester getFlushRequester() {
2535     return this.cacheFlusher;
2536   }
2537 
2538   /**
2539    * Get the top N most loaded regions this server is serving so we can tell the
2540    * master which regions it can reallocate if we're overloaded. TODO: actually
2541    * calculate which regions are most loaded. (Right now, we're just grabbing
2542    * the first N regions being served regardless of load.)
2543    */
2544   protected HRegionInfo[] getMostLoadedRegions() {
2545     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2546     for (Region r : onlineRegions.values()) {
2547       if (!r.isAvailable()) {
2548         continue;
2549       }
2550       if (regions.size() < numRegionsToReport) {
2551         regions.add(r.getRegionInfo());
2552       } else {
2553         break;
2554       }
2555     }
2556     return regions.toArray(new HRegionInfo[regions.size()]);
2557   }
2558 
2559   @Override
2560   public Leases getLeases() {
2561     return leases;
2562   }
2563 
2564   /**
2565    * @return Return the rootDir.
2566    */
2567   protected Path getRootDir() {
2568     return rootDir;
2569   }
2570 
2571   /**
2572    * @return Return the fs.
2573    */
2574   @Override
2575   public FileSystem getFileSystem() {
2576     return fs;
2577   }
2578 
2579   @Override
2580   public String toString() {
2581     return getServerName().toString();
2582   }
2583 
2584   /**
2585    * Interval at which threads should run
2586    *
2587    * @return the interval
2588    */
2589   public int getThreadWakeFrequency() {
2590     return threadWakeFrequency;
2591   }
2592 
2593   @Override
2594   public ZooKeeperWatcher getZooKeeper() {
2595     return zooKeeper;
2596   }
2597 
2598   @Override
2599   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2600     return csm;
2601   }
2602 
2603   @Override
2604   public ServerName getServerName() {
2605     return serverName;
2606   }
2607 
2608   @Override
2609   public CompactionRequestor getCompactionRequester() {
2610     return this.compactSplitThread;
2611   }
2612 
2613   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2614     return this.rsHost;
2615   }
2616 
2617   @Override
2618   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2619     return this.regionsInTransitionInRS;
2620   }
2621 
2622   @Override
2623   public ExecutorService getExecutorService() {
2624     return service;
2625   }
2626 
2627   @Override
2628   public ChoreService getChoreService() {
2629     return choreService;
2630   }
2631   
2632   @Override
2633   public RegionServerQuotaManager getRegionServerQuotaManager() {
2634     return rsQuotaManager;
2635   }
2636 
2637   //
2638   // Main program and support routines
2639   //
2640 
2641   /**
2642    * Load the replication service objects, if any
2643    */
2644   static private void createNewReplicationInstance(Configuration conf,
2645     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2646 
2647     // If replication is not enabled, then return immediately.
2648     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2649         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2650       return;
2651     }
2652 
2653     // read in the name of the source replication class from the config file.
2654     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2655                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2656 
2657     // read in the name of the sink replication class from the config file.
2658     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2659                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2660 
2661     // If both the sink and the source class names are the same, then instantiate
2662     // only one object.
2663     if (sourceClassname.equals(sinkClassname)) {
2664       server.replicationSourceHandler = (ReplicationSourceService)
2665                                          newReplicationInstance(sourceClassname,
2666                                          conf, server, fs, logDir, oldLogDir);
2667       server.replicationSinkHandler = (ReplicationSinkService)
2668                                          server.replicationSourceHandler;
2669     } else {
2670       server.replicationSourceHandler = (ReplicationSourceService)
2671                                          newReplicationInstance(sourceClassname,
2672                                          conf, server, fs, logDir, oldLogDir);
2673       server.replicationSinkHandler = (ReplicationSinkService)
2674                                          newReplicationInstance(sinkClassname,
2675                                          conf, server, fs, logDir, oldLogDir);
2676     }
2677   }
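       // Illustrative sketch only (not part of the original source): how the replication
       // service implementations are selected through configuration. The class name
       // "org.example.MyReplicationService" is a hypothetical placeholder; by default both
       // keys fall back to HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT, in which case a
       // single shared instance serves as both source and sink (see the branch above).
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
       //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "org.example.MyReplicationService");
       //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "org.example.MyReplicationService");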
2678 
2679   private static ReplicationService newReplicationInstance(String classname,
2680     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2681     Path oldLogDir) throws IOException{
2682 
2683     Class<?> clazz = null;
2684     try {
2685       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2686       clazz = Class.forName(classname, true, classLoader);
2687     } catch (java.lang.ClassNotFoundException nfe) {
2688       throw new IOException("Could not find class for " + classname, nfe);
2689     }
2690 
2691     // create an instance of the replication object.
2692     ReplicationService service = (ReplicationService)
2693                               ReflectionUtils.newInstance(clazz, conf);
2694     service.initialize(server, fs, logDir, oldLogDir);
2695     return service;
2696   }
2697 
2698   /**
2699    * Utility for constructing an instance of the passed HRegionServer class.
2700    *
2701    * @param regionServerClass the HRegionServer implementation class to instantiate
2702    * @param conf2 configuration passed to the constructor
2703    * @return HRegionServer instance.
2704    */
2705   public static HRegionServer constructRegionServer(
2706       Class<? extends HRegionServer> regionServerClass,
2707       final Configuration conf2, CoordinatedStateManager cp) {
2708     try {
2709       Constructor<? extends HRegionServer> c = regionServerClass
2710           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2711       return c.newInstance(conf2, cp);
2712     } catch (Exception e) {
2713       throw new RuntimeException("Failed construction of RegionServer: "
2714           + regionServerClass.toString(), e);
2715     }
2716   }
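       // Illustrative sketch only (not part of the original source): constructing a region
       // server through this utility. "MyRegionServer" is a hypothetical subclass; it must
       // declare the (Configuration, CoordinatedStateManager) constructor looked up above.
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
       //   HRegionServer rs = HRegionServer.constructRegionServer(MyRegionServer.class, conf, csm);
       //   rs.start();  // normally HRegionServerCommandLine does the starting, as in main() below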
2717 
2718   /**
2719    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2720    */
2721   public static void main(String[] args) throws Exception {
2722     VersionInfo.logVersion();
2723     Configuration conf = HBaseConfiguration.create();
2724     @SuppressWarnings("unchecked")
2725     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2726         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2727 
2728     new HRegionServerCommandLine(regionServerClass).doMain(args);
2729   }
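       // Illustrative configuration sketch only: main() honors HConstants.REGION_SERVER_IMPL
       // ("hbase.regionserver.impl"), so a hypothetical subclass can be plugged in from
       // hbase-site.xml:
       //
       //   <property>
       //     <name>hbase.regionserver.impl</name>
       //     <value>org.example.MyRegionServer</value>
       //   </property>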
2730 
2731   /**
2732    * Gets the online regions of the specified table.
2733    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2734    * Only returns <em>online</em> regions.  If a region on this table has been
2735    * closed during a disable, etc., it will not be included in the returned list.
2736    * So, the returned list may not necessarily be ALL regions in this table; it is
2737    * only the ONLINE regions in the table.
2738    * @param tableName the table to get online regions of
2739    * @return Online regions from <code>tableName</code>
2740    */
2741   @Override
2742   public List<Region> getOnlineRegions(TableName tableName) {
2743     List<Region> tableRegions = new ArrayList<Region>();
2744     synchronized (this.onlineRegions) {
2745       for (Region region: this.onlineRegions.values()) {
2746         HRegionInfo regionInfo = region.getRegionInfo();
2747         if (regionInfo.getTable().equals(tableName)) {
2748           tableRegions.add(region);
2749         }
2750       }
2751     }
2752     return tableRegions;
2753   }
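       // Illustrative usage only (not part of the original source): from within region server
       // code, count the online regions this server hosts for a hypothetical table "t1".
       //
       //   List<Region> t1Regions = getOnlineRegions(TableName.valueOf("t1"));
       //   LOG.info("Hosting " + t1Regions.size() + " online region(s) of t1");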
2754   
2755   /**
2756    * Gets the online tables in this RS.
2757    * This method looks at the in-memory onlineRegions.
2758    * @return all the online tables in this RS
2759    */
2760   @Override
2761   public Set<TableName> getOnlineTables() {
2762     Set<TableName> tables = new HashSet<TableName>();
2763     synchronized (this.onlineRegions) {
2764       for (Region region: this.onlineRegions.values()) {
2765         tables.add(region.getTableDesc().getTableName());
2766       }
2767     }
2768     return tables;
2769   }
2770 
2771   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2772   public String[] getRegionServerCoprocessors() {
2773     TreeSet<String> coprocessors = new TreeSet<String>();
2774     try {
2775       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2776     } catch (IOException exception) {
2777       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2778           "skipping.");
2779       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2780     }
2781     Collection<Region> regions = getOnlineRegionsLocalContext();
2782     for (Region region: regions) {
2783       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2784       try {
2785         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2786       } catch (IOException exception) {
2787         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2788             "; skipping.");
2789         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2790       }
2791     }
2792     return coprocessors.toArray(new String[coprocessors.size()]);
2793   }
2794 
2795   /**
2796    * Tries to close the region; logs a warning on failure and continues.
2797    * @param region Region to close
2798    */
2799   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2800     try {
2801       CloseRegionCoordination.CloseRegionDetails details =
2802         csm.getCloseRegionCoordination().getDetaultDetails();
2803       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2804         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2805             " - ignoring and continuing");
2806       }
2807     } catch (IOException e) {
2808       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2809           " - ignoring and continuing", e);
2810     }
2811   }
2812 
2813   /**
2814    * Closes a region asynchronously; can be called from the master or internally by the
2815    * regionserver when stopping. If called from the master, the region will update the znode status.
2816    *
2817    * <p>
2818    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2819    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2820    * </p>
2821    *
2822    * <p>
2823    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2824    * </p>
2825    *
2826    * @param encodedName Region to close
2827    * @param abort True if we are aborting
2828    * @param crd details for the close-region coordination task
2829    * @return True if closed a region.
2830    * @throws NotServingRegionException if the region is not online
2831    * @throws RegionAlreadyInTransitionException if the region is already closing
2832    */
2833   protected boolean closeRegion(String encodedName, final boolean abort,
2834       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2835       throws NotServingRegionException, RegionAlreadyInTransitionException {
2836     // Check for permissions to close.
2837     Region actualRegion = this.getFromOnlineRegions(encodedName);
2838     // Can be null if we're calling close on a region that's not online
2839     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2840       try {
2841         actualRegion.getCoprocessorHost().preClose(false);
2842       } catch (IOException exp) {
2843         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2844         return false;
2845       }
2846     }
2847 
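         // In regionsInTransitionInRS, Boolean.TRUE marks a region currently OPENING and
         // Boolean.FALSE marks one currently CLOSING, so putIfAbsent either claims the close
         // (previous == null) or reveals an open/close that is already in progress.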
2848     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2849         Boolean.FALSE);
2850 
2851     if (Boolean.TRUE.equals(previous)) {
2852       LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2853           "trying to OPEN. Cancelling OPENING.");
2854       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2855         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2856         // We're going to try to do a standard close then.
2857         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2858             " Doing a standard close now");
2859         return closeRegion(encodedName, abort, crd, sn);
2860       }
2861       // Let's get the region from the online region list again
2862       actualRegion = this.getFromOnlineRegions(encodedName);
2863       if (actualRegion == null) { // Open was cancelled before the region came online; otherwise fall through and close it.
2864         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2865         // The master deletes the znode when it receives this exception.
2866         throw new RegionAlreadyInTransitionException("The region " + encodedName +
2867           " was opening but not yet served. Opening is cancelled.");
2868       }
2869     } else if (Boolean.FALSE.equals(previous)) {
2870       LOG.info("Received CLOSE for the region: " + encodedName +
2871        ", which we are already trying to CLOSE, but it has not completed yet");
2872       // The master will retry till the region is closed. We need to do this since
2873       // the region could fail to close somehow. If we mark the region closed in master
2874       // while it is not, there could be data loss.
2875       // If the region stuck in closing for a while, and master runs out of retries,
2876       // master will move the region to failed_to_close. Later on, if the region
2877       // is indeed closed, master can properly re-assign it.
2878       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2879         " was already closing. New CLOSE request is ignored.");
2880     }
2881 
2882     if (actualRegion == null) {
2883       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2884       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2885       // The master deletes the znode when it receives this exception.
2886       throw new NotServingRegionException("The region " + encodedName +
2887           " is not online, and is not opening.");
2888     }
2889 
2890     CloseRegionHandler crh;
2891     final HRegionInfo hri = actualRegion.getRegionInfo();
2892     if (hri.isMetaRegion()) {
2893       crh = new CloseMetaHandler(this, this, hri, abort,
2894         csm.getCloseRegionCoordination(), crd);
2895     } else {
2896       crh = new CloseRegionHandler(this, this, hri, abort,
2897         csm.getCloseRegionCoordination(), crd, sn);
2898     }
2899     this.service.submit(crh);
2900     return true;
2901   }
2902 
2903   /**
2904    * @param regionName region name in binary form
2905    * @return {@link Region} for the passed binary <code>regionName</code> or null if the
2906    *         named region is not a member of the online regions.
2907    */
2908   public Region getOnlineRegion(final byte[] regionName) {
2909     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2910     return this.onlineRegions.get(encodedRegionName);
2911   }
2912 
2913   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2914     return this.regionFavoredNodesMap.get(encodedRegionName);
2915   }
2916 
2917   @Override
2918   public Region getFromOnlineRegions(final String encodedRegionName) {
2919     return this.onlineRegions.get(encodedRegionName);
2920   }
2921 
2922 
2923   @Override
2924   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2925     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2926     if (destination != null) {
2927       long closeSeqNum = r.getMaxFlushedSeqId();
2928       if (closeSeqNum == HConstants.NO_SEQNUM) {
2929         // No edits in WAL for this region; get the sequence number when the region was opened.
2930         closeSeqNum = r.getOpenSeqNum();
2931         if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2932       }
2933       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2934     }
2935     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2936     return toReturn != null;
2937   }
2938 
2939   /**
2940    * Protected utility method for safely obtaining an HRegion handle.
2941    *
2942    * @param regionName
2943    *          Name of online {@link Region} to return
2944    * @return {@link Region} for <code>regionName</code>
2945    * @throws NotServingRegionException if the region is not online
2946    */
2947   protected Region getRegion(final byte[] regionName)
2948       throws NotServingRegionException {
2949     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2950     return getRegionByEncodedName(regionName, encodedRegionName);
2951   }
2952 
2953   public Region getRegionByEncodedName(String encodedRegionName)
2954       throws NotServingRegionException {
2955     return getRegionByEncodedName(null, encodedRegionName);
2956   }
2957 
2958   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2959     throws NotServingRegionException {
2960     Region region = this.onlineRegions.get(encodedRegionName);
2961     if (region == null) {
2962       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2963       if (moveInfo != null) {
2964         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2965       }
2966       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2967       String regionNameStr = regionName == null?
2968         encodedRegionName: Bytes.toStringBinary(regionName);
2969       if (isOpening != null && isOpening.booleanValue()) {
2970         throw new RegionOpeningException("Region " + regionNameStr +
2971           " is opening on " + this.serverName);
2972       }
2973       throw new NotServingRegionException("Region " + regionNameStr +
2974         " is not online on " + this.serverName);
2975     }
2976     return region;
2977   }
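       // Note on the exceptions above: RegionMovedException carries the destination server and
       // close sequence number so a client can update its region cache and retry there;
       // RegionOpeningException typically means retry this same server a little later; and
       // NotServingRegionException typically sends the client back to hbase:meta for a fresh location.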
2978 
2979   /*
2980    * Cleanup after a Throwable is caught while invoking a method: logs it, checks for
2981    * OOME, and verifies the filesystem is still available.
2982    *
2983    * @param t Throwable
2984    *
2985    * @param msg Message to log in error. Can be null.
2986    *
2987    * @return The passed <code>t</code>, unchanged; use convertThrowableToIOE to turn it into an IOE.
2988    */
2989   private Throwable cleanup(final Throwable t, final String msg) {
2990     // Don't log as error if NSRE; NSRE is 'normal' operation.
2991     if (t instanceof NotServingRegionException) {
2992       LOG.debug("NotServingRegionException; " + t.getMessage());
2993       return t;
2994     }
2995     if (msg == null) {
2996       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2997     } else {
2998       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2999     }
3000     if (!rpcServices.checkOOME(t)) {
3001       checkFileSystem();
3002     }
3003     return t;
3004   }
3005 
3006   /*
3007    * @param t
3008    *
3009    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3010    *
3011    * @return <code>t</code> if it already is an IOE, otherwise a new IOE wrapping it.
3012    */
3013   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
3014     return (t instanceof IOException ? (IOException) t : msg == null
3015         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3016   }
3017 
3018   /**
3019    * Checks to see if the file system is still accessible. If not, sets
3020    * abortRequested and stopRequested.
3021    *
3022    * @return false if file system is not available
3023    */
3024   public boolean checkFileSystem() {
3025     if (this.fsOk && this.fs != null) {
3026       try {
3027         FSUtils.checkFileSystemAvailable(this.fs);
3028       } catch (IOException e) {
3029         abort("File System not available", e);
3030         this.fsOk = false;
3031       }
3032     }
3033     return this.fsOk;
3034   }
3035 
3036   @Override
3037   public void updateRegionFavoredNodesMapping(String encodedRegionName,
3038       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3039     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3040     // Refer to the comment on the declaration of regionFavoredNodesMap on why
3041     // it is a map of region name to InetSocketAddress[]
3042     for (int i = 0; i < favoredNodes.size(); i++) {
3043       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3044           favoredNodes.get(i).getPort());
3045     }
3046     regionFavoredNodesMap.put(encodedRegionName, addr);
3047   }
3048 
3049   /**
3050    * Return the favored nodes for a region given its encoded name. Look at the
3051    * comment on {@link #regionFavoredNodesMap} for why it is an InetSocketAddress[].
3052    * @param encodedRegionName the encoded name of the region
3053    * @return array of favored locations
3054    */
3055   @Override
3056   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3057     return regionFavoredNodesMap.get(encodedRegionName);
3058   }
3059 
3060   @Override
3061   public ServerNonceManager getNonceManager() {
3062     return this.nonceManager;
3063   }
3064 
3065   private static class MovedRegionInfo {
3066     private final ServerName serverName;
3067     private final long seqNum;
3068     private final long ts;
3069 
3070     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3071       this.serverName = serverName;
3072       this.seqNum = closeSeqNum;
3073       ts = EnvironmentEdgeManager.currentTime();
3074      }
3075 
3076     public ServerName getServerName() {
3077       return serverName;
3078     }
3079 
3080     public long getSeqNum() {
3081       return seqNum;
3082     }
3083 
3084     public long getMoveTime() {
3085       return ts;
3086     }
3087   }
3088 
3089   // This map contains all the regions that we closed for a move.
3090   // We record the time of the move so stale entries can be expired.
3091   protected Map<String, MovedRegionInfo> movedRegions =
3092       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3093 
3094   // We need a timeout. Without one we risk handing out stale information, which would double
3095   // the number of network calls instead of reducing them.
3096   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
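       // Lifecycle of an entry: removeFromOnlineRegions() records the move via addToMovedRegions(),
       // getRegionByEncodedName() then answers requests for that region with a RegionMovedException
       // pointing at the destination, and the entry expires after TIMEOUT_REGION_MOVED (dropped
       // lazily in getMovedRegion() or by the MovedRegionsCleaner chore).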
3097 
3098   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3099     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3100       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3101       return;
3102     }
3103     LOG.info("Adding moved region record: "
3104       + encodedName + " to " + destination + " as of " + closeSeqNum);
3105     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3106   }
3107 
3108   void removeFromMovedRegions(String encodedName) {
3109     movedRegions.remove(encodedName);
3110   }
3111 
3112   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3113     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3114 
3115     long now = EnvironmentEdgeManager.currentTime();
3116     if (dest != null) {
3117       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3118         return dest;
3119       } else {
3120         movedRegions.remove(encodedRegionName);
3121       }
3122     }
3123 
3124     return null;
3125   }
3126 
3127   /**
3128    * Remove the expired entries from the moved regions list.
3129    */
3130   protected void cleanMovedRegions() {
3131     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3132     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3133 
3134     while (it.hasNext()){
3135       Map.Entry<String, MovedRegionInfo> e = it.next();
3136       if (e.getValue().getMoveTime() < cutOff) {
3137         it.remove();
3138       }
3139     }
3140   }
3141 
3142   /*
3143    * Use this to allow tests to override and schedule more frequently.
3144    */
3145 
3146   protected int movedRegionCleanerPeriod() {
3147     return TIMEOUT_REGION_MOVED;
3148   }
3149 
3150   /**
3151    * Chore that periodically cleans the moved-region cache.
3152    */
3153 
3154   protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3155     private HRegionServer regionServer;
3156     Stoppable stoppable;
3157 
3158     private MovedRegionsCleaner(
3159       HRegionServer regionServer, Stoppable stoppable){
3160       super("MovedRegionsCleaner for region " + regionServer, stoppable,
3161           regionServer.movedRegionCleanerPeriod());
3162       this.regionServer = regionServer;
3163       this.stoppable = stoppable;
3164     }
3165 
3166     static MovedRegionsCleaner create(HRegionServer rs){
3167       Stoppable stoppable = new Stoppable() {
3168         private volatile boolean isStopped = false;
3169         @Override public void stop(String why) { isStopped = true;}
3170         @Override public boolean isStopped() {return isStopped;}
3171       };
3172 
3173       return new MovedRegionsCleaner(rs, stoppable);
3174     }
3175 
3176     @Override
3177     protected void chore() {
3178       regionServer.cleanMovedRegions();
3179     }
3180 
3181     @Override
3182     public void stop(String why) {
3183       stoppable.stop(why);
3184     }
3185 
3186     @Override
3187     public boolean isStopped() {
3188       return stoppable.isStopped();
3189     }
3190   }
3191 
3192   private String getMyEphemeralNodePath() {
3193     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3194   }
3195 
3196   private boolean isHealthCheckerConfigured() {
3197     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3198     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3199   }
3200 
3201   /**
3202    * @return the underlying {@link CompactSplitThread} for this server
3203    */
3204   public CompactSplitThread getCompactSplitThread() {
3205     return this.compactSplitThread;
3206   }
3207 
3208   /**
3209    * A helper function to store the last flushed sequence id with the previously failed RS for a
3210    * recovering region. The id is used to skip WAL edits which have already been flushed. Since a
3211    * flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
3212    * @throws KeeperException
3213    * @throws IOException
3214    */
3215   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3216       IOException {
3217     if (!r.isRecovering()) {
3218       // return immediately for non-recovering regions
3219       return;
3220     }
3221 
3222     HRegionInfo regionInfo = r.getRegionInfo();
3223     ZooKeeperWatcher zkw = getZooKeeper();
3224     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3225     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3226     long minSeqIdForLogReplay = -1;
3227     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3228       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3229         minSeqIdForLogReplay = storeSeqIdForReplay;
3230       }
3231     }
3232 
3233     try {
3234       long lastRecordedFlushedSequenceId = -1;
3235       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3236         regionInfo.getEncodedName());
3237       // recovering-region level
3238       byte[] data;
3239       try {
3240         data = ZKUtil.getData(zkw, nodePath);
3241       } catch (InterruptedException e) {
3242         throw new InterruptedIOException();
3243       }
3244       if (data != null) {
3245         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3246       }
3247       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3248         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3249       }
3250       if (previousRSName != null) {
3251         // one level deeper for the failed RS
3252         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3253         ZKUtil.setData(zkw, nodePath,
3254           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3255         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3256           " for " + previousRSName);
3257       } else {
3258         LOG.warn("Can't find failed region server for recovering region " +
3259             regionInfo.getEncodedName());
3260       }
3261     } catch (NoNodeException ignore) {
3262       LOG.debug("Region " + regionInfo.getEncodedName() +
3263         " must have completed recovery because its recovery znode has been removed", ignore);
3264     }
3265   }
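       // For reference, the znode layout written above (names in angle brackets are placeholders):
       //
       //   /hbase/recovering-regions/<encodedRegionName>             data: last flushed sequence id
       //   /hbase/recovering-regions/<encodedRegionName>/<failedRS>  data: per-store sequence ids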
3266 
3267   /**
3268    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3269    * @param encodedRegionName the encoded name of the recovering region
3270    * @throws KeeperException
3271    */
3272   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3273     String result = null;
3274     long maxZxid = 0;
3275     ZooKeeperWatcher zkw = this.getZooKeeper();
3276     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3277     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3278     if (failedServers == null || failedServers.isEmpty()) {
3279       return result;
3280     }
3281     for (String failedServer : failedServers) {
3282       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3283       Stat stat = new Stat();
3284       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3285       if (maxZxid < stat.getCzxid()) {
3286         maxZxid = stat.getCzxid();
3287         result = failedServer;
3288       }
3289     }
3290     return result;
3291   }
3292 
3293   public CoprocessorServiceResponse execRegionServerService(
3294       @SuppressWarnings("UnusedParameters") final RpcController controller,
3295       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3296     try {
3297       ServerRpcController serviceController = new ServerRpcController();
3298       CoprocessorServiceCall call = serviceRequest.getCall();
3299       String serviceName = call.getServiceName();
3300       String methodName = call.getMethodName();
3301       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3302         throw new UnknownProtocolException(null,
3303             "No registered coprocessor service found for name " + serviceName);
3304       }
3305       Service service = coprocessorServiceHandlers.get(serviceName);
3306       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3307       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3308       if (methodDesc == null) {
3309         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3310             + " called on service " + serviceName);
3311       }
3312       Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3313       ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3314       Message request = builderForType.build();
3315       final Message.Builder responseBuilder =
3316           service.getResponsePrototype(methodDesc).newBuilderForType();
3317       service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
3318         @Override
3319         public void run(Message message) {
3320           if (message != null) {
3321             responseBuilder.mergeFrom(message);
3322           }
3323         }
3324       });
3325       IOException exception = ResponseConverter.getControllerException(serviceController);
3326       if (exception != null) {
3327         throw exception;
3328       }
3329       Message execResult = responseBuilder.build();
3330       ClientProtos.CoprocessorServiceResponse.Builder builder =
3331           ClientProtos.CoprocessorServiceResponse.newBuilder();
3332       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3333         HConstants.EMPTY_BYTE_ARRAY));
3334       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3335           .setValue(execResult.toByteString()));
3336       return builder.build();
3337     } catch (IOException ie) {
3338       throw new ServiceException(ie);
3339     }
3340   }
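       // Illustrative client-side sketch only (not part of the original source): invoking a
       // region-server-scoped coprocessor endpoint, which is what ends up dispatched by
       // execRegionServerService(). "MyService" and its request/response types are hypothetical
       // protobuf-generated classes registered via registerService().
       //
       //   try (Connection connection = ConnectionFactory.createConnection(conf)) {
       //     Admin admin = connection.getAdmin();
       //     CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
       //     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
       //     MyResponse response = stub.myMethod(null, MyRequest.getDefaultInstance());
       //   }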
3341 
3342   /**
3343    * @return The cache config instance used by the regionserver.
3344    */
3345   public CacheConfig getCacheConfig() {
3346     return this.cacheConfig;
3347   }
3348 
3349   /**
3350    * @return the ConfigurationManager object, exposed for testing purposes.
3351    */
3352   protected ConfigurationManager getConfigurationManager() {
3353     return configurationManager;
3354   }
3355 
3356   /**
3357    * @return the table descriptors implementation.
3358    */
3359   public TableDescriptors getTableDescriptors() {
3360     return this.tableDescriptors;
3361   }
3362 
3363   /**
3364    * Reload the configuration from disk.
3365    */
3366   public void updateConfiguration() {
3367     LOG.info("Reloading the configuration from disk.");
3368     // Reload the configuration from disk.
3369     conf.reloadConfiguration();
3370     configurationManager.notifyAllObservers(conf);
3371   }
3372 
3373   @Override
3374   public HeapMemoryManager getHeapMemoryManager() {
3375     return hMemManager;
3376   }
3377 
3378   @Override
3379   public double getCompactionPressure() {
3380     double max = 0;
3381     for (Region region : onlineRegions.values()) {
3382       for (Store store : region.getStores()) {
3383         double normCount = store.getCompactionPressure();
3384         if (normCount > max) {
3385           max = normCount;
3386         }
3387       }
3388     }
3389     return max;
3390   }
3391 
3392   /**
3393    * For testing only.
3394    * @return whether all WAL roll requests have finished for this regionserver
3395    */
3396   @VisibleForTesting
3397   public boolean walRollRequestFinished() {
3398     return this.walRoller.walRollFinished();
3399   }
3400 }