1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.TreeSet;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import javax.management.MalformedObjectNameException;
52  import javax.management.ObjectName;
53  import javax.servlet.http.HttpServlet;
54  
55  import org.apache.commons.lang.math.RandomUtils;
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileSystem;
60  import org.apache.hadoop.fs.Path;
61  import org.apache.hadoop.hbase.ChoreService;
62  import org.apache.hadoop.hbase.ClockOutOfSyncException;
63  import org.apache.hadoop.hbase.CoordinatedStateManager;
64  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HealthCheckChore;
70  import org.apache.hadoop.hbase.MetaTableAccessor;
71  import org.apache.hadoop.hbase.NotServingRegionException;
72  import org.apache.hadoop.hbase.ScheduledChore;
73  import org.apache.hadoop.hbase.ServerName;
74  import org.apache.hadoop.hbase.Stoppable;
75  import org.apache.hadoop.hbase.TableDescriptors;
76  import org.apache.hadoop.hbase.TableName;
77  import org.apache.hadoop.hbase.YouAreDeadException;
78  import org.apache.hadoop.hbase.ZNodeClearer;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.client.ClusterConnection;
81  import org.apache.hadoop.hbase.client.ConnectionFactory;
82  import org.apache.hadoop.hbase.client.ConnectionUtils;
83  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
84  import org.apache.hadoop.hbase.conf.ConfigurationManager;
85  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
86  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
87  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
88  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
89  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
90  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
91  import org.apache.hadoop.hbase.executor.ExecutorService;
92  import org.apache.hadoop.hbase.executor.ExecutorType;
93  import org.apache.hadoop.hbase.fs.HFileSystem;
94  import org.apache.hadoop.hbase.http.InfoServer;
95  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
96  import org.apache.hadoop.hbase.io.hfile.HFile;
97  import org.apache.hadoop.hbase.ipc.RpcClient;
98  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
99  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
100 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
101 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
102 import org.apache.hadoop.hbase.ipc.ServerRpcController;
103 import org.apache.hadoop.hbase.master.HMaster;
104 import org.apache.hadoop.hbase.master.RegionState.State;
105 import org.apache.hadoop.hbase.master.TableLockManager;
106 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
107 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
108 import org.apache.hadoop.hbase.protobuf.RequestConverter;
109 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
110 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
114 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
116 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
117 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
121 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
132 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
133 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
134 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
135 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
136 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
137 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
138 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
139 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
140 import org.apache.hadoop.hbase.security.UserProvider;
141 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
142 import org.apache.hadoop.hbase.util.Addressing;
143 import org.apache.hadoop.hbase.util.ByteStringer;
144 import org.apache.hadoop.hbase.util.Bytes;
145 import org.apache.hadoop.hbase.util.CompressionTest;
146 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
147 import org.apache.hadoop.hbase.util.FSTableDescriptors;
148 import org.apache.hadoop.hbase.util.FSUtils;
149 import org.apache.hadoop.hbase.util.HasThread;
150 import org.apache.hadoop.hbase.util.JSONBean;
151 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
152 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
153 import org.apache.hadoop.hbase.util.Sleeper;
154 import org.apache.hadoop.hbase.util.Threads;
155 import org.apache.hadoop.hbase.util.VersionInfo;
156 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
157 import org.apache.hadoop.hbase.wal.WAL;
158 import org.apache.hadoop.hbase.wal.WALFactory;
159 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
160 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
161 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
162 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
163 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
164 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
165 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
166 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
167 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
168 import org.apache.hadoop.ipc.RemoteException;
169 import org.apache.hadoop.metrics.util.MBeanUtil;
170 import org.apache.hadoop.util.ReflectionUtils;
171 import org.apache.hadoop.util.StringUtils;
172 import org.apache.zookeeper.KeeperException;
173 import org.apache.zookeeper.KeeperException.NoNodeException;
174 import org.apache.zookeeper.data.Stat;
175 
176 import com.google.common.annotations.VisibleForTesting;
177 import com.google.common.base.Preconditions;
178 import com.google.common.collect.Maps;
179 import com.google.protobuf.BlockingRpcChannel;
180 import com.google.protobuf.Descriptors;
181 import com.google.protobuf.Message;
182 import com.google.protobuf.RpcCallback;
183 import com.google.protobuf.RpcController;
184 import com.google.protobuf.Service;
185 import com.google.protobuf.ServiceException;
186 
187 /**
188  * HRegionServer makes a set of HRegions available to clients. It checks in with
189  * the HMaster. There are many HRegionServers in a single HBase deployment.
190  */
191 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
192 @SuppressWarnings("deprecation")
193 public class HRegionServer extends HasThread implements
194     RegionServerServices, LastSequenceId {
195 
196   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
197 
198   /**
199    * For testing only!  Set to true to skip notifying the master of region assignment.
200    */
201   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
202   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
203 
204   /*
205    * Strings to be used in forming the exception message for
206    * RegionsAlreadyInTransitionException.
207    */
208   protected static final String OPEN = "OPEN";
209   protected static final String CLOSE = "CLOSE";
210 
211   // Map of region name to the action currently in progress:
212   //   true  - an open-region action is in progress
213   //   false - a close-region action is in progress
214   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
215     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
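
      // Illustrative sketch (schematic, not the exact call sites in this class): an open-region
      // handler would record its intent roughly like
      //   regionsInTransitionInRS.putIfAbsent(hri.getEncodedNameAsBytes(), Boolean.TRUE);
      // while a close-region handler would record Boolean.FALSE, and the entry is removed once the
      // transition completes, so the map only ever reflects the action currently in progress.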
216 
217   // Cache flushing
218   protected MemStoreFlusher cacheFlusher;
219 
220   protected HeapMemoryManager hMemManager;
221 
222   /**
223    * Cluster connection to be shared by services.
224    * Initialized at server startup and closed when server shuts down.
225    * Clients must never close it explicitly.
226    */
227   protected ClusterConnection clusterConnection;
228 
229   /*
230    * Long-lived meta table locator, which is created when the server starts and stopped
231    * when the server shuts down. EventHandlers should use this reference to perform
232    * meta-related operations. The primary reason for this indirection is to make it mockable
233    * for tests.
234    */
235   protected MetaTableLocator metaTableLocator;
236 
237   // Watches ZooKeeper to detect when a region comes out of the recovering state
238   @SuppressWarnings("unused")
239   private RecoveringRegionWatcher recoveringRegionWatcher;
240 
241   /**
242    * Go here to get table descriptors.
243    */
244   protected TableDescriptors tableDescriptors;
245 
246   // Replication services. If replication is not enabled, these handlers will be null.
247   protected ReplicationSourceService replicationSourceHandler;
248   protected ReplicationSinkService replicationSinkHandler;
249 
250   // Compactions
251   public CompactSplitThread compactSplitThread;
252 
253   /**
254    * Map of regions currently being served by this region server. Key is the
255    * encoded region name.  All access should be synchronized.
256    */
257   protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
258 
259   /**
260    * Map of encoded region names to the DataNode locations they should be hosted on.
261    * We store the value as InetSocketAddress since this is used only in HDFS
262    * API (create() that takes favored nodes as hints for placing file blocks).
263    * We could have used ServerName here as the value class, but we'd need to
264    * convert it to InetSocketAddress at some point before the HDFS API call, and
265    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
266    * and here we really mean DataNode locations.
267    */
268   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
269       new ConcurrentHashMap<String, InetSocketAddress[]>();
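
      // Illustrative sketch (hypothetical helper, not part of this class): the addresses stored
      // above are intended to be handed to HDFS as block-placement hints when store files are
      // created, roughly along these lines:
      //   InetSocketAddress[] favoredNodes = regionFavoredNodesMap.get(encodedRegionName);
      //   if (favoredNodes != null) {
      //     // pass favoredNodes to the HDFS create() overload that accepts favored-node hints,
      //     // so the file's blocks land on the DataNodes co-located with those hosts
      //   }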
270 
271   /**
272    * Set of regions currently in recovering state, which means they can accept writes (edits from
273    * a previously failed region server) but not reads. A recovering region is also an online region.
274    */
275   protected final Map<String, Region> recoveringRegions = Collections
276       .synchronizedMap(new HashMap<String, Region>());
277 
278   // Leases
279   protected Leases leases;
280 
281   // Instance of the hbase executor service.
282   protected ExecutorService service;
283 
284   // If false, the file system has become unavailable
285   protected volatile boolean fsOk;
286   protected HFileSystem fs;
287 
288   // Set when a report to the master comes back with a message asking us to
289   // shut down. Also set by a call to stop() when debugging or running unit tests
290   // of HRegionServer in isolation.
291   private volatile boolean stopped = false;
292 
293   // Go down hard. Used if file system becomes unavailable and also in
294   // debugging and unit tests.
295   private volatile boolean abortRequested;
296 
297   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
298 
299   // A state before we go into stopped state.  At this stage we're closing user
300   // space regions.
301   private boolean stopping = false;
302 
303   private volatile boolean killed = false;
304 
305   protected final Configuration conf;
306 
307   private Path rootDir;
308 
309   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
310 
311   final int numRetries;
312   protected final int threadWakeFrequency;
313   protected final int msgInterval;
314 
315   protected final int numRegionsToReport;
316 
317   // Stub to do region server status calls against the master.
318   private volatile RegionServerStatusService.BlockingInterface rssStub;
319   // RPC client. Used to make the stub above that does region server status checking.
320   RpcClient rpcClient;
321 
322   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
323   private RpcControllerFactory rpcControllerFactory;
324 
325   private UncaughtExceptionHandler uncaughtExceptionHandler;
326 
327   // Info server. Default access so it can be used by unit tests. REGIONSERVER
328   // is the name of the webapp and the attribute name used to stuff this instance
329   // into the web context.
330   protected InfoServer infoServer;
331   private JvmPauseMonitor pauseMonitor;
332 
333   /** region server process name */
334   public static final String REGIONSERVER = "regionserver";
335 
336   MetricsRegionServer metricsRegionServer;
337   private SpanReceiverHost spanReceiverHost;
338 
339   /**
340    * ChoreService used to schedule tasks that we want to run periodically
341    */
342   private final ChoreService choreService;
343 
344   /*
345    * Check for compactions requests.
346    */
347   ScheduledChore compactionChecker;
348 
349   /*
350    * Check for flushes
351    */
352   ScheduledChore periodicFlusher;
353 
354   protected volatile WALFactory walFactory;
355 
356   // WAL roller. log is protected rather than private to avoid
357   // eclipse warning when accessed by inner classes
358   final LogRoller walRoller;
359   // Lazily initialized if this RegionServer hosts a meta table.
360   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
361 
362   // flag set after we're done setting up server threads
363   final AtomicBoolean online = new AtomicBoolean(false);
364 
365   // zookeeper connection and watcher
366   protected ZooKeeperWatcher zooKeeper;
367 
368   // master address tracker
369   private MasterAddressTracker masterAddressTracker;
370 
371   // Cluster Status Tracker
372   protected ClusterStatusTracker clusterStatusTracker;
373 
374   // Log Splitting Worker
375   private SplitLogWorker splitLogWorker;
376 
377   // A sleeper that sleeps for msgInterval.
378   protected final Sleeper sleeper;
379 
380   private final int operationTimeout;
381   private final int shortOperationTimeout;
382 
383   private final RegionServerAccounting regionServerAccounting;
384 
385   // Cache configuration and block cache reference
386   protected CacheConfig cacheConfig;
387 
388   /** The health check chore. */
389   private HealthCheckChore healthCheckChore;
390 
391   /** The nonce manager chore. */
392   private ScheduledChore nonceManagerChore;
393 
394   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
395 
396   /**
397    * The server name the Master sees us as.  It's made from the hostname the
398    * master passes us, the port, and the server startcode. Gets set after registration
399    * against the Master.
400    */
401   protected ServerName serverName;
402 
403   /*
404    * hostname specified by hostname config
405    */
406   protected String useThisHostnameInstead;
407 
408   // Key to the config parameter for the server hostname.
409   // Specifying the server hostname is optional; the hostname should be resolvable from
410   // both the master and the region server.
411   final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
412 
413   final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
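
      // Illustrative sketch: to pin the hostname this process registers under, the keys above can
      // be set in hbase-site.xml, for example (the value below is a placeholder):
      //   <property>
      //     <name>hbase.regionserver.hostname</name>
      //     <value>rs1.example.com</value>
      //   </property>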
414 
415   /**
416    * This server's startcode.
417    */
418   protected final long startcode;
419 
420   /**
421    * Unique identifier for the cluster we are a part of.
422    */
423   private String clusterId;
424 
425   /**
426    * MX Bean for RegionServerInfo
427    */
428   private ObjectName mxBean = null;
429 
430   /**
431    * Chore to clean periodically the moved region list
432    */
433   private MovedRegionsCleaner movedRegionsCleaner;
434 
435   // chore for refreshing store files for secondary regions
436   private StorefileRefresherChore storefileRefresher;
437 
438   private RegionServerCoprocessorHost rsHost;
439 
440   private RegionServerProcedureManagerHost rspmHost;
441 
442   private RegionServerQuotaManager rsQuotaManager;
443 
444   // Table level lock manager for locking for region operations
445   protected TableLockManager tableLockManager;
446 
447   /**
448    * Nonce manager. Nonces are used to make operations like increment and append idempotent
449    * in the case where the client doesn't receive the response from a successful operation and
450    * retries. We track the successful ops for some time via a nonce sent by the client and handle
451    * duplicate operations (currently, by failing them; in the future we might use MVCC to return
452    * the result). Nonces are also recovered from the WAL during recovery; however, the caveats (from
453    * HBASE-3787) are:
454    * - WAL recovery is optimized, and under high load we won't read nearly a nonce-timeout's worth
455    *   of past records. If we don't read the records, we don't read and recover the nonces.
456    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
457    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
458    * We could have a separate, additional "Nonce WAL". It would just contain a bunch of numbers and
459    * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
460    * flush it before a memstore flush, then for a given nonce we will either see it in the WAL (if it
461    * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
462    * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
463    * latest nonce in it has expired. It can also be recovered during a move.
464    */
465   final ServerNonceManager nonceManager;
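
      // Illustrative sketch of the nonce flow described above (names are schematic, not the exact
      // client API):
      //   long nonceGroup = clientId;          // stable per client
      //   long nonce = newUniqueNonce();       // unique per mutation attempt
      //   // First attempt: the server applies the increment/append and records
      //   //   (nonceGroup, nonce) as succeeded in the nonce manager.
      //   // Retry after a lost response: the server sees the same (nonceGroup, nonce) and fails
      //   //   the duplicate instead of applying the operation a second time.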
466 
467   private UserProvider userProvider;
468 
469   protected final RSRpcServices rpcServices;
470 
471   protected BaseCoordinatedStateManager csm;
472 
473   /**
474    * The configuration manager is used to register/deregister and notify the configuration
475    * observers when the regionserver is notified of a change in the on-disk configuration.
476    */
477   protected final ConfigurationManager configurationManager;
478 
479   /**
480    * Starts an HRegionServer at the default location.
481    * @param conf
482    * @throws IOException
483    * @throws InterruptedException
484    */
485   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
486     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
487   }
488 
489   /**
490    * Starts an HRegionServer at the default location.
491    * @param conf
492    * @param csm implementation of CoordinatedStateManager to be used
493    * @throws IOException
494    */
495   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
496       throws IOException {
497     this.fsOk = true;
498     this.conf = conf;
499     HFile.checkHFileVersion(this.conf);
500     checkCodecs(this.conf);
501     this.userProvider = UserProvider.instantiate(conf);
502     FSUtils.setupShortCircuitRead(this.conf);
503     // Disable usage of meta replicas in the regionserver
504     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
505 
506     // Config'ed params
507     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
508         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
509     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
510     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
511 
512     this.sleeper = new Sleeper(this.msgInterval, this);
513 
514     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
515     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
516 
517     this.numRegionsToReport = conf.getInt(
518       "hbase.regionserver.numregionstoreport", 10);
519 
520     this.operationTimeout = conf.getInt(
521       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
522       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
523 
524     this.shortOperationTimeout = conf.getInt(
525       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
526       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
527 
528     this.abortRequested = false;
529     this.stopped = false;
530 
531     rpcServices = createRpcServices();
532     this.startcode = System.currentTimeMillis();
533     if (this instanceof HMaster) {
534       useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
535     } else {
536       useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
537     }
538     String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
539       rpcServices.isa.getHostName();
540     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
541 
542     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
543     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
544 
545     // login the zookeeper client principal (if using security)
546     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
547       "hbase.zookeeper.client.kerberos.principal", hostName);
548     // login the server principal (if using secure Hadoop)
549     login(userProvider, hostName);
550 
551     regionServerAccounting = new RegionServerAccounting();
552     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
553       @Override
554       public void uncaughtException(Thread t, Throwable e) {
555         abort("Uncaught exception in service thread " + t.getName(), e);
556       }
557     };
558 
559     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir, else the
560     // underlying Hadoop HDFS accessors will go against the wrong filesystem
561     // (unless everything is left at the defaults).
562     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
563     // Get the fs instance used by this RS.  If HBase checksum verification is enabled,
564     // then automatically switch off HDFS checksum verification.
565     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
566     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
567     this.rootDir = FSUtils.getRootDir(this.conf);
568     this.tableDescriptors = getFsTableDescriptors();
569 
570     service = new ExecutorService(getServerName().toShortString());
571     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
572 
573     // Some unit tests don't need a cluster, so no zookeeper at all
574     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
575       // Open connection to zookeeper and set primary watcher
576       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
577         rpcServices.isa.getPort(), this, canCreateBaseZNode());
578 
579       this.csm = (BaseCoordinatedStateManager) csm;
580       this.csm.initialize(this);
581       this.csm.start();
582 
583       tableLockManager = TableLockManager.createTableLockManager(
584         conf, zooKeeper, serverName);
585 
586       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
587       masterAddressTracker.start();
588 
589       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
590       clusterStatusTracker.start();
591     }
592     this.configurationManager = new ConfigurationManager();
593 
594     rpcServices.start();
595     putUpWebUI();
596     this.walRoller = new LogRoller(this, this);
597     this.choreService = new ChoreService(getServerName().toString());
598   }
599 
600   protected TableDescriptors getFsTableDescriptors() throws IOException {
601     return new FSTableDescriptors(this.conf,
602       this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
603   }
604 
605   /*
606    * Returns true if the configured hostname should be used.
607    */
608   protected boolean shouldUseThisHostnameInstead() {
609     return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
610   }
611 
612   protected void login(UserProvider user, String host) throws IOException {
613     user.login("hbase.regionserver.keytab.file",
614       "hbase.regionserver.kerberos.principal", host);
615   }
616 
617   protected void waitForMasterActive(){
618   }
619 
620   protected String getProcessName() {
621     return REGIONSERVER;
622   }
623 
624   protected boolean canCreateBaseZNode() {
625     return false;
626   }
627 
628   protected boolean canUpdateTableDescriptor() {
629     return false;
630   }
631 
632   protected RSRpcServices createRpcServices() throws IOException {
633     return new RSRpcServices(this);
634   }
635 
636   protected void configureInfoServer() {
637     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
638     infoServer.setAttribute(REGIONSERVER, this);
639   }
640 
641   protected Class<? extends HttpServlet> getDumpServlet() {
642     return RSDumpServlet.class;
643   }
644 
645   protected void doMetrics() {
646   }
647 
648   @Override
649   public boolean registerService(Service instance) {
650     /*
651      * No stacking of instances is allowed for a single service name
652      */
653     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
654     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
655       LOG.error("Coprocessor service " + serviceDesc.getFullName()
656           + " already registered, rejecting request from " + instance);
657       return false;
658     }
659 
660     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
661     if (LOG.isDebugEnabled()) {
662       LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName());
663     }
664     return true;
665   }
666 
667   /**
668    * Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to
669    * the local server.  Safe to use going to local or remote server.
670    * Creating this instance in a method makes it possible to intercept and mock it in tests.
671    * @throws IOException
672    */
673   @VisibleForTesting
674   protected ClusterConnection createClusterConnection() throws IOException {
675     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
676     // local server if the request is to the local server bypassing RPC. Can be used for both local
677     // and remote invocations.
678     return ConnectionUtils.createShortCircuitHConnection(
679       ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
680   }
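
      // Usage note (a sketch, not existing test code): because the connection is created through
      // this method rather than inline, a test can subclass HRegionServer and override it to
      // inject a mock:
      //   @Override
      //   protected ClusterConnection createClusterConnection() {
      //     return mockedClusterConnection; // supplied by the test
      //   }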
681 
682   /**
683    * Run test on configured codecs to make sure supporting libs are in place.
684    * @param c
685    * @throws IOException
686    */
687   private static void checkCodecs(final Configuration c) throws IOException {
688     // check to see if the codec list is available:
689     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
690     if (codecs == null) return;
691     for (String codec : codecs) {
692       if (!CompressionTest.testCompression(codec)) {
693         throw new IOException("Compression codec " + codec +
694           " not supported, aborting RS construction");
695       }
696     }
697   }
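
      // Illustrative sketch: the check above is driven by a comma-separated list in the
      // configuration; for example, with
      //   <property>
      //     <name>hbase.regionserver.codecs</name>
      //     <value>snappy,lzo</value>
      //   </property>
      // the region server refuses to start if the native libraries for any listed codec are
      // missing (codec names above are examples).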
698 
699   public String getClusterId() {
700     return this.clusterId;
701   }
702 
703   /**
704    * Set up our cluster connection if it has not already been initialized.
705    * @throws IOException
706    */
707   protected synchronized void setupClusterConnection() throws IOException {
708     if (clusterConnection == null) {
709       clusterConnection = createClusterConnection();
710       metaTableLocator = new MetaTableLocator();
711     }
712   }
713 
714   /**
715    * All initialization needed before we go register with Master.
716    *
717    * @throws IOException
718    * @throws InterruptedException
719    */
720   private void preRegistrationInitialization(){
721     try {
722       setupClusterConnection();
723 
724       // Health checker thread.
725       if (isHealthCheckerConfigured()) {
726         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
727           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
728         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
729       }
730       this.pauseMonitor = new JvmPauseMonitor(conf);
731       pauseMonitor.start();
732 
733       initializeZooKeeper();
734       if (!isStopped() && !isAborted()) {
735         initializeThreads();
736       }
737     } catch (Throwable t) {
738       // Call stop on error, or the process will stick around forever since the server
739       // puts up non-daemon threads.
740       this.rpcServices.stop();
741       abort("Initialization of RS failed.  Hence aborting RS.", t);
742     }
743   }
744 
745   /**
746    * Bring up the connection to the zk ensemble, then wait until there is a master for this
747    * cluster, and after that, wait until the cluster 'up' flag has been set.
748    * This is the order in which the master does things.
749    * Finally, open the long-lived server short-circuit connection.
750    * @throws IOException
751    * @throws InterruptedException
752    */
753   private void initializeZooKeeper() throws IOException, InterruptedException {
754     // Create the master address tracker, register with zk, and start it.  Then
755     // block until a master is available.  No point in starting up if no master
756     // running.
757     blockAndCheckIfStopped(this.masterAddressTracker);
758 
759     // Wait on cluster being up.  Master will set this flag up in zookeeper
760     // when ready.
761     blockAndCheckIfStopped(this.clusterStatusTracker);
762 
763     // Retrieve the clusterId.
764     // Since the cluster status is now up,
765     // the ID should already have been set by the HMaster.
766     try {
767       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
768       if (clusterId == null) {
769         this.abort("Cluster ID has not been set");
770       }
771       LOG.info("ClusterId : "+clusterId);
772     } catch (KeeperException e) {
773       this.abort("Failed to retrieve Cluster ID",e);
774     }
775 
776     // In the case of a colocated master, wait here until it's active,
777     // so backup masters won't start as regionservers.
778     // This avoids showing backup masters as regionservers
779     // in the master web UI, or assigning any regions to them.
780     waitForMasterActive();
781     if (isStopped() || isAborted()) {
782       return; // No need for further initialization
783     }
784 
785     // watch for snapshots and other procedures
786     try {
787       rspmHost = new RegionServerProcedureManagerHost();
788       rspmHost.loadProcedures(conf);
789       rspmHost.initialize(this);
790     } catch (KeeperException e) {
791       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
792     }
793     // register watcher for recovering regions
794     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
795   }
796 
797   /**
798    * Utility method to wait indefinitely for a znode to become available while checking
799    * whether the region server is shut down.
800    * @param tracker znode tracker to use
801    * @throws IOException any IO exception, plus if the RS is stopped
802    * @throws InterruptedException
803    */
804   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
805       throws IOException, InterruptedException {
806     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
807       if (this.stopped) {
808         throw new IOException("Received the shutdown message while waiting.");
809       }
810     }
811   }
812 
813   /**
814    * @return False if cluster shutdown in progress
815    */
816   private boolean isClusterUp() {
817     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
818   }
819 
820   private void initializeThreads() throws IOException {
821     // Cache flushing thread.
822     this.cacheFlusher = new MemStoreFlusher(conf, this);
823 
824     // Compaction thread
825     this.compactSplitThread = new CompactSplitThread(this);
826 
827     // Background thread to check for compactions; needed if a region has not received updates
828     // in a while. It takes care of not checking too frequently, on a store-by-store basis.
829     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
830     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
831     this.leases = new Leases(this.threadWakeFrequency);
832 
833     // Create the thread to clean the moved regions list
834     movedRegionsCleaner = MovedRegionsCleaner.create(this);
835 
836     if (this.nonceManager != null) {
837       // Create the scheduled chore that cleans up nonces.
838       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
839     }
840 
841     // Setup the Quota Manager
842     rsQuotaManager = new RegionServerQuotaManager(this);
843 
844     // Setup RPC client for master communication
845     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
846         rpcServices.isa.getAddress(), 0));
847 
848     boolean onlyMetaRefresh = false;
849     int storefileRefreshPeriod = conf.getInt(
850         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
851         StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
852     if (storefileRefreshPeriod == 0) {
853       storefileRefreshPeriod = conf.getInt(
854           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
855           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
856       onlyMetaRefresh = true;
857     }
858     if (storefileRefreshPeriod > 0) {
859       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
860           onlyMetaRefresh, this, this);
861     }
862     registerConfigurationObservers();
863   }
864 
865   private void registerConfigurationObservers() {
866     // Registering the compactSplitThread object with the ConfigurationManager.
867     configurationManager.registerObserver(this.compactSplitThread);
868   }
869 
870   /**
871    * The HRegionServer sticks in this loop until closed.
872    */
873   @Override
874   public void run() {
875     try {
876       // Do pre-registration initializations; zookeeper, lease threads, etc.
877       preRegistrationInitialization();
878     } catch (Throwable e) {
879       abort("Fatal exception during initialization", e);
880     }
881 
882     try {
883       if (!isStopped() && !isAborted()) {
884         ShutdownHook.install(conf, fs, this, Thread.currentThread());
885         // Set our ephemeral znode up in zookeeper now we have a name.
886         createMyEphemeralNode();
887         // Initialize the RegionServerCoprocessorHost now that our ephemeral
888         // node was created, in case any coprocessors want to use ZooKeeper
889         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
890       }
891 
892       // Try and register with the Master; tell it we are here.  Break if
893       // server is stopped or the clusterup flag is down or hdfs went wacky.
894       while (keepLooping()) {
895         RegionServerStartupResponse w = reportForDuty();
896         if (w == null) {
897           LOG.warn("reportForDuty failed; sleeping and then retrying.");
898           this.sleeper.sleep();
899         } else {
900           handleReportForDutyResponse(w);
901           break;
902         }
903       }
904 
905       if (!isStopped() && isHealthy()){
906         // start the snapshot handler and other procedure handlers,
907         // since the server is ready to run
908         rspmHost.start();
909 
910         // Start the Quota Manager
911         rsQuotaManager.start(getRpcServer().getScheduler());
912       }
913 
914       // We registered with the Master.  Go into run mode.
915       long lastMsg = System.currentTimeMillis();
916       long oldRequestCount = -1;
917       // The main run loop.
918       while (!isStopped() && isHealthy()) {
919         if (!isClusterUp()) {
920           if (isOnlineRegionsEmpty()) {
921             stop("Exiting; cluster shutdown set and not carrying any regions");
922           } else if (!this.stopping) {
923             this.stopping = true;
924             LOG.info("Closing user regions");
925             closeUserRegions(this.abortRequested);
926           } else if (this.stopping) {
927             boolean allUserRegionsOffline = areAllUserRegionsOffline();
928             if (allUserRegionsOffline) {
929               // Set stopped if there are no more write requests to meta tables
930               // since last time we went around the loop.  Any open
931               // meta regions will be closed on our way out.
932               if (oldRequestCount == getWriteRequestCount()) {
933                 stop("Stopped; only catalog regions remaining online");
934                 break;
935               }
936               oldRequestCount = getWriteRequestCount();
937             } else {
938               // Make sure all regions have been closed -- some regions may not
939               // have received the close because we were splitting at the time of
940               // the call to closeUserRegions.
941               closeUserRegions(this.abortRequested);
942             }
943             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
944           }
945         }
946         long now = System.currentTimeMillis();
947         if ((now - lastMsg) >= msgInterval) {
948           tryRegionServerReport(lastMsg, now);
949           lastMsg = System.currentTimeMillis();
950           doMetrics();
951         }
952         if (!isStopped() && !isAborted()) {
953           this.sleeper.sleep();
954         }
955       } // while
956     } catch (Throwable t) {
957       if (!rpcServices.checkOOME(t)) {
958         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
959         abort(prefix + t.getMessage(), t);
960       }
961     }
962     // Run shutdown.
963     if (mxBean != null) {
964       MBeanUtil.unregisterMBean(mxBean);
965       mxBean = null;
966     }
967     if (this.leases != null) this.leases.closeAfterLeasesExpire();
968     if (this.splitLogWorker != null) {
969       splitLogWorker.stop();
970     }
971     if (this.infoServer != null) {
972       LOG.info("Stopping infoServer");
973       try {
974         this.infoServer.stop();
975       } catch (Exception e) {
976         LOG.error("Failed to stop infoServer", e);
977       }
978     }
979     // Send cache a shutdown.
980     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
981       cacheConfig.getBlockCache().shutdown();
982     }
983 
984     if (movedRegionsCleaner != null) {
985       movedRegionsCleaner.stop("Region Server stopping");
986     }
987 
988     // Send interrupts to wake up threads if they are sleeping, so they notice the shutdown.
989     // TODO: Should we check they are alive? If an OOME occurred, they could have exited already.
990     if (this.hMemManager != null) this.hMemManager.stop();
991     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
992     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
993     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
994     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
995     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
996     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
997     sendShutdownInterrupt();
998 
999     // Stop the quota manager
1000     if (rsQuotaManager != null) {
1001       rsQuotaManager.stop();
1002     }
1003 
1004     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1005     if (rspmHost != null) {
1006       rspmHost.stop(this.abortRequested || this.killed);
1007     }
1008 
1009     if (this.killed) {
1010       // Just skip out w/o closing regions.  Used when testing.
1011     } else if (abortRequested) {
1012       if (this.fsOk) {
1013         closeUserRegions(abortRequested); // Don't leave any open file handles
1014       }
1015       LOG.info("aborting server " + this.serverName);
1016     } else {
1017       closeUserRegions(abortRequested);
1018       LOG.info("stopping server " + this.serverName);
1019     }
1020 
1021     // so callers waiting for meta without timeout can stop
1022     if (this.metaTableLocator != null) this.metaTableLocator.stop();
1023     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1024       try {
1025         this.clusterConnection.close();
1026       } catch (IOException e) {
1027         // Although the {@link Closeable} interface throws an {@link
1028         // IOException}, in reality, the implementation would never do that.
1029         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1030       }
1031     }
1032 
1033     // Closing the compactSplit thread before closing meta regions
1034     if (!this.killed && containsMetaTableRegions()) {
1035       if (!abortRequested || this.fsOk) {
1036         if (this.compactSplitThread != null) {
1037           this.compactSplitThread.join();
1038           this.compactSplitThread = null;
1039         }
1040         closeMetaTableRegions(abortRequested);
1041       }
1042     }
1043 
1044     if (!this.killed && this.fsOk) {
1045       waitOnAllRegionsToClose(abortRequested);
1046       LOG.info("stopping server " + this.serverName +
1047         "; all regions closed.");
1048     }
1049 
1050     // The fsOk flag may be changed when closing regions throws an exception.
1051     if (this.fsOk) {
1052       shutdownWAL(!abortRequested);
1053     }
1054 
1055     // Make sure the proxy is down.
1056     if (this.rssStub != null) {
1057       this.rssStub = null;
1058     }
1059     if (this.rpcClient != null) {
1060       this.rpcClient.close();
1061     }
1062     if (this.leases != null) {
1063       this.leases.close();
1064     }
1065     if (this.pauseMonitor != null) {
1066       this.pauseMonitor.stop();
1067     }
1068 
1069     if (!killed) {
1070       stopServiceThreads();
1071     }
1072 
1073     if (this.rpcServices != null) {
1074       this.rpcServices.stop();
1075     }
1076 
1077     try {
1078       deleteMyEphemeralNode();
1079     } catch (KeeperException.NoNodeException nn) {
1080     } catch (KeeperException e) {
1081       LOG.warn("Failed deleting my ephemeral node", e);
1082     }
1083     // We may have failed to delete the znode at the previous step, but
1084     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1085     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1086 
1087     if (this.zooKeeper != null) {
1088       this.zooKeeper.close();
1089     }
1090     LOG.info("stopping server " + this.serverName +
1091       "; zookeeper connection closed.");
1092 
1093     LOG.info(Thread.currentThread().getName() + " exiting");
1094   }
1095 
1096   private boolean containsMetaTableRegions() {
1097     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1098   }
1099 
1100   private boolean areAllUserRegionsOffline() {
1101     if (getNumberOfOnlineRegions() > 2) return false;
1102     boolean allUserRegionsOffline = true;
1103     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1104       if (!e.getValue().getRegionInfo().isMetaTable()) {
1105         allUserRegionsOffline = false;
1106         break;
1107       }
1108     }
1109     return allUserRegionsOffline;
1110   }
1111 
1112   /**
1113    * @return Current write count for all online regions.
1114    */
1115   private long getWriteRequestCount() {
1116     long writeCount = 0;
1117     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1118       writeCount += e.getValue().getWriteRequestsCount();
1119     }
1120     return writeCount;
1121   }
1122 
1123   @VisibleForTesting
1124   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1125   throws IOException {
1126     RegionServerStatusService.BlockingInterface rss = rssStub;
1127     if (rss == null) {
1128       // the current server could be stopping.
1129       return;
1130     }
1131     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1132     try {
1133       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1134       ServerName sn = ServerName.parseVersionedServerName(
1135         this.serverName.getVersionedBytes());
1136       request.setServer(ProtobufUtil.toServerName(sn));
1137       request.setLoad(sl);
1138       rss.regionServerReport(null, request.build());
1139     } catch (ServiceException se) {
1140       IOException ioe = ProtobufUtil.getRemoteException(se);
1141       if (ioe instanceof YouAreDeadException) {
1142         // This will be caught and handled as a fatal error in run()
1143         throw ioe;
1144       }
1145       if (rssStub == rss) {
1146         rssStub = null;
1147       }
1148       // Couldn't connect to the master, get location from zk and reconnect
1149       // Method blocks until new master is found or we are stopped
1150       createRegionServerStatusStub();
1151     }
1152   }
1153 
1154   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1155       throws IOException {
1156     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1157     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1158     // the wrapper to compute those numbers in one place.
1159     // In the long term, most of these should be moved off of ServerLoad and the heartbeat.
1160     // Instead they should be stored in an HBase table so that external visibility into HBase is
1161     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1162     // history.
1163     MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1164     Collection<Region> regions = getOnlineRegionsLocalContext();
1165     MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1166 
1167     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1168       ClusterStatusProtos.ServerLoad.newBuilder();
1169     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1170     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1171     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1172     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1173     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1174     for (String coprocessor : coprocessors) {
1175       serverLoad.addCoprocessors(
1176         Coprocessor.newBuilder().setName(coprocessor).build());
1177     }
1178     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1179     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1180     for (Region region : regions) {
1181       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1182       for (String coprocessor :
1183           getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
1184         serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
1185       }
1186     }
1187     serverLoad.setReportStartTime(reportStartTime);
1188     serverLoad.setReportEndTime(reportEndTime);
1189     if (this.infoServer != null) {
1190       serverLoad.setInfoServerPort(this.infoServer.getPort());
1191     } else {
1192       serverLoad.setInfoServerPort(-1);
1193     }
1194 
1195     // For the replicationLoad purpose we only need to get it from one service;
1196     // either the source or the sink will return the same info.
1197     ReplicationSourceService rsources = getReplicationSourceService();
1198 
1199     if (rsources != null) {
1200       // always refresh first to get the latest value
1201       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1202       if (rLoad != null) {
1203         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1204         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1205           serverLoad.addReplLoadSource(rLS);
1206         }
1207       }
1208     }
1209 
1210     return serverLoad.build();
1211   }
1212 
1213   String getOnlineRegionsAsPrintableString() {
1214     StringBuilder sb = new StringBuilder();
1215     for (Region r: this.onlineRegions.values()) {
1216       if (sb.length() > 0) sb.append(", ");
1217       sb.append(r.getRegionInfo().getEncodedName());
1218     }
1219     return sb.toString();
1220   }
1221 
1222   /**
1223    * Wait on regions close.
1224    */
1225   private void waitOnAllRegionsToClose(final boolean abort) {
1226     // Wait till all regions are closed before going out.
1227     int lastCount = -1;
1228     long previousLogTime = 0;
1229     Set<String> closedRegions = new HashSet<String>();
1230     boolean interrupted = false;
1231     try {
1232       while (!isOnlineRegionsEmpty()) {
1233         int count = getNumberOfOnlineRegions();
1234         // Only print a message if the count of regions has changed.
1235         if (count != lastCount) {
1236           // Log every second at most
1237           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1238             previousLogTime = System.currentTimeMillis();
1239             lastCount = count;
1240             LOG.info("Waiting on " + count + " regions to close");
1241             // Only print out the regions still closing if there are only a few,
1242             // else we will swamp the log.
1243             if (count < 10 && LOG.isDebugEnabled()) {
1244               LOG.debug(this.onlineRegions);
1245             }
1246           }
1247         }
1248         // Ensure all user regions have been sent a close. Use this to
1249         // protect against the case where an open comes in after we start the
1250         // iterator of onlineRegions to close all user regions.
1251         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1252           HRegionInfo hri = e.getValue().getRegionInfo();
1253           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1254               && !closedRegions.contains(hri.getEncodedName())) {
1255             closedRegions.add(hri.getEncodedName());
1256             // Don't update zk with this close transition; pass false.
1257             closeRegionIgnoreErrors(hri, abort);
1258           }
1259         }
1260         // No regions in RIT, we could stop waiting now.
1261         if (this.regionsInTransitionInRS.isEmpty()) {
1262           if (!isOnlineRegionsEmpty()) {
1263             LOG.info("Exiting, even though online regions are not empty," +
1264                 " because some regions failed to close");
1265           }
1266           break;
1267         }
1268         if (sleep(200)) {
1269           interrupted = true;
1270         }
1271       }
1272     } finally {
1273       if (interrupted) {
1274         Thread.currentThread().interrupt();
1275       }
1276     }
1277   }
1278 
1279   private boolean sleep(long millis) {
1280     boolean interrupted = false;
1281     try {
1282       Thread.sleep(millis);
1283     } catch (InterruptedException e) {
1284       LOG.warn("Interrupted while sleeping");
1285       interrupted = true;
1286     }
1287     return interrupted;
1288   }
1289 
1290   private void shutdownWAL(final boolean close) {
1291     if (this.walFactory != null) {
1292       try {
1293         if (close) {
1294           walFactory.close();
1295         } else {
1296           walFactory.shutdown();
1297         }
1298       } catch (Throwable e) {
1299         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1300         LOG.error("Shutdown / close of WAL failed: " + e);
1301         LOG.debug("Shutdown / close exception details:", e);
1302       }
1303     }
1304   }
1305 
1306   /*
1307    * Run init. Sets up wal and starts up all server threads.
1308    *
1309    * @param c Extra configuration.
1310    */
1311   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1312   throws IOException {
1313     try {
1314       for (NameStringPair e : c.getMapEntriesList()) {
1315         String key = e.getName();
1316         // The hostname the master sees us as.
1317         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1318           String hostnameFromMasterPOV = e.getValue();
1319           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1320             rpcServices.isa.getPort(), this.startcode);
1321           if (shouldUseThisHostnameInstead() &&
1322               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1323             String msg = "Master passed us a different hostname to use; was=" +
1324                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1325             LOG.error(msg);
1326             throw new IOException(msg);
1327           }
1328           if (!shouldUseThisHostnameInstead() &&
1329               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1330             String msg = "Master passed us a different hostname to use; was=" +
1331                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1332             LOG.error(msg);
1333           }
1334           continue;
1335         }
1336         String value = e.getValue();
1337         if (LOG.isDebugEnabled()) {
1338           LOG.debug("Config from master: " + key + "=" + value);
1339         }
1340         this.conf.set(key, value);
1341       }
1342 
1343       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1344       // config param for task trackers, but we can piggyback off of it.
1345       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1346         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1347           this.serverName.toString());
1348       }
1349 
1350       // Save it in a file; this will allow us to see if we crashed.
1351       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1352 
1353       this.cacheConfig = new CacheConfig(conf);
1354       this.walFactory = setupWALAndReplication();
1355       // Init in here rather than in constructor after thread name has been set
1356       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1357 
1358       startServiceThreads();
1359       startHeapMemoryManager();
1360       LOG.info("Serving as " + this.serverName +
1361         ", RpcServer on " + rpcServices.isa +
1362         ", sessionid=0x" +
1363         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1364 
1365       // Wake up anyone waiting for this server to come online
1366       synchronized (online) {
1367         online.set(true);
1368         online.notifyAll();
1369       }
1370     } catch (Throwable e) {
1371       stop("Failed initialization");
1372       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1373           "Region server startup failed");
1374     } finally {
1375       sleeper.skipSleepCycle();
1376     }
1377   }
1378 
1379   private void startHeapMemoryManager() {
1380     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1381     if (this.hMemManager != null) {
1382       this.hMemManager.start(getChoreService());
1383     }
1384   }
1385 
1386   private void createMyEphemeralNode() throws KeeperException, IOException {
1387     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1388     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1389     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1390     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1391       getMyEphemeralNodePath(), data);
1392   }
1393 
1394   private void deleteMyEphemeralNode() throws KeeperException {
1395     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1396   }
1397 
1398   @Override
1399   public RegionServerAccounting getRegionServerAccounting() {
1400     return regionServerAccounting;
1401   }
1402 
1403   @Override
1404   public TableLockManager getTableLockManager() {
1405     return tableLockManager;
1406   }
1407 
1408   /*
1409    * @param r Region to get RegionLoad for.
1410    * @param regionLoadBldr the RegionLoad.Builder, can be null
1411    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1412    * @return RegionLoad instance.
1413    *
1414    * @throws IOException
1415    */
1416   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1417       RegionSpecifier.Builder regionSpecifier) throws IOException {
1418     byte[] name = r.getRegionInfo().getRegionName();
1419     int stores = 0;
1420     int storefiles = 0;
1421     int storeUncompressedSizeMB = 0;
1422     int storefileSizeMB = 0;
1423     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1424     int storefileIndexSizeMB = 0;
1425     int rootIndexSizeKB = 0;
1426     int totalStaticIndexSizeKB = 0;
1427     int totalStaticBloomSizeKB = 0;
1428     long totalCompactingKVs = 0;
1429     long currentCompactedKVs = 0;
1430     List<Store> storeList = r.getStores();
1431     stores += storeList.size();
1432     for (Store store : storeList) {
1433       storefiles += store.getStorefilesCount();
1434       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1435       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1436       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1437       CompactionProgress progress = store.getCompactionProgress();
1438       if (progress != null) {
1439         totalCompactingKVs += progress.totalCompactingKVs;
1440         currentCompactedKVs += progress.currentCompactedKVs;
1441       }
1442       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1443       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1444       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1445     }
1446 
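         // Fraction (0.0f - 1.0f) of this region's HFile blocks that are local to this host.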
1447     float dataLocality =
1448         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1449     if (regionLoadBldr == null) {
1450       regionLoadBldr = RegionLoad.newBuilder();
1451     }
1452     if (regionSpecifier == null) {
1453       regionSpecifier = RegionSpecifier.newBuilder();
1454     }
1455     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1456     regionSpecifier.setValue(ByteStringer.wrap(name));
1457     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1458       .setStores(stores)
1459       .setStorefiles(storefiles)
1460       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1461       .setStorefileSizeMB(storefileSizeMB)
1462       .setMemstoreSizeMB(memstoreSizeMB)
1463       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1464       .setRootIndexSizeKB(rootIndexSizeKB)
1465       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1466       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1467       .setReadRequestsCount(r.getReadRequestsCount())
1468       .setWriteRequestsCount(r.getWriteRequestsCount())
1469       .setTotalCompactingKVs(totalCompactingKVs)
1470       .setCurrentCompactedKVs(currentCompactedKVs)
1471       .setDataLocality(dataLocality)
1472       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1473     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1474 
1475     return regionLoadBldr.build();
1476   }
1477 
1478   /**
1479    * @param encodedRegionName Encoded name of the region to build a RegionLoad for.
1480    * @return An instance of RegionLoad, or null if the region is not online.
1481    */
1482   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1483     Region r = onlineRegions.get(encodedRegionName);
1484     return r != null ? createRegionLoad(r, null, null) : null;
1485   }
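       // A minimal usage sketch (hypothetical caller such as a test), assuming the encoded
       // name comes from HRegionInfo#getEncodedName():
       //
       //   RegionLoad load = rs.createRegionLoad(hri.getEncodedName());
       //   if (load != null) {
       //     LOG.info("memstoreSizeMB=" + load.getMemstoreSizeMB()
       //         + ", storefiles=" + load.getStorefiles());
       //   }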
1486 
1487   /*
1488    * Inner class that runs on a long period checking if regions need compaction.
1489    */
1490   private static class CompactionChecker extends ScheduledChore {
1491     private final HRegionServer instance;
1492     private final int majorCompactPriority;
1493     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1494     private long iteration = 0;
1495 
1496     CompactionChecker(final HRegionServer h, final int sleepTime,
1497         final Stoppable stopper) {
1498       super("CompactionChecker", stopper, sleepTime);
1499       this.instance = h;
1500       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1501 
1502       /* MajorCompactPriority is configurable.
1503        * If not set, the compaction will use default priority.
1504        */
1505       this.majorCompactPriority = this.instance.conf.
1506         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1507         DEFAULT_PRIORITY);
1508     }
1509 
1510     @Override
1511     protected void chore() {
1512       for (Region r : this.instance.onlineRegions.values()) {
1513         if (r == null)
1514           continue;
1515         for (Store s : r.getStores()) {
1516           try {
1517             long multiplier = s.getCompactionCheckMultiplier();
1518             assert multiplier > 0;
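                 // Only check this store on every 'multiplier'-th run of the chore, so an
                 // individual store can be checked less often than the chore period.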
1519             if (iteration % multiplier != 0) continue;
1520             if (s.needsCompaction()) {
1521               // Queue a compaction. Will recognize if major is needed.
1522               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1523                   + " requests compaction");
1524             } else if (s.isMajorCompaction()) {
1525               if (majorCompactPriority == DEFAULT_PRIORITY
1526                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1527                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1528                     + " requests major compaction; use default priority", null);
1529               } else {
1530                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1531                     + " requests major compaction; use configured priority",
1532                   this.majorCompactPriority, null);
1533               }
1534             }
1535           } catch (IOException e) {
1536             LOG.warn("Failed major compaction check on " + r, e);
1537           }
1538         }
1539       }
1540       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1541     }
1542   }
1543 
1544   static class PeriodicMemstoreFlusher extends ScheduledChore {
1545     final HRegionServer server;
1546     final static int RANGE_OF_DELAY = 20000; //millisec
1547     final static int MIN_DELAY_TIME = 3000; //millisec
1548     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1549       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1550       this.server = server;
1551     }
1552 
1553     @Override
1554     protected void chore() {
1555       for (Region r : this.server.onlineRegions.values()) {
1556         if (r == null)
1557           continue;
1558         if (((HRegion)r).shouldFlush()) {
1559           FlushRequester requester = server.getFlushRequester();
1560           if (requester != null) {
1561             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1562             LOG.info(getName() + " requesting flush for region " +
1563               r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay);
1564             //Throttle the flushes by putting a delay. If we don't throttle, and there
1565             //is a balanced write-load on the regions in a table, we might end up
1566             //overwhelming the filesystem with too many flushes at once.
1567             requester.requestDelayedFlush(r, randomDelay, false);
1568           }
1569         }
1570       }
1571     }
1572   }
1573 
1574   /**
1575    * Report the status of the server. A server is online once all of its startup has
1576    * completed (setting up filesystem, starting service threads, etc.). This
1577    * method is designed mostly to be useful in tests.
1578    *
1579    * @return true if online, false if not.
1580    */
1581   public boolean isOnline() {
1582     return online.get();
1583   }
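       // A minimal test-side sketch (hypothetical; assumes a MiniHBaseCluster named 'cluster'):
       //
       //   HRegionServer rs = cluster.getRegionServer(0);
       //   rs.waitForServerOnline();   // blocks until online or a stop is requested
       //   assertTrue(rs.isOnline());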
1584 
1585   /**
1586    * Set up the WAL and replication if enabled.
1587    * Replication setup is done in here because it wants to be hooked up to WAL.
1588    * @return A WAL instance.
1589    * @throws IOException
1590    */
1591   private WALFactory setupWALAndReplication() throws IOException {
1592     // TODO Replication make assumptions here based on the default filesystem impl
1593     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1594     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1595 
1596     Path logdir = new Path(rootDir, logName);
1597     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1598     if (this.fs.exists(logdir)) {
1599       throw new RegionServerRunningException("Region server has already " +
1600         "created directory at " + this.serverName.toString());
1601     }
1602 
1603     // Instantiate replication manager if replication enabled.  Pass it the
1604     // log directories.
1605     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1606 
1607     // listeners the wal factory will add to wals it creates.
1608     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1609     listeners.add(new MetricsWAL());
1610     if (this.replicationSourceHandler != null &&
1611         this.replicationSourceHandler.getWALActionsListener() != null) {
1612       // Replication handler is an implementation of WALActionsListener.
1613       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1614     }
1615 
1616     return new WALFactory(conf, listeners, serverName.toString());
1617   }
1618 
1619   /**
1620    * We initialize the roller for the wal that handles meta lazily
1621    * since we don't know if this regionserver will handle it. All calls to
1622    * this method return a reference to that same roller. As newly referenced
1623    * meta regions are brought online, they will be offered to the roller for maintenance.
1624    * As a part of that registration process, the roller will add itself as a
1625    * listener on the wal.
1626    */
1627   protected LogRoller ensureMetaWALRoller() {
1628     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1629     // null
1630     LogRoller roller = metawalRoller.get();
1631     if (null == roller) {
1632       LogRoller tmpLogRoller = new LogRoller(this, this);
1633       String n = Thread.currentThread().getName();
1634       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1635           n + "-MetaLogRoller", uncaughtExceptionHandler);
1636       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1637         roller = tmpLogRoller;
1638       } else {
1639         // Another thread won starting the roller
1640         Threads.shutdown(tmpLogRoller.getThread());
1641         roller = metawalRoller.get();
1642       }
1643     }
1644     return roller;
1645   }
1646 
1647   public MetricsRegionServer getRegionServerMetrics() {
1648     return this.metricsRegionServer;
1649   }
1650 
1651   /**
1652    * @return Master address tracker instance.
1653    */
1654   public MasterAddressTracker getMasterAddressTracker() {
1655     return this.masterAddressTracker;
1656   }
1657 
1658   /*
1659    * Start maintenance Threads, Server, Worker and lease checker threads.
1660    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1661    * get an unhandled exception. We cannot set the handler on all threads.
1662    * Server's internal Listener thread is off limits. For Server, on an OOME, it
1663    * waits a while then retries. Meantime, a flush or a compaction that tries to
1664    * run should trigger the same critical condition and the shutdown will run. On
1665    * its way out, this server will shut down Server. Leases are sort of in
1666    * between: it has an internal thread and, while it inherits from Chore, it
1667    * keeps its own internal stop mechanism, so it needs to be stopped by this
1668    * hosting server. Worker logs the exception and exits.
1669    */
1670   private void startServiceThreads() throws IOException {
1671     // Start executor services
1672     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1673       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1674     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1675       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1676     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1677       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1678     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1679       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1680     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1681       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1682         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1683     }
1684     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1685        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1686 
1687     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1688       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1689         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1690           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1691     }
1692 
1693     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1694         uncaughtExceptionHandler);
1695     this.cacheFlusher.start(uncaughtExceptionHandler);
1696 
1697     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1698     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1699     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1700     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1701     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1702     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1703 
1704     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1705     // an unhandled exception, it will just exit.
1706     this.leases.setName(getName() + ".leaseChecker");
1707     this.leases.start();
1708 
1709     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1710         this.replicationSourceHandler != null) {
1711       this.replicationSourceHandler.startReplicationService();
1712     } else {
1713       if (this.replicationSourceHandler != null) {
1714         this.replicationSourceHandler.startReplicationService();
1715       }
1716       if (this.replicationSinkHandler != null) {
1717         this.replicationSinkHandler.startReplicationService();
1718       }
1719     }
1720 
1721     // Create the log splitting worker and start it
1722     // Set a smaller number of retries to fail fast; otherwise the splitlogworker could be
1723     // blocked for quite a while inside the HConnection layer. The worker won't be available
1724     // for other tasks even after the current task is preempted when a split task times out.
1725     Configuration sinkConf = HBaseConfiguration.create(conf);
1726     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1727       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1728     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1729       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1730     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1731     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1732     splitLogWorker.start();
1733   }
1734 
1735   /**
1736    * Puts up the webui.
1737    * @return Returns final port -- maybe different from what we started with.
1738    * @throws IOException
1739    */
1740   private int putUpWebUI() throws IOException {
1741     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1742       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1743     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1744 
1745     if (this instanceof HMaster) {
1746       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1747           HConstants.DEFAULT_MASTER_INFOPORT);
1748       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1749     }
1750     // -1 is for disabling info server
1751     if (port < 0) return port;
1752 
1753     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1754       String msg =
1755           "Failed to start http info server. Address " + addr
1756               + " does not belong to this host. Correct configuration parameter: "
1757               + "hbase.regionserver.info.bindAddress";
1758       LOG.error(msg);
1759       throw new IOException(msg);
1760     }
1761     // check if auto port bind enabled
1762     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1763         false);
1764     while (true) {
1765       try {
1766         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1767         infoServer.addServlet("dump", "/dump", getDumpServlet());
1768         configureInfoServer();
1769         this.infoServer.start();
1770         break;
1771       } catch (BindException e) {
1772         if (!auto) {
1773           // auto bind disabled throw BindException
1774           LOG.error("Failed binding http info server to port: " + port);
1775           throw e;
1776         }
1777         // auto bind enabled, try to use another port
1778         LOG.info("Failed binding http info server to port: " + port);
1779         port++;
1780       }
1781     }
1782     port = this.infoServer.getPort();
1783     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1784     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1785       HConstants.DEFAULT_MASTER_INFOPORT);
1786     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1787     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1788     return port;
1789   }
1790 
1791   /*
1792    * Verify that server is healthy
1793    */
1794   private boolean isHealthy() {
1795     if (!fsOk) {
1796       // File system problem
1797       return false;
1798     }
1799     // Verify that all threads are alive
1800     if (!(leases.isAlive()
1801         && cacheFlusher.isAlive() && walRoller.isAlive()
1802         && this.compactionChecker.isScheduled()
1803         && this.periodicFlusher.isScheduled())) {
1804       stop("One or more threads are no longer alive -- stop");
1805       return false;
1806     }
1807     final LogRoller metawalRoller = this.metawalRoller.get();
1808     if (metawalRoller != null && !metawalRoller.isAlive()) {
1809       stop("Meta WAL roller thread is no longer alive -- stop");
1810       return false;
1811     }
1812     return true;
1813   }
1814 
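       // Empty key used to look up the default WAL when getWAL(null) is called below.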
1815   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1816 
1817   @Override
1818   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1819     WAL wal;
1820     LogRoller roller = walRoller;
1821     //_ROOT_ and hbase:meta regions have separate WAL.
1822     if (regionInfo != null && regionInfo.isMetaTable() &&
1823         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1824       roller = ensureMetaWALRoller();
1825       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1826     } else if (regionInfo == null) {
1827       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1828     } else {
1829       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1830     }
1831     roller.addWAL(wal);
1832     return wal;
1833   }
1834 
1835   @Override
1836   public ClusterConnection getConnection() {
1837     return this.clusterConnection;
1838   }
1839 
1840   @Override
1841   public MetaTableLocator getMetaTableLocator() {
1842     return this.metaTableLocator;
1843   }
1844 
1845   @Override
1846   public void stop(final String msg) {
1847     if (!this.stopped) {
1848       try {
1849         if (this.rsHost != null) {
1850           this.rsHost.preStop(msg);
1851         }
1852         this.stopped = true;
1853         LOG.info("STOPPED: " + msg);
1854         // Wakes run() if it is sleeping
1855         sleeper.skipSleepCycle();
1856       } catch (IOException exp) {
1857         LOG.warn("The region server did not stop", exp);
1858       }
1859     }
1860   }
1861 
1862   public void waitForServerOnline(){
1863     while (!isStopped() && !isOnline()) {
1864       synchronized (online) {
1865         try {
1866           online.wait(msgInterval);
1867         } catch (InterruptedException ie) {
1868           Thread.currentThread().interrupt();
1869           break;
1870         }
1871       }
1872     }
1873   }
1874 
1875   @Override
1876   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1877     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1878     rpcServices.checkOpen();
1879     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1880     // Do checks to see if we need to compact (references or too many files)
1881     for (Store s : r.getStores()) {
1882       if (s.hasReferences() || s.needsCompaction()) {
1883         this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1884       }
1885     }
1886     long openSeqNum = r.getOpenSeqNum();
1887     if (openSeqNum == HConstants.NO_SEQNUM) {
1888       // If we opened a region, we should have read some sequence number from it.
1889       LOG.error("No sequence number found when opening " +
1890         r.getRegionInfo().getRegionNameAsString());
1891       openSeqNum = 0;
1892     }
1893 
1894     // Update flushed sequence id of a recovering region in ZK
1895     updateRecoveringRegionLastFlushedSequenceId(r);
1896 
1897     // Notify master
1898     if (!reportRegionStateTransition(
1899         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1900       throw new IOException("Failed to report opened region to master: "
1901         + r.getRegionInfo().getRegionNameAsString());
1902     }
1903 
1904     triggerFlushInPrimaryRegion((HRegion)r);
1905 
1906     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1907   }
1908 
1909   @Override
1910   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1911     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1912   }
1913 
1914   @Override
1915   public boolean reportRegionStateTransition(
1916       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1917     if (TEST_SKIP_REPORTING_TRANSITION) {
1918       // This is for testing only in case there is no master
1919       // to handle the region transition report at all.
1920       if (code == TransitionCode.OPENED) {
1921         Preconditions.checkArgument(hris != null && hris.length == 1);
1922         if (hris[0].isMetaRegion()) {
1923           try {
1924             MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
1925                 hris[0].getReplicaId(), State.OPEN);
1926           } catch (KeeperException e) {
1927             LOG.info("Failed to update meta location", e);
1928             return false;
1929           }
1930         } else {
1931           try {
1932             MetaTableAccessor.updateRegionLocation(clusterConnection,
1933               hris[0], serverName, openSeqNum);
1934           } catch (IOException e) {
1935             LOG.info("Failed to update meta", e);
1936             return false;
1937           }
1938         }
1939       }
1940       return true;
1941     }
1942 
1943     ReportRegionStateTransitionRequest.Builder builder =
1944       ReportRegionStateTransitionRequest.newBuilder();
1945     builder.setServer(ProtobufUtil.toServerName(serverName));
1946     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1947     transition.setTransitionCode(code);
1948     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1949       transition.setOpenSeqNum(openSeqNum);
1950     }
1951     for (HRegionInfo hri: hris) {
1952       transition.addRegionInfo(HRegionInfo.convert(hri));
1953     }
1954     ReportRegionStateTransitionRequest request = builder.build();
1955     while (keepLooping()) {
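           // Retry until the master acknowledges the transition or keepLooping() says stop;
           // a failed RPC clears rssStub so the next iteration recreates the master stub.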
1956       RegionServerStatusService.BlockingInterface rss = rssStub;
1957       try {
1958         if (rss == null) {
1959           createRegionServerStatusStub();
1960           continue;
1961         }
1962         ReportRegionStateTransitionResponse response =
1963           rss.reportRegionStateTransition(null, request);
1964         if (response.hasErrorMessage()) {
1965           LOG.info("Failed to transition " + hris[0]
1966             + " to " + code + ": " + response.getErrorMessage());
1967           return false;
1968         }
1969         return true;
1970       } catch (ServiceException se) {
1971         IOException ioe = ProtobufUtil.getRemoteException(se);
1972         LOG.info("Failed to report region transition, will retry", ioe);
1973         if (rssStub == rss) {
1974           rssStub = null;
1975         }
1976       }
1977     }
1978     return false;
1979   }
1980 
1981   /**
1982    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
1983    * block this thread. See RegionReplicaFlushHandler for details.
1984    */
1985   void triggerFlushInPrimaryRegion(final HRegion region) {
1986     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
1987       return;
1988     }
1989     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
1990         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
1991           region.conf)) {
1992       region.setReadsEnabled(true);
1993       return;
1994     }
1995 
1996     region.setReadsEnabled(false); // disable reads before marking the region as opened.
1997     // RegionReplicaFlushHandler might reset this.
1998 
1999     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2000     this.service.submit(
2001       new RegionReplicaFlushHandler(this, clusterConnection,
2002         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2003   }
2004 
2005   @Override
2006   public RpcServerInterface getRpcServer() {
2007     return rpcServices.rpcServer;
2008   }
2009 
2010   @VisibleForTesting
2011   public RSRpcServices getRSRpcServices() {
2012     return rpcServices;
2013   }
2014 
2015   /**
2016    * Cause the server to exit without closing the regions it is serving or the log
2017    * it is using, and without notifying the master. Used in unit testing and on
2018    * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
2019    *
2020    * @param reason
2021    *          the reason we are aborting
2022    * @param cause
2023    *          the exception that caused the abort, or null
2024    */
2025   @Override
2026   public void abort(String reason, Throwable cause) {
2027     String msg = "ABORTING region server " + this + ": " + reason;
2028     if (cause != null) {
2029       LOG.fatal(msg, cause);
2030     } else {
2031       LOG.fatal(msg);
2032     }
2033     this.abortRequested = true;
2034     // HBASE-4014: show list of coprocessors that were loaded to help debug
2035     // regionserver crashes. Note that we're implicitly using
2036     // java.util.HashSet's toString() method to print the coprocessor names.
2037     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2038         CoprocessorHost.getLoadedCoprocessors());
2039     // Try and dump metrics if abort -- might give clue as to how fatal came about....
2040     try {
2041       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2042     } catch (MalformedObjectNameException | IOException e) {
2043       LOG.warn("Failed dumping metrics", e);
2044     }
2045 
2046     // Do our best to report our abort to the master, but this may not work
2047     try {
2048       if (cause != null) {
2049         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2050       }
2051       // Report to the master but only if we have already registered with the master.
2052       if (rssStub != null && this.serverName != null) {
2053         ReportRSFatalErrorRequest.Builder builder =
2054           ReportRSFatalErrorRequest.newBuilder();
2055         ServerName sn =
2056           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2057         builder.setServer(ProtobufUtil.toServerName(sn));
2058         builder.setErrorMessage(msg);
2059         rssStub.reportRSFatalError(null, builder.build());
2060       }
2061     } catch (Throwable t) {
2062       LOG.warn("Unable to report fatal error to master", t);
2063     }
2064     stop(reason);
2065   }
2066 
2067   /**
2068    * @see HRegionServer#abort(String, Throwable)
2069    */
2070   public void abort(String reason) {
2071     abort(reason, null);
2072   }
2073 
2074   @Override
2075   public boolean isAborted() {
2076     return this.abortRequested;
2077   }
2078 
2079   /*
2080    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2081    * logs, but it does close the socket in case we want to bring up a server on the
2082    * old hostname+port immediately.
2083    */
2084   protected void kill() {
2085     this.killed = true;
2086     abort("Simulated kill");
2087   }
2088 
2089   /**
2090    * Called on stop/abort before closing the cluster connection and meta locator.
2091    */
2092   protected void sendShutdownInterrupt() {
2093   }
2094 
2095   /**
2096    * Wait on all threads to finish. Presumption is that all closes and stops
2097    * have already been called.
2098    */
2099   protected void stopServiceThreads() {
2100     // clean up the scheduled chores
2101     if (this.choreService != null) choreService.shutdown();
2102     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2103     if (this.compactionChecker != null) compactionChecker.cancel(true);
2104     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2105     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2106     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2107     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2108 
2109     if (this.cacheFlusher != null) {
2110       this.cacheFlusher.join();
2111     }
2112 
2113     if (this.spanReceiverHost != null) {
2114       this.spanReceiverHost.closeReceivers();
2115     }
2116     if (this.walRoller != null) {
2117       Threads.shutdown(this.walRoller.getThread());
2118     }
2119     final LogRoller metawalRoller = this.metawalRoller.get();
2120     if (metawalRoller != null) {
2121       Threads.shutdown(metawalRoller.getThread());
2122     }
2123     if (this.compactSplitThread != null) {
2124       this.compactSplitThread.join();
2125     }
2126     if (this.service != null) this.service.shutdown();
2127     if (this.replicationSourceHandler != null &&
2128         this.replicationSourceHandler == this.replicationSinkHandler) {
2129       this.replicationSourceHandler.stopReplicationService();
2130     } else {
2131       if (this.replicationSourceHandler != null) {
2132         this.replicationSourceHandler.stopReplicationService();
2133       }
2134       if (this.replicationSinkHandler != null) {
2135         this.replicationSinkHandler.stopReplicationService();
2136       }
2137     }
2138   }
2139 
2140   /**
2141    * @return Return the object that implements the replication
2142    * source service.
2143    */
2144   ReplicationSourceService getReplicationSourceService() {
2145     return replicationSourceHandler;
2146   }
2147 
2148   /**
2149    * @return Return the object that implements the replication
2150    * sink service.
2151    */
2152   ReplicationSinkService getReplicationSinkService() {
2153     return replicationSinkHandler;
2154   }
2155 
2156   /**
2157    * Get the current master from ZooKeeper and open the RPC connection to it.
2158    * To get a fresh connection, the current rssStub must be null.
2159    * Method will block until a master is available. You can break from this
2160    * block by requesting the server stop.
2161    *
2162    * @return master + port, or null if server has been stopped
2163    */
2164   @VisibleForTesting
2165   protected synchronized ServerName createRegionServerStatusStub() {
2166     if (rssStub != null) {
2167       return masterAddressTracker.getMasterAddress();
2168     }
2169     ServerName sn = null;
2170     long previousLogTime = 0;
2171     boolean refresh = false; // for the first time, use cached data
2172     RegionServerStatusService.BlockingInterface intf = null;
2173     boolean interrupted = false;
2174     try {
2175       while (keepLooping()) {
2176         sn = this.masterAddressTracker.getMasterAddress(refresh);
2177         if (sn == null) {
2178           if (!keepLooping()) {
2179             // give up with no connection.
2180             LOG.debug("No master found and cluster is stopped; bailing out");
2181             return null;
2182           }
2183           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2184             LOG.debug("No master found; retry");
2185             previousLogTime = System.currentTimeMillis();
2186           }
2187           refresh = true; // let's try pull it from ZK directly
2188           if (sleep(200)) {
2189             interrupted = true;
2190           }
2191           continue;
2192         }
2193 
2194         // If we are on the active master, use the shortcut
2195         if (this instanceof HMaster && sn.equals(getServerName())) {
2196           intf = ((HMaster)this).getMasterRpcServices();
2197           break;
2198         }
2199         try {
2200           BlockingRpcChannel channel =
2201             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2202               shortOperationTimeout);
2203           intf = RegionServerStatusService.newBlockingStub(channel);
2204           break;
2205         } catch (IOException e) {
2206           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2207             e = e instanceof RemoteException ?
2208               ((RemoteException)e).unwrapRemoteException() : e;
2209             if (e instanceof ServerNotRunningYetException) {
2210               LOG.info("Master isn't available yet, retrying");
2211             } else {
2212               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2213             }
2214             previousLogTime = System.currentTimeMillis();
2215           }
2216           if (sleep(200)) {
2217             interrupted = true;
2218           }
2219         }
2220       }
2221     } finally {
2222       if (interrupted) {
2223         Thread.currentThread().interrupt();
2224       }
2225     }
2226     rssStub = intf;
2227     return sn;
2228   }
2229 
2230   /**
2231    * @return True if we should break loop because cluster is going down or
2232    * this server has been stopped or hdfs has gone bad.
2233    */
2234   private boolean keepLooping() {
2235     return !this.stopped && isClusterUp();
2236   }
2237 
2238   /*
2239    * Let the master know we're here. Run initialization using parameters passed
2240    * to us by the master.
2241    * @return A Map of key/value configurations we got from the Master else
2242    * null if we failed to register.
2243    * @throws IOException
2244    */
2245   private RegionServerStartupResponse reportForDuty() throws IOException {
2246     ServerName masterServerName = createRegionServerStatusStub();
2247     if (masterServerName == null) return null;
2248     RegionServerStartupResponse result = null;
2249     try {
2250       rpcServices.requestCount.set(0);
2251       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2252         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2253       long now = EnvironmentEdgeManager.currentTime();
2254       int port = rpcServices.isa.getPort();
2255       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2256       if (shouldUseThisHostnameInstead()) {
2257         request.setUseThisHostnameInstead(useThisHostnameInstead);
2258       }
2259       request.setPort(port);
2260       request.setServerStartCode(this.startcode);
2261       request.setServerCurrentTime(now);
2262       result = this.rssStub.regionServerStartup(null, request.build());
2263     } catch (ServiceException se) {
2264       IOException ioe = ProtobufUtil.getRemoteException(se);
2265       if (ioe instanceof ClockOutOfSyncException) {
2266         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2267         // Re-throw IOE will cause RS to abort
2268         throw ioe;
2269       } else if (ioe instanceof ServerNotRunningYetException) {
2270         LOG.debug("Master is not running yet");
2271       } else {
2272         LOG.warn("error telling master we are up", se);
2273         rssStub = null;
2274       }
2275     }
2276     return result;
2277   }
2278 
2279   @Override
2280   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2281     try {
2282       GetLastFlushedSequenceIdRequest req =
2283           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2284       RegionServerStatusService.BlockingInterface rss = rssStub;
2285       if (rss == null) { // Try to connect one more time
2286         createRegionServerStatusStub();
2287         rss = rssStub;
2288         if (rss == null) {
2289           // Still no luck, we tried
2290           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2291           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2292               .build();
2293         }
2294       }
2295       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2296       return RegionStoreSequenceIds.newBuilder()
2297           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2298           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2299     } catch (ServiceException e) {
2300       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2301       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2302           .build();
2303     }
2304   }
2305 
2306   /**
2307    * Closes all regions.  Called on our way out.
2308    * Assumes that it's not possible for new regions to be added to onlineRegions
2309    * while this method runs.
2310    */
2311   protected void closeAllRegions(final boolean abort) {
2312     closeUserRegions(abort);
2313     closeMetaTableRegions(abort);
2314   }
2315 
2316   /**
2317    * Close meta region if we carry it
2318    * @param abort Whether we're running an abort.
2319    */
2320   void closeMetaTableRegions(final boolean abort) {
2321     Region meta = null;
2322     this.lock.writeLock().lock();
2323     try {
2324       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2325         HRegionInfo hri = e.getValue().getRegionInfo();
2326         if (hri.isMetaRegion()) {
2327           meta = e.getValue();
2328         }
2329         if (meta != null) break;
2330       }
2331     } finally {
2332       this.lock.writeLock().unlock();
2333     }
2334     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2335   }
2336 
2337   /**
2338    * Schedule closes on all user regions.
2339    * Should be safe calling multiple times because it won't close regions
2340    * that are already closed or that are closing.
2341    * @param abort Whether we're running an abort.
2342    */
2343   void closeUserRegions(final boolean abort) {
2344     this.lock.writeLock().lock();
2345     try {
2346       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2347         Region r = e.getValue();
2348         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2349           // Don't update zk with this close transition; pass false.
2350           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2351         }
2352       }
2353     } finally {
2354       this.lock.writeLock().unlock();
2355     }
2356   }
2357 
2358   /** @return the info server */
2359   public InfoServer getInfoServer() {
2360     return infoServer;
2361   }
2362 
2363   /**
2364    * @return true if a stop has been requested.
2365    */
2366   @Override
2367   public boolean isStopped() {
2368     return this.stopped;
2369   }
2370 
2371   @Override
2372   public boolean isStopping() {
2373     return this.stopping;
2374   }
2375 
2376   @Override
2377   public Map<String, Region> getRecoveringRegions() {
2378     return this.recoveringRegions;
2379   }
2380 
2381   /**
2382    *
2383    * @return the configuration
2384    */
2385   @Override
2386   public Configuration getConfiguration() {
2387     return conf;
2388   }
2389 
2390   /** @return the write lock for the server */
2391   ReentrantReadWriteLock.WriteLock getWriteLock() {
2392     return lock.writeLock();
2393   }
2394 
2395   public int getNumberOfOnlineRegions() {
2396     return this.onlineRegions.size();
2397   }
2398 
2399   boolean isOnlineRegionsEmpty() {
2400     return this.onlineRegions.isEmpty();
2401   }
2402 
2403   /**
2404    * For tests, web ui and metrics.
2405    * This method will only work if HRegionServer is in the same JVM as client;
2406    * HRegion cannot be serialized to cross an rpc.
2407    */
2408   public Collection<Region> getOnlineRegionsLocalContext() {
2409     Collection<Region> regions = this.onlineRegions.values();
2410     return Collections.unmodifiableCollection(regions);
2411   }
2412 
2413   @Override
2414   public void addToOnlineRegions(Region region) {
2415     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2416     configurationManager.registerObserver(region);
2417   }
2418 
2419   /**
2420    * @return A new Map of online regions sorted by region size with the first entry being the
2421    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2422    * may NOT return all regions.
2423    */
2424   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2425     // we'll sort the regions in reverse
2426     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2427         new Comparator<Long>() {
2428           @Override
2429           public int compare(Long a, Long b) {
2430             return -1 * a.compareTo(b);
2431           }
2432         });
2433     // Copy over all regions. Regions are sorted by size with biggest first.
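         // Note: the map is keyed by memstore size, so regions with exactly the same size
         // collide and only the last one seen is kept (hence the Javadoc caveat above).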
2434     for (Region region : this.onlineRegions.values()) {
2435       sortedRegions.put(region.getMemstoreSize(), region);
2436     }
2437     return sortedRegions;
2438   }
2439 
2440   /**
2441    * @return time stamp in millis of when this region server was started
2442    */
2443   public long getStartcode() {
2444     return this.startcode;
2445   }
2446 
2447   /** @return reference to FlushRequester */
2448   @Override
2449   public FlushRequester getFlushRequester() {
2450     return this.cacheFlusher;
2451   }
2452 
2453   /**
2454    * Get the top N most loaded regions this server is serving so we can tell the
2455    * master which regions it can reallocate if we're overloaded. TODO: actually
2456    * calculate which regions are most loaded. (Right now, we're just grabbing
2457    * the first N regions being served regardless of load.)
2458    */
2459   protected HRegionInfo[] getMostLoadedRegions() {
2460     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2461     for (Region r : onlineRegions.values()) {
2462       if (!r.isAvailable()) {
2463         continue;
2464       }
2465       if (regions.size() < numRegionsToReport) {
2466         regions.add(r.getRegionInfo());
2467       } else {
2468         break;
2469       }
2470     }
2471     return regions.toArray(new HRegionInfo[regions.size()]);
2472   }
2473 
2474   @Override
2475   public Leases getLeases() {
2476     return leases;
2477   }
2478 
2479   /**
2480    * @return Return the rootDir.
2481    */
2482   protected Path getRootDir() {
2483     return rootDir;
2484   }
2485 
2486   /**
2487    * @return Return the fs.
2488    */
2489   @Override
2490   public FileSystem getFileSystem() {
2491     return fs;
2492   }
2493 
2494   @Override
2495   public String toString() {
2496     return getServerName().toString();
2497   }
2498 
2499   /**
2500    * Interval at which threads should run
2501    *
2502    * @return the interval
2503    */
2504   public int getThreadWakeFrequency() {
2505     return threadWakeFrequency;
2506   }
2507 
2508   @Override
2509   public ZooKeeperWatcher getZooKeeper() {
2510     return zooKeeper;
2511   }
2512 
2513   @Override
2514   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2515     return csm;
2516   }
2517 
2518   @Override
2519   public ServerName getServerName() {
2520     return serverName;
2521   }
2522 
2523   @Override
2524   public CompactionRequestor getCompactionRequester() {
2525     return this.compactSplitThread;
2526   }
2527 
2528   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2529     return this.rsHost;
2530   }
2531 
2532   @Override
2533   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2534     return this.regionsInTransitionInRS;
2535   }
2536 
2537   @Override
2538   public ExecutorService getExecutorService() {
2539     return service;
2540   }
2541 
2542   @Override
2543   public ChoreService getChoreService() {
2544     return choreService;
2545   }
2546 
2547   @Override
2548   public RegionServerQuotaManager getRegionServerQuotaManager() {
2549     return rsQuotaManager;
2550   }
2551 
2552   //
2553   // Main program and support routines
2554   //
2555 
2556   /**
2557    * Load the replication service objects, if any
2558    */
2559   static private void createNewReplicationInstance(Configuration conf,
2560     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2561 
2562     // If replication is not enabled, then return immediately.
2563     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2564         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2565       return;
2566     }
2567 
2568     // read in the name of the source replication class from the config file.
2569     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2570                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2571 
2572     // read in the name of the sink replication class from the config file.
2573     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2574                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2575 
2576     // If both the sink and the source class names are the same, then instantiate
2577     // only one object.
2578     if (sourceClassname.equals(sinkClassname)) {
2579       server.replicationSourceHandler = (ReplicationSourceService)
2580                                          newReplicationInstance(sourceClassname,
2581                                          conf, server, fs, logDir, oldLogDir);
2582       server.replicationSinkHandler = (ReplicationSinkService)
2583                                          server.replicationSourceHandler;
2584     } else {
2585       server.replicationSourceHandler = (ReplicationSourceService)
2586                                          newReplicationInstance(sourceClassname,
2587                                          conf, server, fs, logDir, oldLogDir);
2588       server.replicationSinkHandler = (ReplicationSinkService)
2589                                          newReplicationInstance(sinkClassname,
2590                                          conf, server, fs, logDir, oldLogDir);
2591     }
2592   }
2593 
2594   static private ReplicationService newReplicationInstance(String classname,
2595     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2596     Path oldLogDir) throws IOException{
2597 
2598     Class<?> clazz = null;
2599     try {
2600       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2601       clazz = Class.forName(classname, true, classLoader);
2602     } catch (java.lang.ClassNotFoundException nfe) {
2603       throw new IOException("Could not find class for " + classname);
2604     }
2605 
2606     // create an instance of the replication object.
2607     ReplicationService service = (ReplicationService)
2608                               ReflectionUtils.newInstance(clazz, conf);
2609     service.initialize(server, fs, logDir, oldLogDir);
2610     return service;
2611   }
2612 
2613   /**
2614    * Utility for constructing an instance of the passed HRegionServer class.
2615    *
2616    * @param regionServerClass
2617    * @param conf2
2618    * @return HRegionServer instance.
2619    */
2620   public static HRegionServer constructRegionServer(
2621       Class<? extends HRegionServer> regionServerClass,
2622       final Configuration conf2, CoordinatedStateManager cp) {
2623     try {
2624       Constructor<? extends HRegionServer> c = regionServerClass
2625           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2626       return c.newInstance(conf2, cp);
2627     } catch (Exception e) {
2628       throw new RuntimeException("Failed construction of " + "Regionserver: "
2629           + regionServerClass.toString(), e);
2630     }
2631   }
2632 
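       // A hypothetical direct-construction sketch; normal startup goes through main() and
       // HRegionServerCommandLine below instead.
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   CoordinatedStateManager csm =
       //       CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
       //   HRegionServer rs = constructRegionServer(HRegionServer.class, conf, csm);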
2633   /**
2634    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2635    */
2636   public static void main(String[] args) throws Exception {
2637     VersionInfo.logVersion();
2638     Configuration conf = HBaseConfiguration.create();
2639     @SuppressWarnings("unchecked")
2640     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2641         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2642 
2643     new HRegionServerCommandLine(regionServerClass).doMain(args);
2644   }
2645 
2646   /**
2647    * Gets the online regions of the specified table.
2648    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2649    * Only returns <em>online</em> regions.  If a region on this table has been
2650    * closed during a disable, etc., it will not be included in the returned list.
2651    * So, the returned list may not necessarily be ALL regions in this table, it's
2652    * all the ONLINE regions in the table.
2653    * @param tableName
2654    * @return Online regions from <code>tableName</code>
2655    */
2656   @Override
2657   public List<Region> getOnlineRegions(TableName tableName) {
2658     List<Region> tableRegions = new ArrayList<Region>();
2659     synchronized (this.onlineRegions) {
2660       for (Region region: this.onlineRegions.values()) {
2661         HRegionInfo regionInfo = region.getRegionInfo();
2662         if (regionInfo.getTable().equals(tableName)) {
2663           tableRegions.add(region);
2664         }
2665       }
2666     }
2667     return tableRegions;
2668   }
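       // Sketch (hypothetical caller) of fetching the regions of one table that are online here:
       //
       //   List<Region> online = rs.getOnlineRegions(TableName.valueOf("my_table"));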
2669 
2670   /**
2671    * Gets the online tables in this RS.
2672    * This method looks at the in-memory onlineRegions.
2673    * @return all the online tables in this RS
2674    */
2675   @Override
2676   public Set<TableName> getOnlineTables() {
2677     Set<TableName> tables = new HashSet<TableName>();
2678     synchronized (this.onlineRegions) {
2679       for (Region region: this.onlineRegions.values()) {
2680         tables.add(region.getTableDesc().getTableName());
2681       }
2682     }
2683     return tables;
2684   }
2685 
2686   // used by org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2687   public String[] getRegionServerCoprocessors() {
2688     TreeSet<String> coprocessors = new TreeSet<String>();
2689     try {
2690       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2691     } catch (IOException exception) {
2692       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2693           "skipping.");
2694       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2695     }
2696     Collection<Region> regions = getOnlineRegionsLocalContext();
2697     for (Region region: regions) {
2698       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2699       try {
2700         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2701       } catch (IOException exception) {
2702         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2703             "; skipping.");
2704         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2705       }
2706     }
2707     return coprocessors.toArray(new String[coprocessors.size()]);
2708   }
2709 
2710   /**
2711    * Try to close the region; logs a warning on failure but continues.
2712    * @param region Region to close
2713    */
2714   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2715     try {
2716       if (!closeRegion(region.getEncodedName(), abort, null)) {
2717         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2718             " - ignoring and continuing");
2719       }
2720     } catch (IOException e) {
2721       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2722           " - ignoring and continuing", e);
2723     }
2724   }
2725 
2726   /**
2727    * Closes a region asynchronously. Can be called from the master or internally by the
2728    * regionserver when stopping. If called from the master, the region will update the znode status.
2729    *
2730    * <p>
2731    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2732    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2733    * </p>
2734    *
2735    * <p>
2736    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2737    * </p>
2738    *
2739    * @param encodedName Region to close
2740    * @param abort True if we are aborting
2741    * @return True if closed a region.
2742    * @throws NotServingRegionException if the region is not online
2743    */
2744   protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
2745       throws NotServingRegionException {
2746     // Check for permissions to close.
2747     Region actualRegion = this.getFromOnlineRegions(encodedName);
2748     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2749       try {
2750         actualRegion.getCoprocessorHost().preClose(false);
2751       } catch (IOException exp) {
2752         LOG.warn("Unable to close region: the coprocessor threw an exception", exp);
2753         return false;
2754       }
2755     }
2756 
2757     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2758         Boolean.FALSE);
2759 
2760     if (Boolean.TRUE.equals(previous)) {
2761       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2762           "trying to OPEN. Cancelling OPENING.");
2763       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2764         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2765         // We're going to try to do a standard close then.
2766         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2767             " Doing a standard close now");
2768         return closeRegion(encodedName, abort, sn);
2769       }
2770       // Let's get the region from the online region list again
2771       actualRegion = this.getFromOnlineRegions(encodedName);
2772       if (actualRegion == null) { // If it did make it online before the cancel, we still close it below.
2773         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2774         // The master deletes the znode when it receives this exception.
2775         throw new NotServingRegionException("The region " + encodedName +
2776           " was opening but not yet served. Opening is cancelled.");
2777       }
2778     } else if (Boolean.FALSE.equals(previous)) {
2779       LOG.info("Received CLOSE for the region: " + encodedName +
2780         ", which we are already trying to CLOSE, but not completed yet");
2781       return true;
2782     }
2783 
2784     if (actualRegion == null) {
2785       LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2786       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2787       // The master deletes the znode when it receives this exception.
2788       throw new NotServingRegionException("The region " + encodedName +
2789           " is not online, and is not opening.");
2790     }
2791 
2792     CloseRegionHandler crh;
2793     final HRegionInfo hri = actualRegion.getRegionInfo();
2794     if (hri.isMetaRegion()) {
2795       crh = new CloseMetaHandler(this, this, hri, abort);
2796     } else {
2797       crh = new CloseRegionHandler(this, this, hri, abort, sn);
2798     }
2799     this.service.submit(crh);
2800     return true;
2801   }
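       // Illustrative caller sketch (editorial addition, hypothetical): closing a region during
       // an abort while handling the outcomes documented above.
       //
       //   try {
       //     boolean submitted = closeRegion(hri.getEncodedName(), true /* abort */, null);
       //     // submitted == false means a coprocessor rejected the close in preClose()
       //   } catch (NotServingRegionException e) {
       //     // the region is not online here, or its opening was cancelled by this request
       //   }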
2802 
2803   /**
2804    * @param regionName region name in binary form
2805    * @return {@link Region} for the passed binary <code>regionName</code> or null if the
2806    *         named region is not a member of the online regions.
2807    */
2808   public Region getOnlineRegion(final byte[] regionName) {
2809     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2810     return this.onlineRegions.get(encodedRegionName);
2811   }
2812 
2813   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2814     return this.regionFavoredNodesMap.get(encodedRegionName);
2815   }
2816 
2817   @Override
2818   public Region getFromOnlineRegions(final String encodedRegionName) {
2819     return this.onlineRegions.get(encodedRegionName);
2820   }
2821 
2822 
2823   @Override
2824   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2825     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2826 
2827     if (destination != null) {
2828       try {
2829         WAL wal = getWAL(r.getRegionInfo());
2830         long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2831         if (closeSeqNum == HConstants.NO_SEQNUM) {
2832           // No edits in WAL for this region; get the sequence number when the region was opened.
2833           closeSeqNum = r.getOpenSeqNum();
2834           if (closeSeqNum == HConstants.NO_SEQNUM) {
2835             closeSeqNum = 0;
2836           }
2837         }
2838         addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2839       } catch (IOException exception) {
2840         LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
2841             "; not adding to moved regions.");
2842         LOG.debug("Exception details for failure to get wal", exception);
2843       }
2844     }
2845     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2846     return toReturn != null;
2847   }
2848 
2849   /**
2850    * Protected utility method for safely obtaining an HRegion handle.
2851    *
2852    * @param regionName
2853    *          Name of the online {@link HRegion} to return
2854    * @return {@link HRegion} for <code>regionName</code>
2855    * @throws NotServingRegionException if the named region is not online
2856    */
2857   protected Region getRegion(final byte[] regionName)
2858       throws NotServingRegionException {
2859     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2860     return getRegionByEncodedName(regionName, encodedRegionName);
2861   }
2862 
2863   public Region getRegionByEncodedName(String encodedRegionName)
2864       throws NotServingRegionException {
2865     return getRegionByEncodedName(null, encodedRegionName);
2866   }
2867 
2868   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2869     throws NotServingRegionException {
2870     Region region = this.onlineRegions.get(encodedRegionName);
2871     if (region == null) {
2872       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2873       if (moveInfo != null) {
2874         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2875       }
2876       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2877       String regionNameStr = regionName == null?
2878         encodedRegionName: Bytes.toStringBinary(regionName);
2879       if (isOpening != null && isOpening.booleanValue()) {
2880         throw new RegionOpeningException("Region " + regionNameStr +
2881           " is opening on " + this.serverName);
2882       }
2883       throw new NotServingRegionException("Region " + regionNameStr +
2884         " is not online on " + this.serverName);
2885     }
2886     return region;
2887   }
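       // Illustrative sketch (editorial addition) of the three failure modes surfaced above.
       // Assumption: RegionMovedException and RegionOpeningException both extend
       // NotServingRegionException, so the specific catches must come first.
       //
       //   try {
       //     Region region = getRegionByEncodedName(encodedName);
       //   } catch (RegionMovedException e) {
       //     // recently moved; the client should retry against the server named in the exception
       //   } catch (RegionOpeningException e) {
       //     // still opening on this server; retry after a backoff
       //   } catch (NotServingRegionException e) {
       //     // neither online nor opening here
       //   }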
2888 
2889   /*
2890    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
2891    * IOE if it isn't already.
2892    *
2893    * @param t Throwable
2894    *
2895    * @param msg Message to log in error. Can be null.
2896    *
2897    * @return Throwable converted to an IOE; methods may only throw IOEs.
2898    */
2899   private Throwable cleanup(final Throwable t, final String msg) {
2900     // Don't log as error if NSRE; NSRE is 'normal' operation.
2901     if (t instanceof NotServingRegionException) {
2902       LOG.debug("NotServingRegionException; " + t.getMessage());
2903       return t;
2904     }
2905     Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
2906     if (msg == null) {
2907       LOG.error("", e);
2908     } else {
2909       LOG.error(msg, e);
2910     }
2911     if (!rpcServices.checkOOME(t)) {
2912       checkFileSystem();
2913     }
2914     return t;
2915   }
2916 
2917   /*
2918    * @param t
2919    *
2920    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2921    *
2922    * @return Make <code>t</code> an IOE if it isn't already.
2923    */
2924   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2925     return (t instanceof IOException ? (IOException) t : msg == null
2926         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2927   }
2928 
2929   /**
2930    * Checks to see if the file system is still accessible. If not, sets
2931    * abortRequested and stopRequested
2932    *
2933    * @return false if file system is not available
2934    */
2935   public boolean checkFileSystem() {
2936     if (this.fsOk && this.fs != null) {
2937       try {
2938         FSUtils.checkFileSystemAvailable(this.fs);
2939       } catch (IOException e) {
2940         abort("File System not available", e);
2941         this.fsOk = false;
2942       }
2943     }
2944     return this.fsOk;
2945   }
2946 
2947   @Override
2948   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2949       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2950     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2951     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2952     // it is a map of region name to InetSocketAddress[]
2953     for (int i = 0; i < favoredNodes.size(); i++) {
2954       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2955           favoredNodes.get(i).getPort());
2956     }
2957     regionFavoredNodesMap.put(encodedRegionName, addr);
2958   }
2959 
2960   /**
2961    * Return the favored nodes for a region given its encoded name. See the
2962    * comment around {@link #regionFavoredNodesMap} for why it is an InetSocketAddress[].
2963    * @param encodedRegionName encoded name of the region
2964    * @return array of favored locations
2965    */
2966   @Override
2967   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2968     return regionFavoredNodesMap.get(encodedRegionName);
2969   }
2970 
2971   @Override
2972   public ServerNonceManager getNonceManager() {
2973     return this.nonceManager;
2974   }
2975 
2976   private static class MovedRegionInfo {
2977     private final ServerName serverName;
2978     private final long seqNum;
2979     private final long ts;
2980 
2981     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2982       this.serverName = serverName;
2983       this.seqNum = closeSeqNum;
2984       ts = EnvironmentEdgeManager.currentTime();
2985     }
2986 
2987     public ServerName getServerName() {
2988       return serverName;
2989     }
2990 
2991     public long getSeqNum() {
2992       return seqNum;
2993     }
2994 
2995     public long getMoveTime() {
2996       return ts;
2997     }
2998   }
2999 
3000   // This map contains all the regions that we closed for a move.
3001   // We record the time of the move so that we don't keep stale entries forever.
3002   protected Map<String, MovedRegionInfo> movedRegions =
3003       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3004 
3005   // We need a timeout. Otherwise we risk handing out wrong information, which would double
3006   // the number of network calls instead of reducing them.
3007   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3008 
3009   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3010     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3011       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3012       return;
3013     }
3014     LOG.info("Adding moved region record: "
3015       + encodedName + " to " + destination + " as of " + closeSeqNum);
3016     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3017   }
3018 
3019   void removeFromMovedRegions(String encodedName) {
3020     movedRegions.remove(encodedName);
3021   }
3022 
3023   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3024     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3025 
3026     long now = EnvironmentEdgeManager.currentTime();
3027     if (dest != null) {
3028       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3029         return dest;
3030       } else {
3031         movedRegions.remove(encodedRegionName);
3032       }
3033     }
3034 
3035     return null;
3036   }
3037 
3038   /**
3039    * Remove the expired entries from the moved regions list.
3040    */
3041   protected void cleanMovedRegions() {
3042     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3043     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3044 
3045     while (it.hasNext()){
3046       Map.Entry<String, MovedRegionInfo> e = it.next();
3047       if (e.getValue().getMoveTime() < cutOff) {
3048         it.remove();
3049       }
3050     }
3051   }
3052 
3053   /*
3054    * Use this to allow tests to override and schedule more frequently.
3055    */
3056 
3057   protected int movedRegionCleanerPeriod() {
3058     return TIMEOUT_REGION_MOVED;
3059   }
3060 
3061   /**
3062    * Chore that periodically cleans the moved region cache.
3063    */
3064 
3065   protected static final class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3066     private HRegionServer regionServer;
3067     Stoppable stoppable;
3068 
3069     private MovedRegionsCleaner(
3070       HRegionServer regionServer, Stoppable stoppable) {
3071       super("MovedRegionsCleaner for region " + regionServer, stoppable,
3072           regionServer.movedRegionCleanerPeriod());
3073       this.regionServer = regionServer;
3074       this.stoppable = stoppable;
3075     }
3076 
3077     static MovedRegionsCleaner create(HRegionServer rs) {
3078       Stoppable stoppable = new Stoppable() {
3079         private volatile boolean isStopped = false;
3080         @Override public void stop(String why) { isStopped = true;}
3081         @Override public boolean isStopped() {return isStopped;}
3082       };
3083 
3084       return new MovedRegionsCleaner(rs, stoppable);
3085     }
3086 
3087     @Override
3088     protected void chore() {
3089       regionServer.cleanMovedRegions();
3090     }
3091 
3092     @Override
3093     public void stop(String why) {
3094       stoppable.stop(why);
3095     }
3096 
3097     @Override
3098     public boolean isStopped() {
3099       return stoppable.isStopped();
3100     }
3101   }
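       // Wiring sketch (assumption; the actual scheduling happens elsewhere in this class):
       // a ScheduledChore like this is registered with the server's ChoreService.
       //
       //   MovedRegionsCleaner cleaner = MovedRegionsCleaner.create(this);
       //   getChoreService().scheduleChore(cleaner);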
3102 
3103   private String getMyEphemeralNodePath() {
3104     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3105   }
3106 
3107   private boolean isHealthCheckerConfigured() {
3108     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3109     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3110   }
3111 
3112   /**
3113    * @return the underlying {@link CompactSplitThread} for the server
3114    */
3115   public CompactSplitThread getCompactSplitThread() {
3116     return this.compactSplitThread;
3117   }
3118 
3119   /**
3120    * A helper function to store the last flushed sequence Id with the previous failed RS for a
3121    * recovering region. The Id is used to skip wal edits that have already been flushed. Since the
3122    * flushed sequence id is only valid per RS, we associate the Id with the corresponding failed RS.
3123    * @throws KeeperException
3124    * @throws IOException
3125    */
3126   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3127       IOException {
3128     if (!r.isRecovering()) {
3129       // return immediately for non-recovering regions
3130       return;
3131     }
3132 
3133     HRegionInfo regionInfo = r.getRegionInfo();
3134     ZooKeeperWatcher zkw = getZooKeeper();
3135     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3136     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3137     long minSeqIdForLogReplay = -1;
3138     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3139       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3140         minSeqIdForLogReplay = storeSeqIdForReplay;
3141       }
3142     }
3143 
3144     try {
3145       long lastRecordedFlushedSequenceId = -1;
3146       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3147         regionInfo.getEncodedName());
3148       // recovering-region level
3149       byte[] data;
3150       try {
3151         data = ZKUtil.getData(zkw, nodePath);
3152       } catch (InterruptedException e) {
3153         throw new InterruptedIOException();
3154       }
3155       if (data != null) {
3156         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3157       }
3158       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3159         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3160       }
3161       if (previousRSName != null) {
3162         // one level deeper for the failed RS
3163         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3164         ZKUtil.setData(zkw, nodePath,
3165           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3166         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3167           " for " + previousRSName);
3168       } else {
3169         LOG.warn("Can't find failed region server for recovering region " +
3170             regionInfo.getEncodedName());
3171       }
3172     } catch (NoNodeException ignore) {
3173       LOG.debug("Region " + regionInfo.getEncodedName() +
3174         " must have completed recovery because its recovery znode has been removed", ignore);
3175     }
3176   }
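       // Znode layout sketch, derived from the code above (paths illustrative):
       //
       //   /hbase/recovering-regions/<encodedRegionName>             -> min flushed sequence id
       //   /hbase/recovering-regions/<encodedRegionName>/<failedRS>  -> per-store sequence ids
       //
       // The region-level value lets log replay skip edits that were already flushed; the per-RS
       // child scopes those ids to the failed server whose WALs are being replayed.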
3177 
3178   /**
3179    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3180    * @param encodedRegionName encoded name of the recovering region
3181    * @throws KeeperException
3182    */
3183   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3184     String result = null;
3185     long maxZxid = 0;
3186     ZooKeeperWatcher zkw = this.getZooKeeper();
3187     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3188     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3189     if (failedServers == null || failedServers.isEmpty()) {
3190       return result;
3191     }
3192     for (String failedServer : failedServers) {
3193       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3194       Stat stat = new Stat();
3195       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3196       if (maxZxid < stat.getCzxid()) {
3197         maxZxid = stat.getCzxid();
3198         result = failedServer;
3199       }
3200     }
3201     return result;
3202   }
3203 
3204   public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3205       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3206     try {
3207       ServerRpcController execController = new ServerRpcController();
3208       CoprocessorServiceCall call = serviceRequest.getCall();
3209       String serviceName = call.getServiceName();
3210       String methodName = call.getMethodName();
3211       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3212         throw new UnknownProtocolException(null,
3213             "No registered coprocessor service found for name " + serviceName);
3214       }
3215       Service service = coprocessorServiceHandlers.get(serviceName);
3216       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3217       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3218       if (methodDesc == null) {
3219         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3220             + " called on service " + serviceName);
3221       }
3222       Message request =
3223           service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest())
3224               .build();
3225       final Message.Builder responseBuilder =
3226           service.getResponsePrototype(methodDesc).newBuilderForType();
3227       service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3228         @Override
3229         public void run(Message message) {
3230           if (message != null) {
3231             responseBuilder.mergeFrom(message);
3232           }
3233         }
3234       });
3235       Message execResult = responseBuilder.build();
3236       if (execController.getFailedOn() != null) {
3237         throw execController.getFailedOn();
3238       }
3239       ClientProtos.CoprocessorServiceResponse.Builder builder =
3240           ClientProtos.CoprocessorServiceResponse.newBuilder();
3241       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3242         HConstants.EMPTY_BYTE_ARRAY));
3243       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3244           .setValue(execResult.toByteString()));
3245       return builder.build();
3246     } catch (IOException ie) {
3247       throw new ServiceException(ie);
3248     }
3249   }
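       // Hedged client-side sketch (editorial addition): invoking a region-server-scoped
       // coprocessor endpoint that ends up in this method. "MyService" is a hypothetical
       // protobuf-generated service; the channel is assumed to come from
       // Admin.coprocessorService(ServerName).
       //
       //   try (Connection conn = ConnectionFactory.createConnection(conf);
       //        Admin admin = conn.getAdmin()) {
       //     CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
       //     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
       //     MyResponse resp = stub.myCall(null, MyRequest.getDefaultInstance());
       //   }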
3250 
3251   /**
3252    * @return The cache config instance used by the regionserver.
3253    */
3254   public CacheConfig getCacheConfig() {
3255     return this.cacheConfig;
3256   }
3257 
3258   /**
3259    * @return the ConfigurationManager object, exposed for testing purposes.
3260    */
3261   protected ConfigurationManager getConfigurationManager() {
3262     return configurationManager;
3263   }
3264 
3265   /**
3266    * @return the table descriptors implementation.
3267    */
3268   public TableDescriptors getTableDescriptors() {
3269     return this.tableDescriptors;
3270   }
3271 
3272   /**
3273    * Reload the configuration from disk.
3274    */
3275   public void updateConfiguration() {
3276     LOG.info("Reloading the configuration from disk.");
3277     // Reload the configuration from disk.
3278     conf.reloadConfiguration();
3279     configurationManager.notifyAllObservers(conf);
3280   }
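       // Usage note (assumption, not from this file): this is the server-side hook behind the
       // online configuration-change feature; an operator can trigger it per server, e.g.:
       //
       //   try (Connection conn = ConnectionFactory.createConnection(conf);
       //        Admin admin = conn.getAdmin()) {
       //     admin.updateConfiguration(serverName);   // reloads the configuration on that server
       //   }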
3281 
3282   @Override
3283   public HeapMemoryManager getHeapMemoryManager() {
3284     return hMemManager;
3285   }
3286 
3287   @Override
3288   public double getCompactionPressure() {
3289     double max = 0;
3290     for (Region region : onlineRegions.values()) {
3291       for (Store store : region.getStores()) {
3292         double normCount = store.getCompactionPressure();
3293         if (normCount > max) {
3294           max = normCount;
3295         }
3296       }
3297     }
3298     return max;
3299   }
3300 }