
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import sun.misc.Signal;
import sun.misc.SignalHandler;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId {

  private static final Log LOG = LogFactory.getLog(HRegionServer.class);

  /*
   * Strings to be used in forming the exception message for
   * RegionsAlreadyInTransitionException.
   */
  protected static final String OPEN = "OPEN";
  protected static final String CLOSE = "CLOSE";

  // RegionName vs current action in progress
  // true - if open region action in progress
  // false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when server shuts down.
   * Clients must never close it explicitly.
   */
  protected ClusterConnection clusterConnection;

  /*
   * Long-living meta table locator, which is created when the server is started and stopped
   * when the server shuts down. References to this locator shall be used to perform the
   * corresponding operations in EventHandlers. The primary reason for this decision is to
   * make it mockable for tests.
   */
  protected MetaTableLocator metaTableLocator;

  // Watch if a region is out of recovering state from ZooKeeper
  @SuppressWarnings("unused")
  private RecoveringRegionWatcher recoveringRegionWatcher;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplitThread compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name.  All access should be synchronized.
   */
  protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<String, InetSocketAddress[]>();

  /**
   * Set of regions currently in recovering state, which means they can accept writes (edits
   * from a previously failed region server) but not reads. A recovering region is also an
   * online region.
   */
  protected final Map<String, Region> recoveringRegions = Collections
      .synchronizedMap(new HashMap<String, Region>());

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService service;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;

  // Set when a report to the master comes back with a message asking us to
  // shut down. Also set by a call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

  // A state before we go into stopped state.  At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  private volatile boolean killed = false;

  protected final Configuration conf;

  private Path rootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so it can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used when stuffing this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private final ChoreService choreService;

  /*
   * Check for compactions requests.
   */
  ScheduledChore compactionChecker;

  /*
   * Check for flushes
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  final LogRoller walRoller;
  // Lazily initialized if this RegionServer hosts a meta table.
  final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected ZooKeeperWatcher zooKeeper;

  // master address tracker
  private MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as.  It's made from the hostname the
   * master passes us, the port, and the server startcode. Gets set after
   * registration against the Master.
   */
  protected ServerName serverName;

  /*
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  // key to the config parameter of server hostname
  // the specification of server hostname is optional. The hostname should be resolvable from
  // both master and region server
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  /**
   * MX Bean for RegionServerInfo
   */
  private ObjectName mxBean = null;

  /**
   * Chore to clean periodically the moved region list
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerQuotaManager rsQuotaManager;

  // Table level lock manager for locking for region operations
  protected TableLockManager tableLockManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and
   * handle duplicate operations (currently, by failing them; in the future we might use MVCC
   * to return the result). Nonces are also recovered from the WAL during recovery; however,
   * the caveats (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "nonce WAL". It would just contain a bunch of numbers
   * and wouldn't be flushed on the main path - because the WAL itself also contains nonces, if
   * we only flush it before the memstore flush, for a given nonce we will either see it in the
   * WAL (if it was never flushed to disk, it will be part of recovery), or we'll see it as part
   * of the nonce log (or both occasionally, which doesn't matter). The nonce log file can be
   * deleted after the latest nonce in it expires. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;
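  // (Hedged sketch of the flow, not shown in this class: the RPC layer hands the
  // client-supplied nonce group/nonce to this manager before an increment/append
  // is applied, and the pair also travels with the WAL entry so that replays stay
  // idempotent -- see HBASE-3787 for the design discussion.)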

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected BaseCoordinatedStateManager csm;

  private final boolean useZKForAssignment;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  /**
   * Starts an HRegionServer at the default location.
   */
  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
  }
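  // A minimal launch sketch (hedged): deployments normally start the server via
  // the "hbase regionserver" script / HRegionServerCommandLine rather than
  // constructing it directly, but the programmatic equivalent is roughly:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HRegionServer rs = new HRegionServer(conf);
  //   rs.start();  // HasThread: runs run() below on its own thread
  //   rs.join();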

  /**
   * Starts an HRegionServer at the default location.
   * @param csm implementation of CoordinatedStateManager to be used
   */
  public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
    super("RegionServer");  // thread name
    this.fsOk = true;
    this.conf = conf;
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    FSUtils.setupShortCircuitRead(this.conf);
    // Disable usage of meta replicas in the regionserver
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    // Config'ed params
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

    this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

    this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

    this.abortRequested = false;
    this.stopped = false;

    rpcServices = createRpcServices();
    this.startcode = System.currentTimeMillis();
    if (this instanceof HMaster) {
      useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
    } else {
      useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
    }
    String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
      rpcServices.isa.getHostName();
    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // login the zookeeper client principal (if using security)
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);

    regionServerAccounting = new RegionServerAccounting();
    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in service thread " + t.getName(), e);
      }
    };

    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

    initializeFileSystem();

    service = new ExecutorService(getServerName().toShortString());
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

      this.csm = (BaseCoordinatedStateManager) csm;
      this.csm.initialize(this);
      this.csm.start();

      tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
      masterAddressTracker.start();

      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    }
    this.configurationManager = new ConfigurationManager();

    rpcServices.start();
    putUpWebUI();
    this.walRoller = new LogRoller(this, this);
    this.choreService = new ChoreService(getServerName().toString(), true);

    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        public void handle(Signal signal) {
          getConfiguration().reloadConfiguration();
          configurationManager.notifyAllObservers(getConfiguration());
        }
      });
    }
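    // Operators can trigger the reload above by sending SIGHUP to the region
    // server process, e.g. "kill -HUP <pid>" (a sketch; signals are unavailable
    // on Windows, hence the guard).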
  }

  private void initializeFileSystem() throws IOException {
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    // Get fs instance used by this RS.  Do we use checksum verification in HBase? If HBase
    // checksum verification is enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
  }

  /*
   * Returns true if the configured hostname should be used
   */
  protected boolean shouldUseThisHostnameInstead() {
    return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
  }
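
  // Example (a sketch; the host below is hypothetical): setting
  //
  //   <property>
  //     <name>hbase.regionserver.hostname</name>
  //     <value>rs1.example.com</value>
  //   </property>
  //
  // in hbase-site.xml makes the server report that name to the master instead of
  // the one resolved from its RPC address; the name must be resolvable from both
  // the master and the region server.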

  protected void login(UserProvider user, String host) throws IOException {
    user.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", host);
  }

  protected void waitForMasterActive(){
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return false;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  @Override
  public boolean registerService(Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
      LOG.error("Coprocessor service " + serviceDesc.getFullName()
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
    }
    return true;
  }
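
  // Usage sketch (hedged): a region server coprocessor endpoint would typically
  // register its protobuf-generated service during startup, e.g.
  //
  //   Service impl = ...;  // hypothetical generated service implementation
  //   regionServerServices.registerService(impl);
  //
  // after which clients can reach it via coprocessor-service RPCs
  // (CoprocessorServiceRequest, imported above).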

  /**
   * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
   * the local server. Safe to use going to the local or a remote server.
   * Creating this instance in its own method means it can be intercepted and mocked in tests.
   * @throws IOException
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices);
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }
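
  // Configuration sketch: to make a server refuse to start when a required codec
  // is missing, list the codecs in hbase-site.xml (values here are illustrative;
  // names are whatever CompressionTest accepts, e.g. "snappy", "lzo", "gz"):
  //
  //   <property>
  //     <name>hbase.regionserver.codecs</name>
  //     <value>snappy,lzo</value>
  //   </property>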

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }

  /**
   * All initialization needed before we go register with the Master.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void preRegistrationInitialization() {
    try {
      setupClusterConnection();

      // Health checker thread.
      if (isHealthCheckerConfigured()) {
        int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
        healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
      }
      this.pauseMonitor = new JvmPauseMonitor(conf);
      pauseMonitor.start();

      initializeZooKeeper();
      if (!isStopped() && !isAborted()) {
        initializeThreads();
      }
    } catch (Throwable t) {
      // Call stop if there is an error, or the process will stick around forever
      // since the server puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available
   * for this cluster, and after that wait until the cluster 'up' flag has been set.
   * This is the order in which the master does things.
   * Finally, open the long-living server short-circuit connection.
   * @throws IOException
   * @throws InterruptedException
   */
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Create the master address tracker, register with zk, and start it.  Then
    // block until a master is available.  No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up.  Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // Retrieve clusterId
    // Since cluster status is now up
    // ID should have already been set by HMaster
    try {
      clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
      if (clusterId == null) {
        this.abort("Cluster ID has not been set");
      }
      LOG.info("ClusterId : "+clusterId);
    } catch (KeeperException e) {
      this.abort("Failed to retrieve Cluster ID",e);
    }

    // In case colocated master, wait here till it's active.
    // So backup masters won't start as regionservers.
    // This is to avoid showing backup masters as regionservers
    // in master web UI, or assigning any region to them.
    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
    }
    // register watcher for recovering regions
    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
  }

  /**
   * Utility method to wait indefinitely on znode availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return False if cluster shutdown in progress
   */
  private boolean isClusterUp() {
    return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
  }

  private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on a store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);

    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Set our ephemeral znode up in zookeeper now we have a name.
        createMyEphemeralNode();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here.  Break if
      // server is stopped or the cluster-up flag is down or hdfs went wacky.
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        rspmHost.start();
      }

      // Start the Quota Manager
      if (this.rsQuotaManager != null) {
        rsQuotaManager.start(getRpcServer().getScheduler());
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }
    // Run shutdown.
    if (mxBean != null) {
      MBeanUtil.unregisterMBean(mxBean);
      mxBean = null;
    }
    if (this.leases != null) this.leases.closeAfterLeasesExpire();
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    if (this.compactionChecker != null) this.compactionChecker.cancel(true);
    if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
    if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
    if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
    sendShutdownInterrupt();

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    // so callers waiting for meta without timeout can stop
    if (this.metaTableLocator != null) this.metaTableLocator.stop();
    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName +
        "; all regions closed.");
    }

    // fsOk flag may be changed when closing regions throws exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    LOG.info("stopping server " + this.serverName +
      "; zookeeper connection closed.");

    LOG.info(Thread.currentThread().getName() + " exiting");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaTable()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
  throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      ServerName sn = ServerName.parseVersionedServerName(
        this.serverName.getVersionedBytes());
      request.setServer(ProtobufUtil.toServerName(sn));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved. Additionally, the load balancer will be able to take advantage of a more
    // complete history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<Region> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad =
      ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (Region region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        Iterator<String> iterator = regionCoprocessors.iterator();
        while (iterator.hasNext()) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }

    // for the replicationLoad purpose. Only need to get from one service
    // either source or sink will get the same info
    ReplicationSourceService rsources = getReplicationSourceService();

    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    }

    return serverLoad.build();
  }

  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

1248   /**
1249    * Wait on regions close.
1250    */
1251   private void waitOnAllRegionsToClose(final boolean abort) {
1252     // Wait till all regions are closed before going out.
1253     int lastCount = -1;
1254     long previousLogTime = 0;
1255     Set<String> closedRegions = new HashSet<String>();
1256     boolean interrupted = false;
1257     try {
1258       while (!isOnlineRegionsEmpty()) {
1259         int count = getNumberOfOnlineRegions();
1260         // Only print a message if the count of regions has changed.
1261         if (count != lastCount) {
1262           // Log every second at most
1263           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1264             previousLogTime = System.currentTimeMillis();
1265             lastCount = count;
1266             LOG.info("Waiting on " + count + " regions to close");
1267             // Only print out regions still closing if a small number else will
1268             // swamp the log.
1269             if (count < 10 && LOG.isDebugEnabled()) {
1270               LOG.debug(this.onlineRegions);
1271             }
1272           }
1273         }
1274         // Ensure all user regions have been sent a close. Use this to
1275         // protect against the case where an open comes in after we start the
1276         // iterator of onlineRegions to close all user regions.
1277         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1278           HRegionInfo hri = e.getValue().getRegionInfo();
1279           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1280               && !closedRegions.contains(hri.getEncodedName())) {
1281             closedRegions.add(hri.getEncodedName());
1282             // Don't update zk with this close transition; pass false.
1283             closeRegionIgnoreErrors(hri, abort);
1284               }
1285         }
1286         // No regions in RIT, we could stop waiting now.
1287         if (this.regionsInTransitionInRS.isEmpty()) {
1288           if (!isOnlineRegionsEmpty()) {
1289             LOG.info("We were exiting though online regions are not empty," +
1290                 " because some regions failed closing");
1291           }
1292           break;
1293         }
1294         if (sleep(200)) {
1295           interrupted = true;
1296         }
1297       }
1298     } finally {
1299       if (interrupted) {
1300         Thread.currentThread().interrupt();
1301       }
1302     }
1303   }
1304 
1305   private boolean sleep(long millis) {
1306     boolean interrupted = false;
1307     try {
1308       Thread.sleep(millis);
1309     } catch (InterruptedException e) {
1310       LOG.warn("Interrupted while sleeping");
1311       interrupted = true;
1312     }
1313     return interrupted;
1314   }
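  /*
   * A minimal sketch of the idiom used above: sleep() swallows the
   * InterruptedException and only reports that it happened, so callers such
   * as waitOnAllRegionsToClose() can keep looping and restore the thread's
   * interrupt status exactly once, in a finally block. The hypothetical
   * stillWaiting() stands in for whatever loop condition the caller uses:
   *
   *   boolean interrupted = false;
   *   try {
   *     while (stillWaiting()) {                 // hypothetical condition
   *       if (sleep(200)) interrupted = true;    // remember, keep looping
   *     }
   *   } finally {
   *     if (interrupted) Thread.currentThread().interrupt(); // restore for caller
   *   }
   */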
1315 
1316   private void shutdownWAL(final boolean close) {
1317     if (this.walFactory != null) {
1318       try {
1319         if (close) {
1320           walFactory.close();
1321         } else {
1322           walFactory.shutdown();
1323         }
1324       } catch (Throwable e) {
1325         e = RemoteExceptionHandler.checkThrowable(e);
1326         LOG.error("Shutdown / close of WAL failed: " + e);
1327         LOG.debug("Shutdown / close exception details:", e);
1328       }
1329     }
1330   }
1331 
1332   /*
1333    * Run init. Sets up the WAL and starts up all server threads.
1334    *
1335    * @param c Extra configuration.
1336    */
1337   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1338   throws IOException {
1339     try {
1340       boolean updateRootDir = false;
1341       for (NameStringPair e : c.getMapEntriesList()) {
1342         String key = e.getName();
1343         // The hostname the master sees us as.
1344         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1345           String hostnameFromMasterPOV = e.getValue();
1346           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1347             rpcServices.isa.getPort(), this.startcode);
1348           if (shouldUseThisHostnameInstead() &&
1349               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1350             String msg = "Master passed us a different hostname to use; was=" +
1351                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1352             LOG.error(msg);
1353             throw new IOException(msg);
1354           }
1355           if (!shouldUseThisHostnameInstead() &&
1356               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1357             String msg = "Master passed us a different hostname to use; was=" +
1358                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1359             LOG.error(msg);
1360           }
1361           continue;
1362         }
1363 
1364         String value = e.getValue();
1365         if (key.equals(HConstants.HBASE_DIR)) {
1366           if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1367             updateRootDir = true;
1368           }
1369         }
1370 
1371         if (LOG.isDebugEnabled()) {
1372           LOG.debug("Config from master: " + key + "=" + value);
1373         }
1374         this.conf.set(key, value);
1375       }
1376 
1377       if (updateRootDir) {
1378         // re-initialize the file system using the fs.defaultFS and hbase.rootdir passed by the master
1379         initializeFileSystem();
1380       }
1381 
1382       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1383       // config param for task trackers, but we can piggyback off of it.
1384       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1385         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1386           this.serverName.toString());
1387       }
1388 
1389       // Save it in a file; this lets us see, after a crash, which znode to clear
1390       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1391 
1392       this.cacheConfig = new CacheConfig(conf);
1393       this.walFactory = setupWALAndReplication();
1394       // Init here rather than in the constructor, after the thread name has been set
1395       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1396 
1397       startServiceThreads();
1398       startHeapMemoryManager();
1399       LOG.info("Serving as " + this.serverName +
1400         ", RpcServer on " + rpcServices.isa +
1401         ", sessionid=0x" +
1402         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1403 
1404       // Wake up anyone waiting for this server to come online
1405       synchronized (online) {
1406         online.set(true);
1407         online.notifyAll();
1408       }
1409     } catch (Throwable e) {
1410       stop("Failed initialization");
1411       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1412           "Region server startup failed");
1413     } finally {
1414       sleeper.skipSleepCycle();
1415     }
1416   }
1417 
1418   private void startHeapMemoryManager() {
1419     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1420         this, this.regionServerAccounting);
1421     if (this.hMemManager != null) {
1422       this.hMemManager.start(getChoreService());
1423     }
1424   }
1425 
1426   private void createMyEphemeralNode() throws KeeperException, IOException {
1427     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1428     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1429     rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1430     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1431     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1432       getMyEphemeralNodePath(), data);
1433   }
1434 
1435   private void deleteMyEphemeralNode() throws KeeperException {
1436     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1437   }
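  /*
   * A sketch (not part of this class) of how a reader can decode the payload
   * written by createMyEphemeralNode(): the bytes are a PB magic prefix
   * followed by a serialized RegionServerInfo, so the magic must be stripped
   * before parsing. Assuming the ProtobufUtil helpers used above:
   *
   *   byte[] data = ZKUtil.getData(zooKeeper, getMyEphemeralNodePath());
   *   if (data != null && ProtobufUtil.isPBMagicPrefix(data)) {
   *     int magicLen = ProtobufUtil.lengthOfPBMagic();
   *     RegionServerInfo info = RegionServerInfo.parseFrom(
   *       new java.io.ByteArrayInputStream(data, magicLen, data.length - magicLen));
   *     int infoPort = info.getInfoPort();   // -1 when no info server is up
   *   }
   */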
1438 
1439   @Override
1440   public RegionServerAccounting getRegionServerAccounting() {
1441     return regionServerAccounting;
1442   }
1443 
1444   @Override
1445   public TableLockManager getTableLockManager() {
1446     return tableLockManager;
1447   }
1448 
1449   /*
1450    * @param r Region to get RegionLoad for.
1451    * @param regionLoadBldr the RegionLoad.Builder, can be null
1452    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1453    * @return RegionLoad instance.
1454    *
1455    * @throws IOException
1456    */
1457   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1458       RegionSpecifier.Builder regionSpecifier) throws IOException {
1459     byte[] name = r.getRegionInfo().getRegionName();
1460     int stores = 0;
1461     int storefiles = 0;
1462     int storeUncompressedSizeMB = 0;
1463     int storefileSizeMB = 0;
1464     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1465     int storefileIndexSizeMB = 0;
1466     int rootIndexSizeKB = 0;
1467     int totalStaticIndexSizeKB = 0;
1468     int totalStaticBloomSizeKB = 0;
1469     long totalCompactingKVs = 0;
1470     long currentCompactedKVs = 0;
1471     List<Store> storeList = r.getStores();
1472     stores += storeList.size();
1473     for (Store store : storeList) {
1474       storefiles += store.getStorefilesCount();
1475       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1476       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1477       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1478       CompactionProgress progress = store.getCompactionProgress();
1479       if (progress != null) {
1480         totalCompactingKVs += progress.totalCompactingKVs;
1481         currentCompactedKVs += progress.currentCompactedKVs;
1482       }
1483       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1484       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1485       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1486     }
1487 
1488     float dataLocality =
1489         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1490     if (regionLoadBldr == null) {
1491       regionLoadBldr = RegionLoad.newBuilder();
1492     }
1493     if (regionSpecifier == null) {
1494       regionSpecifier = RegionSpecifier.newBuilder();
1495     }
1496     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1497     regionSpecifier.setValue(ByteStringer.wrap(name));
1498     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1499       .setStores(stores)
1500       .setStorefiles(storefiles)
1501       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1502       .setStorefileSizeMB(storefileSizeMB)
1503       .setMemstoreSizeMB(memstoreSizeMB)
1504       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1505       .setRootIndexSizeKB(rootIndexSizeKB)
1506       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1507       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1508       .setReadRequestsCount(r.getReadRequestsCount())
1509       .setWriteRequestsCount(r.getWriteRequestsCount())
1510       .setTotalCompactingKVs(totalCompactingKVs)
1511       .setCurrentCompactedKVs(currentCompactedKVs)
1512       .setDataLocality(dataLocality)
1513       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1514     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1515 
1516     return regionLoadBldr.build();
1517   }
1518 
1519   /**
1520    * @param encodedRegionName the encoded name of the region to report on
1521    * @return An instance of RegionLoad.
1522    */
1523   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1524     Region r = onlineRegions.get(encodedRegionName);
1525     return r != null ? createRegionLoad(r, null, null) : null;
1526   }
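  /*
   * A usage sketch: given an encoded region name, a caller can fetch the load
   * figures computed above (rs stands for this HRegionServer). Note the sizes
   * are truncated to whole MB/KB by the integer division in
   * createRegionLoad(Region, ...):
   *
   *   RegionLoad load = rs.createRegionLoad(encodedName);
   *   if (load != null) {   // null when the region is not online here
   *     LOG.info("memstoreMB=" + load.getMemstoreSizeMB()
   *       + ", storefiles=" + load.getStorefiles());
   *   }
   */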
1527 
1528   /*
1529    * Inner class that runs periodically, checking whether regions need compaction.
1530    */
1531   private static class CompactionChecker extends ScheduledChore {
1532     private final HRegionServer instance;
1533     private final int majorCompactPriority;
1534     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1535     private long iteration = 0;
1536 
1537     CompactionChecker(final HRegionServer h, final int sleepTime,
1538         final Stoppable stopper) {
1539       super("CompactionChecker", stopper, sleepTime);
1540       this.instance = h;
1541       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1542 
1543       /* MajorCompactPriority is configurable.
1544        * If not set, the compaction will use default priority.
1545        */
1546       this.majorCompactPriority = this.instance.conf.
1547         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1548         DEFAULT_PRIORITY);
1549     }
1550 
1551     @Override
1552     protected void chore() {
1553       for (Region r : this.instance.onlineRegions.values()) {
1554         if (r == null)
1555           continue;
1556         for (Store s : r.getStores()) {
1557           try {
1558             long multiplier = s.getCompactionCheckMultiplier();
1559             assert multiplier > 0;
1560             if (iteration % multiplier != 0) continue;
1561             if (s.needsCompaction()) {
1562               // Queue a compaction. Will recognize if major is needed.
1563               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1564                   + " requests compaction");
1565             } else if (s.isMajorCompaction()) {
1566               if (majorCompactPriority == DEFAULT_PRIORITY
1567                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1568                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1569                     + " requests major compaction; use default priority", null);
1570               } else {
1571                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1572                     + " requests major compaction; use configured priority",
1573                   this.majorCompactPriority, null, null);
1574               }
1575             }
1576           } catch (IOException e) {
1577             LOG.warn("Failed major compaction check on " + r, e);
1578           }
1579         }
1580       }
1581       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1582     }
1583   }
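  /*
   * A worked example of the iteration/multiplier arithmetic in
   * CompactionChecker.chore() above: a store whose compaction-check
   * multiplier is 4 is only examined on every fourth run of the chore.
   *
   *   iteration: 0 1 2 3 4 5 6 7 8 ...
   *   checked:   y n n n y n n n y ...   // iteration % 4 == 0
   */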
1584 
1585   static class PeriodicMemstoreFlusher extends ScheduledChore {
1586     final HRegionServer server;
1587     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1588     final static int MIN_DELAY_TIME = 0; // millisec
1589     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1590       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1591       this.server = server;
1592     }
1593 
1594     @Override
1595     protected void chore() {
1596       final StringBuffer whyFlush = new StringBuffer();
1597       for (Region r : this.server.onlineRegions.values()) {
1598         if (r == null) continue;
1599         if (((HRegion)r).shouldFlush(whyFlush)) {
1600           FlushRequester requester = server.getFlushRequester();
1601           if (requester != null) {
1602             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1603             LOG.info(getName() + " requesting flush of " +
1604               r.getRegionInfo().getRegionNameAsString() + " because " +
1605               whyFlush.toString() +
1606               " after random delay " + randomDelay + "ms");
1607             // Throttle the flushes by putting a delay. If we don't throttle, and there
1608             // is a balanced write-load on the regions in a table, we might end up
1609             // overwhelming the filesystem with too many flushes at once.
1610             requester.requestDelayedFlush(r, randomDelay, false);
1611           }
1612         }
1613       }
1614     }
1615   }
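  /*
   * The jitter above is a plain uniform delay: with MIN_DELAY_TIME of 0 and
   * RANGE_OF_DELAY of five minutes, each flush lands anywhere in
   * [0, 5*60*1000) ms instead of all regions flushing on the same chore
   * tick. A sketch of the equivalent computation:
   *
   *   // uniform in [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY)
   *   long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
   *   requester.requestDelayedFlush(r, randomDelay, false);
   */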
1616 
1617   /**
1618    * Report the status of the server. A server is online once all of its startup
1619    * is complete (setting up the filesystem, starting service threads, etc.). This
1620    * method is designed mostly to be useful in tests.
1621    *
1622    * @return true if online, false if not.
1623    */
1624   public boolean isOnline() {
1625     return online.get();
1626   }
1627 
1628   /**
1629    * Setup WAL log and replication if enabled.
1630    * Replication setup is done in here because it wants to be hooked up to WAL.
1631    * @return A WAL instance.
1632    * @throws IOException
1633    */
1634   private WALFactory setupWALAndReplication() throws IOException {
1635     // TODO Replication make assumptions here based on the default filesystem impl
1636     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1637     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1638 
1639     Path logdir = new Path(rootDir, logName);
1640     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1641     if (this.fs.exists(logdir)) {
1642       throw new RegionServerRunningException("Region server has already " +
1643         "created directory at " + this.serverName.toString());
1644     }
1645 
1646     // Instantiate replication manager if replication enabled.  Pass it the
1647     // log directories.
1648     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1649 
1650     // listeners the wal factory will add to wals it creates.
1651     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1652     listeners.add(new MetricsWAL());
1653     if (this.replicationSourceHandler != null &&
1654         this.replicationSourceHandler.getWALActionsListener() != null) {
1655       // Replication handler is an implementation of WALActionsListener.
1656       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1657     }
1658 
1659     return new WALFactory(conf, listeners, serverName.toString());
1660   }
1661 
1662   /**
1663    * We initialize the roller for the wal that handles meta lazily
1664    * since we don't know if this regionserver will handle it. All calls to
1665    * this method return a reference to that same roller. As newly referenced
1666    * meta regions are brought online, they will be offered to the roller for maintenance.
1667    * As a part of that registration process, the roller will add itself as a
1668    * listener on the wal.
1669    */
1670   protected LogRoller ensureMetaWALRoller() {
1671     // Use a tmp log roller so that, once metawalRoller is non-null, it
1672     // always references a live roller.
1673     LogRoller roller = metawalRoller.get();
1674     if (null == roller) {
1675       LogRoller tmpLogRoller = new LogRoller(this, this);
1676       String n = Thread.currentThread().getName();
1677       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1678           n + "-MetaLogRoller", uncaughtExceptionHandler);
1679       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1680         roller = tmpLogRoller;
1681       } else {
1682         // Another thread won the race to start the roller
1683         Threads.shutdown(tmpLogRoller.getThread());
1684         roller = metawalRoller.get();
1685       }
1686     }
1687     return roller;
1688   }
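  /*
   * The compareAndSet dance in ensureMetaWALRoller() is the standard
   * race-safe lazy initialization of an AtomicReference: every contender
   * builds a candidate, exactly one CAS wins, and losers discard their
   * candidate and adopt the winner's. A generic sketch (Resource is a
   * hypothetical type):
   *
   *   Resource r = ref.get();
   *   if (r == null) {
   *     Resource candidate = new Resource();
   *     if (ref.compareAndSet(null, candidate)) {
   *       r = candidate;          // we won the race
   *     } else {
   *       candidate.close();      // loser cleans up its candidate
   *       r = ref.get();          // adopt the winner's instance
   *     }
   *   }
   */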
1689 
1690   public MetricsRegionServer getRegionServerMetrics() {
1691     return this.metricsRegionServer;
1692   }
1693 
1694   /**
1695    * @return Master address tracker instance.
1696    */
1697   public MasterAddressTracker getMasterAddressTracker() {
1698     return this.masterAddressTracker;
1699   }
1700 
1701   /*
1702    * Start maintenance Threads, Server, Worker and lease checker threads.
1703    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if
1704    * we get an unhandled exception. We cannot set the handler on all threads;
1705    * the Server's internal Listener thread is off limits. For the Server, on an
1706    * OOME it waits a while and then retries. Meantime, a flush or a compaction
1707    * that tries to run should trigger the same critical condition and the
1708    * shutdown will run. On its way out, this server will shut down the Server.
1709    * Leases is somewhere in between: it runs an internal thread and, while it
1710    * inherits from Chore, it keeps its own stop mechanism, so it needs to be
1711    * stopped by this hosting server. Worker logs the exception and exits.
1712    */
1713   private void startServiceThreads() throws IOException {
1714     // Start executor services
1715     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1716       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1717     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1718       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1719     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1720       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1721     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1722       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1723     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1724       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1725         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1726     }
1727     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1728        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1729 
1730     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1731       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1732         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1733           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1734     }
1735 
1736     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1737         uncaughtExceptionHandler);
1738     this.cacheFlusher.start(uncaughtExceptionHandler);
1739 
1740     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1741     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1742     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1743     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1744     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1745     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1746 
1747     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1748     // an unhandled exception, it will just exit.
1749     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1750       uncaughtExceptionHandler);
1751 
1752     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1753         this.replicationSourceHandler != null) {
1754       this.replicationSourceHandler.startReplicationService();
1755     } else {
1756       if (this.replicationSourceHandler != null) {
1757         this.replicationSourceHandler.startReplicationService();
1758       }
1759       if (this.replicationSinkHandler != null) {
1760         this.replicationSinkHandler.startReplicationService();
1761       }
1762     }
1763 
1764     // Create the log splitting worker and start it
1765     // Set a smaller retry count to fail fast; otherwise the SplitLogWorker could be
1766     // blocked for quite a while inside the HConnection layer, and wouldn't be available
1767     // for other tasks even after the current task is preempted when a split task times out.
1768     Configuration sinkConf = HBaseConfiguration.create(conf);
1769     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1770       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1771     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1772       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1773     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1774     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1775     splitLogWorker.start();
1776   }
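  /*
   * All the pool sizes above come from configuration, so they can be tuned
   * without code changes. For example, to open regions with more parallelism
   * (the default shown above is 3), one could set, before server start:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.regionserver.executor.openregion.threads", 6);
   *
   * or put the equivalent property in hbase-site.xml.
   */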
1777 
1778   /**
1779    * Puts up the webui.
1780    * @return Returns final port -- maybe different from what we started with.
1781    * @throws IOException
1782    */
1783   private int putUpWebUI() throws IOException {
1784     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1785       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1786     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1787 
1788     if (this instanceof HMaster) {
1789       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1790           HConstants.DEFAULT_MASTER_INFOPORT);
1791       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1792     }
1793     // -1 is for disabling info server
1794     if (port < 0) return port;
1795 
1796     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1797       String msg =
1798           "Failed to start http info server. Address " + addr
1799               + " does not belong to this host. Correct configuration parameter: "
1800               + "hbase.regionserver.info.bindAddress";
1801       LOG.error(msg);
1802       throw new IOException(msg);
1803     }
1804     // check if auto port bind enabled
1805     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1806         false);
1807     while (true) {
1808       try {
1809         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1810         infoServer.addServlet("dump", "/dump", getDumpServlet());
1811         configureInfoServer();
1812         this.infoServer.start();
1813         break;
1814       } catch (BindException e) {
1815         if (!auto) {
1816           // auto bind disabled; rethrow the BindException
1817           LOG.error("Failed binding http info server to port: " + port);
1818           throw e;
1819         }
1820         // auto bind enabled, try to use another port
1821         LOG.info("Failed binding http info server to port: " + port);
1822         port++;
1823       }
1824     }
1825     port = this.infoServer.getPort();
1826     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1827     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1828       HConstants.DEFAULT_MASTER_INFOPORT);
1829     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1830     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1831     return port;
1832   }
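  /*
   * The retry loop in putUpWebUI() implements "auto port bind": when
   * hbase.regionserver.info.port.auto is true, a BindException just advances
   * to the next port. The same shape in isolation (startServerOn is a
   * hypothetical helper that may throw BindException):
   *
   *   int port = startPort;
   *   while (true) {
   *     try {
   *       server = startServerOn(port);
   *       break;                  // bound successfully
   *     } catch (BindException e) {
   *       port++;                 // occupied; try the next port
   *     }
   *   }
   */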
1833 
1834   /*
1835    * Verify that server is healthy
1836    */
1837   private boolean isHealthy() {
1838     if (!fsOk) {
1839       // File system problem
1840       return false;
1841     }
1842     // Verify that all threads are alive
1843     if (!(leases.isAlive()
1844         && cacheFlusher.isAlive() && walRoller.isAlive()
1845         && this.compactionChecker.isScheduled()
1846         && this.periodicFlusher.isScheduled())) {
1847       stop("One or more threads are no longer alive -- stop");
1848       return false;
1849     }
1850     final LogRoller metawalRoller = this.metawalRoller.get();
1851     if (metawalRoller != null && !metawalRoller.isAlive()) {
1852       stop("Meta WAL roller thread is no longer alive -- stop");
1853       return false;
1854     }
1855     return true;
1856   }
1857 
1858   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1859 
1860   @Override
1861   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1862     WAL wal;
1863     LogRoller roller = walRoller;
1864     // _ROOT_ and hbase:meta regions have a separate WAL.
1865     if (regionInfo != null && regionInfo.isMetaTable() &&
1866         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1867       roller = ensureMetaWALRoller();
1868       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1869     } else if (regionInfo == null) {
1870       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1871     } else {
1872       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1873     }
1874     roller.addWAL(wal);
1875     return wal;
1876   }
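  /*
   * From a caller's perspective, the routing in getWAL() looks like this
   * (rs stands for this HRegionServer): only the default replica of
   * hbase:meta is handed the separate meta WAL, which also lazily starts the
   * meta WAL roller.
   *
   *   WAL metaWal = rs.getWAL(metaRegionInfo);  // separate meta WAL
   *   WAL userWal = rs.getWAL(userRegionInfo);  // regular WAL for the region
   *   WAL defWal  = rs.getWAL(null);            // WAL for an unspecified region
   */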
1877 
1878   @Override
1879   public ClusterConnection getConnection() {
1880     return this.clusterConnection;
1881   }
1882 
1883   @Override
1884   public MetaTableLocator getMetaTableLocator() {
1885     return this.metaTableLocator;
1886   }
1887 
1888   @Override
1889   public void stop(final String msg) {
1890     if (!this.stopped) {
1891       try {
1892         if (this.rsHost != null) {
1893           this.rsHost.preStop(msg);
1894         }
1895         this.stopped = true;
1896         LOG.info("STOPPED: " + msg);
1897         // Wakes run() if it is sleeping
1898         sleeper.skipSleepCycle();
1899       } catch (IOException exp) {
1900         LOG.warn("The region server did not stop", exp);
1901       }
1902     }
1903   }
1904 
1905   public void waitForServerOnline(){
1906     while (!isStopped() && !isOnline()) {
1907       synchronized (online) {
1908         try {
1909           online.wait(msgInterval);
1910         } catch (InterruptedException ie) {
1911           Thread.currentThread().interrupt();
1912           break;
1913         }
1914       }
1915     }
1916   }
1917 
1918   @Override
1919   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1920     postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1921   }
1922 
1923   @Override
1924   public void postOpenDeployTasks(final PostOpenDeployContext context)
1925       throws KeeperException, IOException {
1926     Region r = context.getRegion();
1927     long masterSystemTime = context.getMasterSystemTime();
1928     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1929     rpcServices.checkOpen();
1930     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1931     // Do checks to see if we need to compact (references or too many files)
1932     for (Store s : r.getStores()) {
1933       if (s.hasReferences() || s.needsCompaction()) {
1934        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1935       }
1936     }
1937     long openSeqNum = r.getOpenSeqNum();
1938     if (openSeqNum == HConstants.NO_SEQNUM) {
1939       // If we opened a region, we should have read some sequence number from it.
1940       LOG.error("No sequence number found when opening " +
1941         r.getRegionInfo().getRegionNameAsString());
1942       openSeqNum = 0;
1943     }
1944 
1945     // Update flushed sequence id of a recovering region in ZK
1946     updateRecoveringRegionLastFlushedSequenceId(r);
1947 
1948     // Update ZK, or META
1949     if (r.getRegionInfo().isMetaRegion()) {
1950       MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1951          State.OPEN);
1952     } else if (useZKForAssignment) {
1953       MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1954         this.serverName, openSeqNum, masterSystemTime);
1955     }
1956     if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1957         TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1958       throw new IOException("Failed to report opened region to master: "
1959         + r.getRegionInfo().getRegionNameAsString());
1960     }
1961 
1962     triggerFlushInPrimaryRegion((HRegion)r);
1963 
1964     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1965   }
1966 
1967   @Override
1968   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1969     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1970   }
1971 
1972   @Override
1973   public boolean reportRegionStateTransition(
1974       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1975     return reportRegionStateTransition(
1976       new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1977   }
1978 
1979   @Override
1980   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1981     TransitionCode code = context.getCode();
1982     long openSeqNum = context.getOpenSeqNum();
1983     HRegionInfo[] hris = context.getHris();
1984 
1985     ReportRegionStateTransitionRequest.Builder builder =
1986       ReportRegionStateTransitionRequest.newBuilder();
1987     builder.setServer(ProtobufUtil.toServerName(serverName));
1988     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1989     transition.setTransitionCode(code);
1990     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1991       transition.setOpenSeqNum(openSeqNum);
1992     }
1993     for (HRegionInfo hri: hris) {
1994       transition.addRegionInfo(HRegionInfo.convert(hri));
1995     }
1996     ReportRegionStateTransitionRequest request = builder.build();
1997     while (keepLooping()) {
1998       RegionServerStatusService.BlockingInterface rss = rssStub;
1999       try {
2000         if (rss == null) {
2001           createRegionServerStatusStub();
2002           continue;
2003         }
2004         ReportRegionStateTransitionResponse response =
2005           rss.reportRegionStateTransition(null, request);
2006         if (response.hasErrorMessage()) {
2007           LOG.info("Failed to transition " + hris[0]
2008             + " to " + code + ": " + response.getErrorMessage());
2009           return false;
2010         }
2011         return true;
2012       } catch (ServiceException se) {
2013         IOException ioe = ProtobufUtil.getRemoteException(se);
2014         LOG.info("Failed to report region transition, will retry", ioe);
2015         if (rssStub == rss) {
2016           rssStub = null;
2017         }
2018       }
2019     }
2020     return false;
2021   }
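  /*
   * The loop above is a "retry until the cluster stops" pattern with stub
   * invalidation: on an RPC failure the cached master stub is dropped (but
   * only if nobody has replaced it already), so the next iteration
   * re-creates it. A generic sketch with a hypothetical Stub type:
   *
   *   while (keepLooping()) {
   *     Stub s = cachedStub;                       // snapshot the shared field
   *     try {
   *       if (s == null) { createStub(); continue; }
   *       return s.call(request);
   *     } catch (ServiceException se) {
   *       if (cachedStub == s) cachedStub = null;  // invalidate only our snapshot
   *     }
   *   }
   */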
2022 
2023   /**
2024    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2025    * block this thread. See RegionReplicaFlushHandler for details.
2026    */
2027   void triggerFlushInPrimaryRegion(final HRegion region) {
2028     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2029       return;
2030     }
2031     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2032         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2033           region.conf)) {
2034       region.setReadsEnabled(true);
2035       return;
2036     }
2037 
2038     region.setReadsEnabled(false); // disable reads before marking the region as opened.
2039     // RegionReplicaFlushHandler might reset this.
2040 
2041     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2042     this.service.submit(
2043       new RegionReplicaFlushHandler(this, clusterConnection,
2044         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2045   }
2046 
2047   @Override
2048   public RpcServerInterface getRpcServer() {
2049     return rpcServices.rpcServer;
2050   }
2051 
2052   @VisibleForTesting
2053   public RSRpcServices getRSRpcServices() {
2054     return rpcServices;
2055   }
2056 
2057   /**
2058    * Cause the server to exit without closing the regions it is serving or the log
2059    * it is using, and without notifying the master. Used in unit testing and on
2060    * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
2061    *
2062    * @param reason
2063    *          the reason we are aborting
2064    * @param cause
2065    *          the exception that caused the abort, or null
2066    */
2067   @Override
2068   public void abort(String reason, Throwable cause) {
2069     String msg = "ABORTING region server " + this + ": " + reason;
2070     if (cause != null) {
2071       LOG.fatal(msg, cause);
2072     } else {
2073       LOG.fatal(msg);
2074     }
2075     this.abortRequested = true;
2076     // HBASE-4014: show list of coprocessors that were loaded to help debug
2077     // regionserver crashes. Note that we're implicitly using
2078     // java.util.HashSet's toString() method to print the coprocessor names.
2079     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2080         CoprocessorHost.getLoadedCoprocessors());
2081     // Try to dump metrics on abort -- might give a clue as to how the fatal came about.
2082     try {
2083       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2084     } catch (MalformedObjectNameException | IOException e) {
2085       LOG.warn("Failed dumping metrics", e);
2086     }
2087 
2088     // Do our best to report our abort to the master, but this may not work
2089     try {
2090       if (cause != null) {
2091         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2092       }
2093       // Report to the master but only if we have already registered with the master.
2094       if (rssStub != null && this.serverName != null) {
2095         ReportRSFatalErrorRequest.Builder builder =
2096           ReportRSFatalErrorRequest.newBuilder();
2097         ServerName sn =
2098           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2099         builder.setServer(ProtobufUtil.toServerName(sn));
2100         builder.setErrorMessage(msg);
2101         rssStub.reportRSFatalError(null, builder.build());
2102       }
2103     } catch (Throwable t) {
2104       LOG.warn("Unable to report fatal error to master", t);
2105     }
2106     stop(reason);
2107   }
2108 
2109   /**
2110    * @see HRegionServer#abort(String, Throwable)
2111    */
2112   public void abort(String reason) {
2113     abort(reason, null);
2114   }
2115 
2116   @Override
2117   public boolean isAborted() {
2118     return this.abortRequested;
2119   }
2120 
2121   /*
2122    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2123    * logs, but it does close the socket in case we want to bring up a server on the
2124    * old hostname+port immediately.
2125    */
2126   protected void kill() {
2127     this.killed = true;
2128     abort("Simulated kill");
2129   }
2130 
2131   /**
2132    * Called on stop/abort before closing the cluster connection and meta locator.
2133    */
2134   protected void sendShutdownInterrupt() {
2135   }
2136 
2137   /**
2138    * Wait on all threads to finish. Presumption is that all closes and stops
2139    * have already been called.
2140    */
2141   protected void stopServiceThreads() {
2142     // clean up the scheduled chores
2143     if (this.choreService != null) choreService.shutdown();
2144     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2145     if (this.compactionChecker != null) compactionChecker.cancel(true);
2146     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2147     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2148     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2149     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2150 
2151     if (this.cacheFlusher != null) {
2152       this.cacheFlusher.join();
2153     }
2154 
2155     if (this.spanReceiverHost != null) {
2156       this.spanReceiverHost.closeReceivers();
2157     }
2158     if (this.walRoller != null) {
2159       Threads.shutdown(this.walRoller.getThread());
2160     }
2161     final LogRoller metawalRoller = this.metawalRoller.get();
2162     if (metawalRoller != null) {
2163       Threads.shutdown(metawalRoller.getThread());
2164     }
2165     if (this.compactSplitThread != null) {
2166       this.compactSplitThread.join();
2167     }
2168     if (this.service != null) this.service.shutdown();
2169     if (this.replicationSourceHandler != null &&
2170         this.replicationSourceHandler == this.replicationSinkHandler) {
2171       this.replicationSourceHandler.stopReplicationService();
2172     } else {
2173       if (this.replicationSourceHandler != null) {
2174         this.replicationSourceHandler.stopReplicationService();
2175       }
2176       if (this.replicationSinkHandler != null) {
2177         this.replicationSinkHandler.stopReplicationService();
2178       }
2179     }
2180   }
2181 
2182   /**
2183    * @return Return the object that implements the replication
2184    * source service.
2185    */
2186   ReplicationSourceService getReplicationSourceService() {
2187     return replicationSourceHandler;
2188   }
2189 
2190   /**
2191    * @return Return the object that implements the replication
2192    * sink service.
2193    */
2194   ReplicationSinkService getReplicationSinkService() {
2195     return replicationSinkHandler;
2196   }
2197 
2198   /**
2199    * Get the current master from ZooKeeper and open the RPC connection to it.
2200    * To get a fresh connection, the current rssStub must be null.
2201    * Method will block until a master is available. You can break from this
2202    * block by requesting the server stop.
2203    *
2204    * @return master + port, or null if server has been stopped
2205    */
2206   @VisibleForTesting
2207   protected synchronized ServerName createRegionServerStatusStub() {
2208     // Create RS stub without refreshing the master node from ZK, use cached data
2209     return createRegionServerStatusStub(false);
2210   }
2211 
2212   /**
2213    * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2214    * connection, the current rssStub must be null. Method will block until a master is available.
2215    * You can break from this block by requesting the server stop.
2216    * @param refresh If true then master address will be read from ZK, otherwise use cached data
2217    * @return master + port, or null if server has been stopped
2218    */
2219   @VisibleForTesting
2220   protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2221     if (rssStub != null) {
2222       return masterAddressTracker.getMasterAddress();
2223     }
2224     ServerName sn = null;
2225     long previousLogTime = 0;
2226     RegionServerStatusService.BlockingInterface intf = null;
2227     boolean interrupted = false;
2228     try {
2229       while (keepLooping()) {
2230         sn = this.masterAddressTracker.getMasterAddress(refresh);
2231         if (sn == null) {
2232           if (!keepLooping()) {
2233             // give up with no connection.
2234             LOG.debug("No master found and cluster is stopped; bailing out");
2235             return null;
2236           }
2237           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2238             LOG.debug("No master found; retry");
2239             previousLogTime = System.currentTimeMillis();
2240           }
2241           refresh = true; // let's try pulling it from ZK directly
2242           if (sleep(200)) {
2243             interrupted = true;
2244           }
2245           continue;
2246         }
2247 
2248         // If we are on the active master, use the shortcut
2249         if (this instanceof HMaster && sn.equals(getServerName())) {
2250           intf = ((HMaster)this).getMasterRpcServices();
2251           break;
2252         }
2253         try {
2254           BlockingRpcChannel channel =
2255             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2256               shortOperationTimeout);
2257           intf = RegionServerStatusService.newBlockingStub(channel);
2258           break;
2259         } catch (IOException e) {
2260           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2261             e = e instanceof RemoteException ?
2262               ((RemoteException)e).unwrapRemoteException() : e;
2263             if (e instanceof ServerNotRunningYetException) {
2264               LOG.info("Master isn't available yet, retrying");
2265             } else {
2266               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2267             }
2268             previousLogTime = System.currentTimeMillis();
2269           }
2270           if (sleep(200)) {
2271             interrupted = true;
2272           }
2273         }
2274       }
2275     } finally {
2276       if (interrupted) {
2277         Thread.currentThread().interrupt();
2278       }
2279     }
2280     rssStub = intf;
2281     return sn;
2282   }
2283 
2284   /**
2285    * @return True if we should break the loop because the cluster is going down, or
2286    * this server has been stopped or hdfs has gone bad.
2287    */
2288   private boolean keepLooping() {
2289     return !this.stopped && isClusterUp();
2290   }
2291 
2292   /*
2293    * Let the master know we're here. Run initialization using parameters passed to
2294    * us by the master.
2295    * @return A Map of key/value configurations we got from the Master else
2296    * null if we failed to register.
2297    * @throws IOException
2298    */
2299   private RegionServerStartupResponse reportForDuty() throws IOException {
2300     ServerName masterServerName = createRegionServerStatusStub(true);
2301     if (masterServerName == null) return null;
2302     RegionServerStartupResponse result = null;
2303     try {
2304       rpcServices.requestCount.set(0);
2305       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2306         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2307       long now = EnvironmentEdgeManager.currentTime();
2308       int port = rpcServices.isa.getPort();
2309       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2310       if (shouldUseThisHostnameInstead()) {
2311         request.setUseThisHostnameInstead(useThisHostnameInstead);
2312       }
2313       request.setPort(port);
2314       request.setServerStartCode(this.startcode);
2315       request.setServerCurrentTime(now);
2316       result = this.rssStub.regionServerStartup(null, request.build());
2317     } catch (ServiceException se) {
2318       IOException ioe = ProtobufUtil.getRemoteException(se);
2319       if (ioe instanceof ClockOutOfSyncException) {
2320         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2321         // Re-throw IOE will cause RS to abort
2322         throw ioe;
2323       } else if (ioe instanceof ServerNotRunningYetException) {
2324         LOG.debug("Master is not running yet");
2325       } else {
2326         LOG.warn("error telling master we are up", se);
2327       }
2328       rssStub = null;
2329     }
2330     return result;
2331   }
2332 
2333   @Override
2334   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2335     try {
2336       GetLastFlushedSequenceIdRequest req =
2337           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2338       RegionServerStatusService.BlockingInterface rss = rssStub;
2339       if (rss == null) { // Try to connect one more time
2340         createRegionServerStatusStub();
2341         rss = rssStub;
2342         if (rss == null) {
2343           // Still no luck, we tried
2344           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2345           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2346               .build();
2347         }
2348       }
2349       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2350       return RegionStoreSequenceIds.newBuilder()
2351           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2352           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2353     } catch (ServiceException e) {
2354       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2355       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2356           .build();
2357     }
2358   }
2359 
2360   /**
2361    * Closes all regions.  Called on our way out.
2362    * Assumes that it's not possible for new regions to be added to onlineRegions
2363    * while this method runs.
2364    */
2365   protected void closeAllRegions(final boolean abort) {
2366     closeUserRegions(abort);
2367     closeMetaTableRegions(abort);
2368   }
2369 
2370   /**
2371    * Close meta region if we carry it
2372    * @param abort Whether we're running an abort.
2373    */
2374   void closeMetaTableRegions(final boolean abort) {
2375     Region meta = null;
2376     this.lock.writeLock().lock();
2377     try {
2378       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2379         HRegionInfo hri = e.getValue().getRegionInfo();
2380         if (hri.isMetaRegion()) {
2381           meta = e.getValue();
2382         }
2383         if (meta != null) break;
2384       }
2385     } finally {
2386       this.lock.writeLock().unlock();
2387     }
2388     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2389   }
2390 
2391   /**
2392    * Schedule closes on all user regions.
2393    * Should be safe calling multiple times because it won't close regions
2394    * that are already closed or that are closing.
2395    * @param abort Whether we're running an abort.
2396    */
2397   void closeUserRegions(final boolean abort) {
2398     this.lock.writeLock().lock();
2399     try {
2400       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2401         Region r = e.getValue();
2402         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2403           // Close the region, ignoring errors; pass along whether we are aborting.
2404           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2405         }
2406       }
2407     } finally {
2408       this.lock.writeLock().unlock();
2409     }
2410   }
2411 
2412   /** @return the info server */
2413   public InfoServer getInfoServer() {
2414     return infoServer;
2415   }
2416 
2417   /**
2418    * @return true if a stop has been requested.
2419    */
2420   @Override
2421   public boolean isStopped() {
2422     return this.stopped;
2423   }
2424 
2425   @Override
2426   public boolean isStopping() {
2427     return this.stopping;
2428   }
2429 
2430   @Override
2431   public Map<String, Region> getRecoveringRegions() {
2432     return this.recoveringRegions;
2433   }
2434 
2435   /**
2436    *
2437    * @return the configuration
2438    */
2439   @Override
2440   public Configuration getConfiguration() {
2441     return conf;
2442   }
2443 
2444   /** @return the write lock for the server */
2445   ReentrantReadWriteLock.WriteLock getWriteLock() {
2446     return lock.writeLock();
2447   }
2448 
2449   public int getNumberOfOnlineRegions() {
2450     return this.onlineRegions.size();
2451   }
2452 
2453   boolean isOnlineRegionsEmpty() {
2454     return this.onlineRegions.isEmpty();
2455   }
2456 
2457   /**
2458    * For tests, web ui and metrics.
2459    * This method will only work if the HRegionServer is in the same JVM as the client;
2460    * HRegion cannot be serialized to cross an rpc.
2461    */
2462   public Collection<Region> getOnlineRegionsLocalContext() {
2463     Collection<Region> regions = this.onlineRegions.values();
2464     return Collections.unmodifiableCollection(regions);
2465   }
2466 
2467   @Override
2468   public void addToOnlineRegions(Region region) {
2469     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2470     configurationManager.registerObserver(region);
2471   }
2472 
2473   /**
2474    * @return A new Map of online regions sorted by region size with the first entry being the
2475    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2476    * may NOT return all regions.
2477    */
2478   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2479     // we'll sort the regions in reverse
2480     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2481         new Comparator<Long>() {
2482           @Override
2483           public int compare(Long a, Long b) {
2484             return -1 * a.compareTo(b);
2485           }
2486         });
2487     // Copy over all regions. Regions are sorted by size with biggest first.
2488     for (Region region : this.onlineRegions.values()) {
2489       sortedRegions.put(region.getMemstoreSize(), region);
2490     }
2491     return sortedRegions;
2492   }
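  /*
   * Why the javadoc above warns that regions can be missing: the map is keyed
   * by memstore size, so two regions of identical size collide and only the
   * last one inserted survives. For example:
   *
   *   SortedMap<Long, Region> m = getCopyOfOnlineRegionsSortedBySize();
   *   // if regionA and regionB both have getMemstoreSize() == 128L, only one
   *   // of them appears in m, and m.size() < getNumberOfOnlineRegions()
   */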
2493 
2494   /**
2495    * @return time stamp in millis of when this region server was started
2496    */
2497   public long getStartcode() {
2498     return this.startcode;
2499   }
2500 
2501   /** @return reference to FlushRequester */
2502   @Override
2503   public FlushRequester getFlushRequester() {
2504     return this.cacheFlusher;
2505   }
2506 
2507   /**
2508    * Get the top N most loaded regions this server is serving so we can tell the
2509    * master which regions it can reallocate if we're overloaded. TODO: actually
2510    * calculate which regions are most loaded. (Right now, we're just grabbing
2511    * the first N regions being served regardless of load.)
2512    */
2513   protected HRegionInfo[] getMostLoadedRegions() {
2514     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2515     for (Region r : onlineRegions.values()) {
2516       if (!r.isAvailable()) {
2517         continue;
2518       }
2519       if (regions.size() < numRegionsToReport) {
2520         regions.add(r.getRegionInfo());
2521       } else {
2522         break;
2523       }
2524     }
2525     return regions.toArray(new HRegionInfo[regions.size()]);
2526   }
2527 
2528   @Override
2529   public Leases getLeases() {
2530     return leases;
2531   }
2532 
2533   /**
2534    * @return Return the rootDir.
2535    */
2536   protected Path getRootDir() {
2537     return rootDir;
2538   }
2539 
2540   /**
2541    * @return Return the fs.
2542    */
2543   @Override
2544   public FileSystem getFileSystem() {
2545     return fs;
2546   }
2547 
2548   @Override
2549   public String toString() {
2550     return getServerName().toString();
2551   }
2552 
2553   /**
2554    * Interval at which threads should run
2555    *
2556    * @return the interval
2557    */
2558   public int getThreadWakeFrequency() {
2559     return threadWakeFrequency;
2560   }
2561 
2562   @Override
2563   public ZooKeeperWatcher getZooKeeper() {
2564     return zooKeeper;
2565   }
2566 
2567   @Override
2568   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2569     return csm;
2570   }
2571 
2572   @Override
2573   public ServerName getServerName() {
2574     return serverName;
2575   }
2576 
2577   @Override
2578   public CompactionRequestor getCompactionRequester() {
2579     return this.compactSplitThread;
2580   }
2581 
2582   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2583     return this.rsHost;
2584   }
2585 
2586   @Override
2587   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2588     return this.regionsInTransitionInRS;
2589   }
2590 
2591   @Override
2592   public ExecutorService getExecutorService() {
2593     return service;
2594   }
2595 
2596   @Override
2597   public ChoreService getChoreService() {
2598     return choreService;
2599   }
2600   
2601   @Override
2602   public RegionServerQuotaManager getRegionServerQuotaManager() {
2603     return rsQuotaManager;
2604   }
2605 
2606   //
2607   // Main program and support routines
2608   //
2609 
2610   /**
2611    * Load the replication service objects, if any
2612    */
2613   private static void createNewReplicationInstance(Configuration conf,
2614     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2615 
2616     // If replication is not enabled, then return immediately.
2617     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2618         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2619       return;
2620     }
2621 
2622     // read in the name of the source replication class from the config file.
2623     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2624                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2625 
2626     // read in the name of the sink replication class from the config file.
2627     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2628                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2629 
2630     // If both the sink and the source class names are the same, then instantiate
2631     // only one object.
2632     if (sourceClassname.equals(sinkClassname)) {
2633       server.replicationSourceHandler = (ReplicationSourceService)
2634                                          newReplicationInstance(sourceClassname,
2635                                          conf, server, fs, logDir, oldLogDir);
2636       server.replicationSinkHandler = (ReplicationSinkService)
2637                                          server.replicationSourceHandler;
2638     } else {
2639       server.replicationSourceHandler = (ReplicationSourceService)
2640                                          newReplicationInstance(sourceClassname,
2641                                          conf, server, fs, logDir, oldLogDir);
2642       server.replicationSinkHandler = (ReplicationSinkService)
2643                                          newReplicationInstance(sinkClassname,
2644                                          conf, server, fs, logDir, oldLogDir);
2645     }
2646   }
2647 
2648   private static ReplicationService newReplicationInstance(String classname,
2649     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2650     Path oldLogDir) throws IOException{
2651 
2652     Class<?> clazz = null;
2653     try {
2654       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2655       clazz = Class.forName(classname, true, classLoader);
2656     } catch (java.lang.ClassNotFoundException nfe) {
2657       throw new IOException("Could not find class for " + classname);
2658     }
2659 
2660     // create an instance of the replication object.
2661     ReplicationService service = (ReplicationService)
2662                               ReflectionUtils.newInstance(clazz, conf);
2663     service.initialize(server, fs, logDir, oldLogDir);
2664     return service;
2665   }
2666 
2667   /**
2668    * Utility for constructing an instance of the passed HRegionServer class.
2669    * @param regionServerClass Class of the HRegionServer to construct
2670    * @param conf2 Configuration to use
2671    * @param cp Coordinated state manager for the new server
2672    * @return HRegionServer instance.
2673    */
2674   public static HRegionServer constructRegionServer(
2675       Class<? extends HRegionServer> regionServerClass,
2676       final Configuration conf2, CoordinatedStateManager cp) {
2677     try {
2678       Constructor<? extends HRegionServer> c = regionServerClass
2679           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2680       return c.newInstance(conf2, cp);
2681     } catch (Exception e) {
2682       throw new RuntimeException("Failed construction of RegionServer: "
2683           + regionServerClass.toString(), e);
2684     }
2685   }
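
  // Illustrative sketch: constructing a hypothetical HRegionServer subclass. Any
  // subclass must expose a (Configuration, CoordinatedStateManager) constructor,
  // otherwise the reflective lookup above fails and is rethrown as a RuntimeException.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
  //   HRegionServer rs = constructRegionServer(MyRegionServer.class, conf, csm);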
2686 
2687   /**
2688    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2689    */
2690   public static void main(String[] args) throws Exception {
2691     VersionInfo.logVersion();
2692     Configuration conf = HBaseConfiguration.create();
2693     @SuppressWarnings("unchecked")
2694     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2695         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2696 
2697     new HRegionServerCommandLine(regionServerClass).doMain(args);
2698   }
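
  // The server class started by main() is pluggable. A hypothetical override in
  // hbase-site.xml (the value must name an HRegionServer subclass on the classpath):
  //
  //   <property>
  //     <name>hbase.regionserver.impl</name>
  //     <value>com.example.MyRegionServer</value>
  //   </property>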
2699 
2700   /**
2701    * Gets the online regions of the specified table.
2702    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2703    * Only returns <em>online</em> regions.  If a region of this table has been
2704    * closed during a disable, etc., it will not be included in the returned list.
2705    * So, the returned list is not necessarily ALL regions of this table, but
2706    * all the ONLINE regions in the table.
2707    * @param tableName Table to get the online regions of
2708    * @return Online regions from <code>tableName</code>
2709    */
2710   @Override
2711   public List<Region> getOnlineRegions(TableName tableName) {
2712     List<Region> tableRegions = new ArrayList<Region>();
2713     synchronized (this.onlineRegions) {
2714       for (Region region: this.onlineRegions.values()) {
2715         HRegionInfo regionInfo = region.getRegionInfo();
2716         if (regionInfo.getTable().equals(tableName)) {
2717           tableRegions.add(region);
2718         }
2719       }
2720     }
2721     return tableRegions;
2722   }
2723   
2724   /**
2725    * Gets the online tables in this RS.
2726    * This method looks at the in-memory onlineRegions.
2727    * @return all the online tables in this RS
2728    */
2729   @Override
2730   public Set<TableName> getOnlineTables() {
2731     Set<TableName> tables = new HashSet<TableName>();
2732     synchronized (this.onlineRegions) {
2733       for (Region region: this.onlineRegions.values()) {
2734         tables.add(region.getTableDesc().getTableName());
2735       }
2736     }
2737     return tables;
2738   }
2739 
2740   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2741   public String[] getRegionServerCoprocessors() {
2742     TreeSet<String> coprocessors = new TreeSet<String>();
2743     try {
2744       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2745     } catch (IOException exception) {
2746       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2747           "skipping.");
2748       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2749     }
2750     Collection<Region> regions = getOnlineRegionsLocalContext();
2751     for (Region region: regions) {
2752       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2753       try {
2754         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2755       } catch (IOException exception) {
2756         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2757             "; skipping.");
2758         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2759       }
2760     }
2761     return coprocessors.toArray(new String[coprocessors.size()]);
2762   }
2763 
2764   /**
2765    * Try to close the region; logs a warning on failure but continues.
2766    * @param region Region to close
2767    */
2768   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2769     try {
2770       CloseRegionCoordination.CloseRegionDetails details =
2771         csm.getCloseRegionCoordination().getDetaultDetails();
2772       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2773         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2774             " - ignoring and continuing");
2775       }
2776     } catch (IOException e) {
2777       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2778           " - ignoring and continuing", e);
2779     }
2780   }
2781 
2782   /**
2783    * Asynchronously close a region; can be called from the master or internally by the
2784    * regionserver when stopping. If called from the master, the region will update the znode status.
2785    *
2786    * <p>
2787    * If an opening was in progress, this method will cancel it, but will not start a new close.
2788    * The coprocessors are not called in this case. A RegionAlreadyInTransitionException is thrown.
2789    * </p>
2790    * <p>
2791    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2792    * </p>
2793    *
2794    * @param encodedName Region to close
2795    * @param abort True if we are aborting
2796    * @param crd Details of the close-region coordination task
2797    * @param sn The destination server, or null if the region is simply being closed
2798    * @return True if we submitted a close for the region.
2799    * @throws NotServingRegionException if the region is not online
2800    * @throws RegionAlreadyInTransitionException if the region is already closing
2801    */
2802   protected boolean closeRegion(String encodedName, final boolean abort,
2803       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2804       throws NotServingRegionException, RegionAlreadyInTransitionException {
2805     // Check for permissions to close.
2806     Region actualRegion = this.getFromOnlineRegions(encodedName);
2807     // Can be null if we're calling close on a region that's not online
2808     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2809       try {
2810         actualRegion.getCoprocessorHost().preClose(false);
2811       } catch (IOException exp) {
2812         LOG.warn("Unable to close the region: the coprocessor threw an exception", exp);
2813         return false;
2814       }
2815     }
2816 
2817     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2818         Boolean.FALSE);
2819 
2820     if (Boolean.TRUE.equals(previous)) {
2821       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2822           "trying to OPEN. Cancelling OPENING.");
2823       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2824         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2825         // We're going to try to do a standard close then.
2826         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2827             " Doing a standard close now");
2828         return closeRegion(encodedName, abort, crd, sn);
2829       }
2830       // Let's get the region from the online region list again
2831       actualRegion = this.getFromOnlineRegions(encodedName);
2832       if (actualRegion == null) { // Null means the open was cancelled in time; otherwise close it below.
2833         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2834         // The master deletes the znode when it receives this exception.
2835         throw new RegionAlreadyInTransitionException("The region " + encodedName +
2836           " was opening but not yet served. Opening is cancelled.");
2837       }
2838     } else if (Boolean.FALSE.equals(previous)) {
2839       LOG.info("Received CLOSE for the region: " + encodedName +
2840         ", which we are already trying to CLOSE, but not completed yet");
2841       // The master will retry till the region is closed. We need to do this since
2842       // the region could fail to close somehow. If we mark the region closed in master
2843       // while it is not, there could be data loss.
2844       // If the region stuck in closing for a while, and master runs out of retries,
2845       // master will move the region to failed_to_close. Later on, if the region
2846       // is indeed closed, master can properly re-assign it.
2847       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2848         " was already closing. New CLOSE request is ignored.");
2849     }
2850 
2851     if (actualRegion == null) {
2852       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2853       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2854       // The master deletes the znode when it receives this exception.
2855       throw new NotServingRegionException("The region " + encodedName +
2856           " is not online, and is not opening.");
2857     }
2858 
2859     CloseRegionHandler crh;
2860     final HRegionInfo hri = actualRegion.getRegionInfo();
2861     if (hri.isMetaRegion()) {
2862       crh = new CloseMetaHandler(this, this, hri, abort,
2863         csm.getCloseRegionCoordination(), crd);
2864     } else {
2865       crh = new CloseRegionHandler(this, this, hri, abort,
2866         csm.getCloseRegionCoordination(), crd, sn);
2867     }
2868     this.service.submit(crh);
2869     return true;
2870   }
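
  // Summary of the regionsInTransitionInRS protocol used above. For an encoded
  // region name, the map value encodes the in-flight operation:
  //   Boolean.TRUE  -> an OPEN is in progress; a CLOSE request tries to cancel it.
  //   Boolean.FALSE -> a CLOSE is in progress; a duplicate CLOSE is rejected with
  //                    RegionAlreadyInTransitionException.
  //   absent        -> no transition; putIfAbsent(..., Boolean.FALSE) claims the close.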
2871 
2872   /**
2873    * @param regionName Binary name of the region
2874    * @return Region for the passed binary <code>regionName</code> or null if the
2875    *         named region is not a member of the online regions.
2876    */
2877   public Region getOnlineRegion(final byte[] regionName) {
2878     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2879     return this.onlineRegions.get(encodedRegionName);
2880   }
2881 
2882   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2883     return this.regionFavoredNodesMap.get(encodedRegionName);
2884   }
2885 
2886   @Override
2887   public Region getFromOnlineRegions(final String encodedRegionName) {
2888     return this.onlineRegions.get(encodedRegionName);
2889   }
2890 
2891 
2892   @Override
2893   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2894     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2895     if (destination != null) {
2896       long closeSeqNum = r.getMaxFlushedSeqId();
2897       if (closeSeqNum == HConstants.NO_SEQNUM) {
2898         // No edits in WAL for this region; get the sequence number when the region was opened.
2899         closeSeqNum = r.getOpenSeqNum();
2900         if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2901       }
2902       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2903     }
2904     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2905     return toReturn != null;
2906   }
2907 
2908   /**
2909    * Protected utility method for safely obtaining an HRegion handle.
2910    *
2911    * @param regionName
2912    *          Name of online {@link Region} to return
2913    * @return {@link Region} for <code>regionName</code>
2914    * @throws NotServingRegionException if the region is not online
2915    */
2916   protected Region getRegion(final byte[] regionName)
2917       throws NotServingRegionException {
2918     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2919     return getRegionByEncodedName(regionName, encodedRegionName);
2920   }
2921 
2922   public Region getRegionByEncodedName(String encodedRegionName)
2923       throws NotServingRegionException {
2924     return getRegionByEncodedName(null, encodedRegionName);
2925   }
2926 
2927   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2928     throws NotServingRegionException {
2929     Region region = this.onlineRegions.get(encodedRegionName);
2930     if (region == null) {
2931       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2932       if (moveInfo != null) {
2933         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2934       }
2935       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2936       String regionNameStr = regionName == null ?
2937         encodedRegionName : Bytes.toStringBinary(regionName);
2938       if (isOpening != null && isOpening.booleanValue()) {
2939         throw new RegionOpeningException("Region " + regionNameStr +
2940           " is opening on " + this.serverName);
2941       }
2942       throw new NotServingRegionException("Region " + regionNameStr +
2943         " is not online on " + this.serverName);
2944     }
2945     return region;
2946   }
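
  // Decision order above when a region is not in onlineRegions:
  //   1. recently moved           -> RegionMovedException (client refreshes its location cache)
  //   2. marked TRUE in the RIT   -> RegionOpeningException (client should retry shortly)
  //   3. otherwise                -> NotServingRegionException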
2947 
2948   /*
2949    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
2950    * IOE if it isn't already.
2951    *
2952    * @param t Throwable
2953    *
2954    * @param msg Message to log in error. Can be null.
2955    *
2956    * @return Throwable converted to an IOE; methods can only let out IOEs.
2957    */
2958   private Throwable cleanup(final Throwable t, final String msg) {
2959     // Don't log as error if NSRE; NSRE is 'normal' operation.
2960     if (t instanceof NotServingRegionException) {
2961       LOG.debug("NotServingRegionException; " + t.getMessage());
2962       return t;
2963     }
2964     if (msg == null) {
2965       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2966     } else {
2967       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2968     }
2969     if (!rpcServices.checkOOME(t)) {
2970       checkFileSystem();
2971     }
2972     return t;
2973   }
2974 
2975   /*
2976    * @param t
2977    *
2978    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2979    *
2980    * @return Make <code>t</code> an IOE if it isn't already.
2981    */
2982   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2983     return (t instanceof IOException ? (IOException) t : msg == null
2984         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2985   }
2986 
2987   /**
2988    * Checks to see if the file system is still accessible. If not, sets
2989    * abortRequested and stopRequested
2990    *
2991    * @return false if file system is not available
2992    */
2993   public boolean checkFileSystem() {
2994     if (this.fsOk && this.fs != null) {
2995       try {
2996         FSUtils.checkFileSystemAvailable(this.fs);
2997       } catch (IOException e) {
2998         abort("File System not available", e);
2999         this.fsOk = false;
3000       }
3001     }
3002     return this.fsOk;
3003   }
3004 
3005   @Override
3006   public void updateRegionFavoredNodesMapping(String encodedRegionName,
3007       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3008     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3009     // Refer to the comment on the declaration of regionFavoredNodesMap on why
3010     // it is a map of region name to InetSocketAddress[]
3011     for (int i = 0; i < favoredNodes.size(); i++) {
3012       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3013           favoredNodes.get(i).getPort());
3014     }
3015     regionFavoredNodesMap.put(encodedRegionName, addr);
3016   }
3017 
3018   /**
3019    * Return the favored nodes for a region given its encoded name. Look at the
3020    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3021    * @param encodedRegionName Encoded name of the region
3022    * @return array of favored locations
3023    */
3024   @Override
3025   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3026     return regionFavoredNodesMap.get(encodedRegionName);
3027   }
3028 
3029   @Override
3030   public ServerNonceManager getNonceManager() {
3031     return this.nonceManager;
3032   }
3033 
3034   private static class MovedRegionInfo {
3035     private final ServerName serverName;
3036     private final long seqNum;
3037     private final long ts;
3038 
3039     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3040       this.serverName = serverName;
3041       this.seqNum = closeSeqNum;
3042       ts = EnvironmentEdgeManager.currentTime();
3043     }
3044 
3045     public ServerName getServerName() {
3046       return serverName;
3047     }
3048 
3049     public long getSeqNum() {
3050       return seqNum;
3051     }
3052 
3053     public long getMoveTime() {
3054       return ts;
3055     }
3056   }
3057 
3058   // This map contains all the regions that we closed for a move.
3059   // We record the time of the move because we don't want to keep stale entries too long.
3060   protected Map<String, MovedRegionInfo> movedRegions =
3061       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3062 
3063   // We need a timeout, otherwise we risk handing out wrong information: that would double
3064   // the number of network calls instead of reducing them.
3065   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
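
  // An entry added at time T is served by getMovedRegion() only while
  // now - T < TIMEOUT_REGION_MOVED (two minutes); after that it is evicted lazily
  // on lookup, or in bulk by cleanMovedRegions() via the MovedRegionsCleaner chore.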
3066 
3067   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3068     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3069       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3070       return;
3071     }
3072     LOG.info("Adding moved region record: "
3073       + encodedName + " to " + destination + " as of " + closeSeqNum);
3074     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3075   }
3076 
3077   void removeFromMovedRegions(String encodedName) {
3078     movedRegions.remove(encodedName);
3079   }
3080 
3081   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3082     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3083 
3084     long now = EnvironmentEdgeManager.currentTime();
3085     if (dest != null) {
3086       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3087         return dest;
3088       } else {
3089         movedRegions.remove(encodedRegionName);
3090       }
3091     }
3092 
3093     return null;
3094   }
3095 
3096   /**
3097    * Remove the expired entries from the moved regions list.
3098    */
3099   protected void cleanMovedRegions() {
3100     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3101     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3102 
3103     while (it.hasNext()){
3104       Map.Entry<String, MovedRegionInfo> e = it.next();
3105       if (e.getValue().getMoveTime() < cutOff) {
3106         it.remove();
3107       }
3108     }
3109   }
3110 
3111   /*
3112    * Use this to allow tests to override and schedule more frequently.
3113    */
3114 
3115   protected int movedRegionCleanerPeriod() {
3116     return TIMEOUT_REGION_MOVED;
3117   }
3118 
3119   /**
3120    * A chore that periodically cleans the moved region cache.
3121    */
3122 
3123   protected static final class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3124     private HRegionServer regionServer;
3125     Stoppable stoppable;
3126 
3127     private MovedRegionsCleaner(
3128       HRegionServer regionServer, Stoppable stoppable){
3129       super("MovedRegionsCleaner for region " + regionServer, stoppable,
3130           regionServer.movedRegionCleanerPeriod());
3131       this.regionServer = regionServer;
3132       this.stoppable = stoppable;
3133     }
3134 
3135     static MovedRegionsCleaner create(HRegionServer rs){
3136       Stoppable stoppable = new Stoppable() {
3137         private volatile boolean isStopped = false;
3138         @Override public void stop(String why) { isStopped = true;}
3139         @Override public boolean isStopped() {return isStopped;}
3140       };
3141 
3142       return new MovedRegionsCleaner(rs, stoppable);
3143     }
3144 
3145     @Override
3146     protected void chore() {
3147       regionServer.cleanMovedRegions();
3148     }
3149 
3150     @Override
3151     public void stop(String why) {
3152       stoppable.stop(why);
3153     }
3154 
3155     @Override
3156     public boolean isStopped() {
3157       return stoppable.isStopped();
3158     }
3159   }
3160 
3161   private String getMyEphemeralNodePath() {
3162     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3163   }
3164 
3165   private boolean isHealthCheckerConfigured() {
3166     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3167     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3168   }
3169 
3170   /**
3171    * @return the underlying {@link CompactSplitThread} for this server
3172    */
3173   public CompactSplitThread getCompactSplitThread() {
3174     return this.compactSplitThread;
3175   }
3176 
3177   /**
3178    * A helper function to store the last flushed sequence id with the previous failed RS for a
3179    * recovering region. The id is used to skip WAL edits which have already been flushed. Since
3180    * a flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
3181    * @throws KeeperException
3182    * @throws IOException
3183    */
3184   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3185       IOException {
3186     if (!r.isRecovering()) {
3187       // return immediately for non-recovering regions
3188       return;
3189     }
3190 
3191     HRegionInfo regionInfo = r.getRegionInfo();
3192     ZooKeeperWatcher zkw = getZooKeeper();
3193     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3194     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3195     long minSeqIdForLogReplay = -1;
3196     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3197       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3198         minSeqIdForLogReplay = storeSeqIdForReplay;
3199       }
3200     }
3201 
3202     try {
3203       long lastRecordedFlushedSequenceId = -1;
3204       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3205         regionInfo.getEncodedName());
3206       // recovering-region level
3207       byte[] data;
3208       try {
3209         data = ZKUtil.getData(zkw, nodePath);
3210       } catch (InterruptedException e) {
3211         throw new InterruptedIOException();
3212       }
3213       if (data != null) {
3214         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3215       }
3216       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3217         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3218       }
3219       if (previousRSName != null) {
3220         // one level deeper for the failed RS
3221         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3222         ZKUtil.setData(zkw, nodePath,
3223           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3224         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3225           " for " + previousRSName);
3226       } else {
3227         LOG.warn("Can't find failed region server for recovering region " +
3228             regionInfo.getEncodedName());
3229       }
3230     } catch (NoNodeException ignore) {
3231       LOG.debug("Region " + regionInfo.getEncodedName() +
3232         " must have completed recovery because its recovery znode has been removed", ignore);
3233     }
3234   }
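
  // Sketch of the znode layout read and written above, relative to the
  // recovering-regions znode (typically /hbase/recovering-regions):
  //
  //   /<encodedRegionName>                -> min last-flushed sequence id across stores
  //   /<encodedRegionName>/<failedRSName> -> per-store sequence ids for that failed RS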
3235 
3236   /**
3237    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName, or null if none.
3238    * @param encodedRegionName Encoded name of the recovering region
3239    * @throws KeeperException
3240    */
3241   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3242     String result = null;
3243     long maxZxid = 0;
3244     ZooKeeperWatcher zkw = this.getZooKeeper();
3245     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3246     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3247     if (failedServers == null || failedServers.isEmpty()) {
3248       return result;
3249     }
3250     for (String failedServer : failedServers) {
3251       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3252       Stat stat = new Stat();
3253       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3254       if (maxZxid < stat.getCzxid()) {
3255         maxZxid = stat.getCzxid();
3256         result = failedServer;
3257       }
3258     }
3259     return result;
3260   }
3261 
3262   public CoprocessorServiceResponse execRegionServerService(
3263       @SuppressWarnings("UnusedParameters") final RpcController controller,
3264       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3265     try {
3266       ServerRpcController serviceController = new ServerRpcController();
3267       CoprocessorServiceCall call = serviceRequest.getCall();
3268       String serviceName = call.getServiceName();
3269       String methodName = call.getMethodName();
3270       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3271         throw new UnknownProtocolException(null,
3272             "No registered coprocessor service found for name " + serviceName);
3273       }
3274       Service service = coprocessorServiceHandlers.get(serviceName);
3275       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3276       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3277       if (methodDesc == null) {
3278         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3279             + " called on service " + serviceName);
3280       }
3281       Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3282       ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3283       Message request = builderForType.build();
3284       final Message.Builder responseBuilder =
3285           service.getResponsePrototype(methodDesc).newBuilderForType();
3286       service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
3287         @Override
3288         public void run(Message message) {
3289           if (message != null) {
3290             responseBuilder.mergeFrom(message);
3291           }
3292         }
3293       });
3294       IOException exception = ResponseConverter.getControllerException(serviceController);
3295       if (exception != null) {
3296         throw exception;
3297       }
3298       Message execResult = responseBuilder.build();
3299       ClientProtos.CoprocessorServiceResponse.Builder builder =
3300           ClientProtos.CoprocessorServiceResponse.newBuilder();
3301       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3302         HConstants.EMPTY_BYTE_ARRAY));
3303       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3304           .setValue(execResult.toByteString()));
3305       return builder.build();
3306     } catch (IOException ie) {
3307       throw new ServiceException(ie);
3308     }
3309   }
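
  // Illustrative client-side sketch of reaching this endpoint through the Admin API;
  // MyService is a hypothetical coprocessor service generated from a protobuf definition.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf)) {
  //     CoprocessorRpcChannel channel = conn.getAdmin().coprocessorService(serverName);
  //     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //     MyResponse resp = stub.myMethod(null, MyRequest.getDefaultInstance());
  //   }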
3310 
3311   /**
3312    * @return The cache config instance used by the regionserver.
3313    */
3314   public CacheConfig getCacheConfig() {
3315     return this.cacheConfig;
3316   }
3317 
3318   /**
3319    * @return The ConfigurationManager object, exposed for testing purposes.
3320    */
3321   protected ConfigurationManager getConfigurationManager() {
3322     return configurationManager;
3323   }
3324 
3325   /**
3326    * @return The table descriptors implementation.
3327    */
3328   public TableDescriptors getTableDescriptors() {
3329     return this.tableDescriptors;
3330   }
3331 
3332   /**
3333    * Reload the configuration from disk.
3334    */
3335   public void updateConfiguration() {
3336     LOG.info("Reloading the configuration from disk.");
3337     // Reload the configuration from disk.
3338     conf.reloadConfiguration();
3339     configurationManager.notifyAllObservers(conf);
3340   }
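
  // This reload is typically triggered operationally, e.g. from the HBase shell with
  // update_config 'SERVER_NAME' (or update_all_config for every server). Only settings
  // watched by registered configuration observers take effect without a restart.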
3341 
3342   @Override
3343   public HeapMemoryManager getHeapMemoryManager() {
3344     return hMemManager;
3345   }
3346 
3347   @Override
3348   public double getCompactionPressure() {
3349     double max = 0;
3350     for (Region region : onlineRegions.values()) {
3351       for (Store store : region.getStores()) {
3352         double normCount = store.getCompactionPressure();
3353         if (normCount > max) {
3354           max = normCount;
3355         }
3356       }
3357     }
3358     return max;
3359   }
3360 
3361   /**
3362    * For testing
3363    * @return whether all WAL roll requests have finished for this regionserver
3364    */
3365   @VisibleForTesting
3366   public boolean walRollRequestFinished() {
3367     return this.walRoller.walRollFinished();
3368   }
3369 }