1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.TreeSet;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import javax.management.MalformedObjectNameException;
52  import javax.management.ObjectName;
53  import javax.servlet.http.HttpServlet;
54  
55  import org.apache.commons.lang.SystemUtils;
56  import org.apache.commons.lang.math.RandomUtils;
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  import org.apache.hadoop.conf.Configuration;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.hbase.ChoreService;
63  import org.apache.hadoop.hbase.ClockOutOfSyncException;
64  import org.apache.hadoop.hbase.CoordinatedStateManager;
65  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
66  import org.apache.hadoop.hbase.HBaseConfiguration;
67  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
68  import org.apache.hadoop.hbase.HConstants;
69  import org.apache.hadoop.hbase.HRegionInfo;
70  import org.apache.hadoop.hbase.HealthCheckChore;
71  import org.apache.hadoop.hbase.MetaTableAccessor;
72  import org.apache.hadoop.hbase.NotServingRegionException;
73  import org.apache.hadoop.hbase.RemoteExceptionHandler;
74  import org.apache.hadoop.hbase.ScheduledChore;
75  import org.apache.hadoop.hbase.ServerName;
76  import org.apache.hadoop.hbase.Stoppable;
77  import org.apache.hadoop.hbase.TableDescriptors;
78  import org.apache.hadoop.hbase.TableName;
79  import org.apache.hadoop.hbase.YouAreDeadException;
80  import org.apache.hadoop.hbase.ZNodeClearer;
81  import org.apache.hadoop.hbase.classification.InterfaceAudience;
82  import org.apache.hadoop.hbase.client.ClusterConnection;
83  import org.apache.hadoop.hbase.client.ConnectionUtils;
84  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
85  import org.apache.hadoop.hbase.conf.ConfigurationManager;
86  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
87  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
88  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
89  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
90  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
91  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
92  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
93  import org.apache.hadoop.hbase.executor.ExecutorService;
94  import org.apache.hadoop.hbase.executor.ExecutorType;
95  import org.apache.hadoop.hbase.fs.HFileSystem;
96  import org.apache.hadoop.hbase.http.InfoServer;
97  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
98  import org.apache.hadoop.hbase.ipc.RpcClient;
99  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
100 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
101 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
102 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103 import org.apache.hadoop.hbase.ipc.ServerRpcController;
104 import org.apache.hadoop.hbase.master.HMaster;
105 import org.apache.hadoop.hbase.master.RegionState.State;
106 import org.apache.hadoop.hbase.master.TableLockManager;
107 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
108 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
109 import org.apache.hadoop.hbase.protobuf.RequestConverter;
110 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
113 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
116 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
135 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
136 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
137 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
138 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
139 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
140 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
141 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
142 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
143 import org.apache.hadoop.hbase.security.Superusers;
144 import org.apache.hadoop.hbase.security.UserProvider;
145 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
146 import org.apache.hadoop.hbase.util.Addressing;
147 import org.apache.hadoop.hbase.util.ByteStringer;
148 import org.apache.hadoop.hbase.util.Bytes;
149 import org.apache.hadoop.hbase.util.CompressionTest;
150 import org.apache.hadoop.hbase.util.ConfigUtil;
151 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152 import org.apache.hadoop.hbase.util.FSTableDescriptors;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.HasThread;
155 import org.apache.hadoop.hbase.util.JSONBean;
156 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
157 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
158 import org.apache.hadoop.hbase.util.Sleeper;
159 import org.apache.hadoop.hbase.util.Threads;
160 import org.apache.hadoop.hbase.util.VersionInfo;
161 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
162 import org.apache.hadoop.hbase.wal.WAL;
163 import org.apache.hadoop.hbase.wal.WALFactory;
164 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
165 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
166 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
167 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
168 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
169 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
170 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
171 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
172 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
173 import org.apache.hadoop.ipc.RemoteException;
174 import org.apache.hadoop.metrics.util.MBeanUtil;
175 import org.apache.hadoop.util.ReflectionUtils;
176 import org.apache.hadoop.util.StringUtils;
177 import org.apache.zookeeper.KeeperException;
178 import org.apache.zookeeper.KeeperException.NoNodeException;
179 import org.apache.zookeeper.data.Stat;
180 
181 import com.google.common.annotations.VisibleForTesting;
182 import com.google.common.base.Preconditions;
183 import com.google.common.collect.Maps;
184 import com.google.protobuf.BlockingRpcChannel;
185 import com.google.protobuf.Descriptors;
186 import com.google.protobuf.Message;
187 import com.google.protobuf.RpcCallback;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.Service;
190 import com.google.protobuf.ServiceException;
191 import sun.misc.Signal;
192 import sun.misc.SignalHandler;
193 
194 /**
195  * HRegionServer makes a set of HRegions available to clients. It checks in with
196  * the HMaster. There are many HRegionServers in a single HBase deployment.
197  */
198 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
199 @SuppressWarnings("deprecation")
200 public class HRegionServer extends HasThread implements
201     RegionServerServices, LastSequenceId {
202 
203   private static final Log LOG = LogFactory.getLog(HRegionServer.class);
204 
205   /*
206    * Strings to be used in forming the exception message for
207    * RegionsAlreadyInTransitionException.
208    */
209   protected static final String OPEN = "OPEN";
210   protected static final String CLOSE = "CLOSE";
211 
212   // RegionName vs current action in progress
213   // true - if an open region action is in progress
214   // false - if a close region action is in progress
215   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
216     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
217 
218   // Cache flushing
219   protected MemStoreFlusher cacheFlusher;
220 
221   protected HeapMemoryManager hMemManager;
222 
223   /**
224    * Cluster connection to be shared by services.
225    * Initialized at server startup and closed when server shuts down.
226    * Clients must never close it explicitly.
227    */
228   protected ClusterConnection clusterConnection;
229 
230   /*
231    * Long-lived meta table locator, which is created when the server is started and stopped
232    * when the server shuts down. References to this locator should be used to perform the
233    * corresponding operations in EventHandlers. The primary reason for this design is to make
234    * it mockable for tests.
235    */
236   protected MetaTableLocator metaTableLocator;
237 
238   // Watch if a region is out of recovering state from ZooKeeper
239   @SuppressWarnings("unused")
240   private RecoveringRegionWatcher recoveringRegionWatcher;
241 
242   /**
243    * Go here to get table descriptors.
244    */
245   protected TableDescriptors tableDescriptors;
246 
247   // Replication services. If no replication, this handler will be null.
248   protected ReplicationSourceService replicationSourceHandler;
249   protected ReplicationSinkService replicationSinkHandler;
250 
251   // Compactions
252   public CompactSplitThread compactSplitThread;
253 
254   /**
255    * Map of regions currently being served by this region server. Key is the
256    * encoded region name.  All access should be synchronized.
257    */
258   protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
259 
260   /**
261    * Map of encoded region names to the DataNode locations they should be hosted on.
262    * We store the value as InetSocketAddress since this is used only in HDFS
263    * API (create() that takes favored nodes as hints for placing file blocks).
264    * We could have used ServerName here as the value class, but we'd need to
265    * convert it to InetSocketAddress at some point before the HDFS API call, and
266    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
267    * and here we really mean DataNode locations.
268    */
269   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
270       new ConcurrentHashMap<String, InetSocketAddress[]>();
271 
272   /**
273    * Map of regions currently in recovering state, which means they can accept writes (edits from
274    * a previously failed region server) but not reads. A recovering region is also an online region.
275    */
276   protected final Map<String, Region> recoveringRegions = Collections
277       .synchronizedMap(new HashMap<String, Region>());
278 
279   // Leases
280   protected Leases leases;
281 
282   // Instance of the hbase executor service.
283   protected ExecutorService service;
284 
285   // If false, the file system has become unavailable
286   protected volatile boolean fsOk;
287   protected HFileSystem fs;
288 
289   // Set when a report to the master comes back with a message asking us to
290   // shut down. Also set by a call to stop() when debugging or running unit tests
291   // of HRegionServer in isolation.
292   private volatile boolean stopped = false;
293 
294   // Go down hard. Used if file system becomes unavailable and also in
295   // debugging and unit tests.
296   private volatile boolean abortRequested;
297 
298   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
299 
300   // A state before we go into stopped state.  At this stage we're closing user
301   // space regions.
302   private boolean stopping = false;
303 
304   private volatile boolean killed = false;
305 
306   protected final Configuration conf;
307 
308   private Path rootDir;
309 
310   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
311 
312   final int numRetries;
313   protected final int threadWakeFrequency;
314   protected final int msgInterval;
315 
316   protected final int numRegionsToReport;
317 
318   // Stub to do region server status calls against the master.
319   private volatile RegionServerStatusService.BlockingInterface rssStub;
320   // RPC client. Used to make the stub above that does region server status checking.
321   RpcClient rpcClient;
322 
323   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
324   private RpcControllerFactory rpcControllerFactory;
325 
326   private UncaughtExceptionHandler uncaughtExceptionHandler;
327 
328   // Info server. Default access so can be used by unit tests. REGIONSERVER
329   // is the name of the webapp and the attribute name used when stuffing this instance
330   // into the web context.
331   protected InfoServer infoServer;
332   private JvmPauseMonitor pauseMonitor;
333 
334   /** region server process name */
335   public static final String REGIONSERVER = "regionserver";
336 
337   MetricsRegionServer metricsRegionServer;
338   private SpanReceiverHost spanReceiverHost;
339 
340   /**
341    * ChoreService used to schedule tasks that we want to run periodically
342    */
343   private final ChoreService choreService;
344 
345   /*
346    * Check for compaction requests.
347    */
348   ScheduledChore compactionChecker;
349 
350   /*
351    * Check for flushes
352    */
353   ScheduledChore periodicFlusher;
354 
355   protected volatile WALFactory walFactory;
356 
357   // WAL roller. log is protected rather than private to avoid
358   // eclipse warning when accessed by inner classes
359   final LogRoller walRoller;
360   // Lazily initialized if this RegionServer hosts a meta table.
361   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
362 
363   // flag set after we're done setting up server threads
364   final AtomicBoolean online = new AtomicBoolean(false);
365 
366   // zookeeper connection and watcher
367   protected ZooKeeperWatcher zooKeeper;
368 
369   // master address tracker
370   private MasterAddressTracker masterAddressTracker;
371 
372   // Cluster Status Tracker
373   protected ClusterStatusTracker clusterStatusTracker;
374 
375   // Log Splitting Worker
376   private SplitLogWorker splitLogWorker;
377 
378   // A sleeper that sleeps for msgInterval.
379   protected final Sleeper sleeper;
380 
381   private final int operationTimeout;
382   private final int shortOperationTimeout;
383 
384   private final RegionServerAccounting regionServerAccounting;
385 
386   // Cache configuration and block cache reference
387   protected CacheConfig cacheConfig;
388 
389   /** The health check chore. */
390   private HealthCheckChore healthCheckChore;
391 
392   /** The nonce manager chore. */
393   private ScheduledChore nonceManagerChore;
394 
395   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
396 
397   /**
398    * The server name the Master sees us as. It's made from the hostname the
399    * master passes us, the port, and the server startcode. Gets set after
400    * registration against the Master.
401    */
402   protected ServerName serverName;
403 
404   /*
405    * hostname specified by hostname config
406    */
407   private String useThisHostnameInstead;
408 
409   // Key to the config parameter for the server hostname.
410   // Specifying the server hostname is optional; the hostname should be resolvable from
411   // both the master and the region server.
412   final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
413 
414   final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
415 
416   /**
417    * This server's startcode.
418    */
419   protected final long startcode;
420 
421   /**
422    * Unique identifier for the cluster we are a part of.
423    */
424   private String clusterId;
425 
426   /**
427    * MX Bean for RegionServerInfo
428    */
429   private ObjectName mxBean = null;
430 
431   /**
432    * Chore to periodically clean the moved region list
433    */
434   private MovedRegionsCleaner movedRegionsCleaner;
435 
436   // chore for refreshing store files for secondary regions
437   private StorefileRefresherChore storefileRefresher;
438 
439   private RegionServerCoprocessorHost rsHost;
440 
441   private RegionServerProcedureManagerHost rspmHost;
442   
443   private RegionServerQuotaManager rsQuotaManager;
444 
445   // Table level lock manager, used for locking during region operations
446   protected TableLockManager tableLockManager;
447 
448   /**
449    * Nonce manager. Nonces are used to make operations like increment and append idempotent
450    * in the case where the client doesn't receive the response from a successful operation and
451    * retries. We track the successful ops for some time via a nonce sent by the client and handle
452    * duplicate operations (currently by failing them; in the future we might use MVCC to return
453    * the result). Nonces are also recovered from the WAL during recovery; however, the caveats (from
454    * HBASE-3787) are:
455    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
456    *   of past records. If we don't read the records, we don't read and recover the nonces.
457    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
458    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
459    * We could have a separate, additional "Nonce WAL". It would just contain a bunch of numbers and
460    * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
461    * flush it before a memstore flush, then for a given nonce we will either see it in the WAL (if
462    * it was never flushed to disk, it will be part of recovery), or we'll see it as part of the
463    * nonce log (or both occasionally, which doesn't matter). The nonce log file can be deleted after
464    * the latest nonce in it has expired. It can also be recovered during a move.
465    */
466   final ServerNonceManager nonceManager;
467 
468   private UserProvider userProvider;
469 
470   protected final RSRpcServices rpcServices;
471 
472   protected BaseCoordinatedStateManager csm;
473 
474   private final boolean useZKForAssignment;
475 
476   /**
477    * The configuration manager is used to register/deregister and notify the configuration
478    * observers when the regionserver is notified of a change in the on-disk configs.
479    */
480   protected final ConfigurationManager configurationManager;
481 
482   /**
483    * Starts an HRegionServer at the default location.
484    * @param conf the region server configuration
485    * @throws IOException
486    * @throws InterruptedException
487    */
488   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
489     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
490   }
491 
492   /**
493    * Starts an HRegionServer at the default location.
494    * @param conf the region server configuration
495    * @param csm implementation of CoordinatedStateManager to be used
496    * @throws IOException
497    * @throws InterruptedException
498    */
499   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
500       throws IOException, InterruptedException {
501     this.fsOk = true;
502     this.conf = conf;
503     checkCodecs(this.conf);
504     this.userProvider = UserProvider.instantiate(conf);
505     FSUtils.setupShortCircuitRead(this.conf);
506     // Disable usage of meta replicas in the regionserver
507     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
508 
509     // Config'ed params
510     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
511         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
512     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
513     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
514 
515     this.sleeper = new Sleeper(this.msgInterval, this);
516 
517     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
518     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
519 
520     this.numRegionsToReport = conf.getInt(
521       "hbase.regionserver.numregionstoreport", 10);
522 
523     this.operationTimeout = conf.getInt(
524       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
525       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
526 
527     this.shortOperationTimeout = conf.getInt(
528       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
529       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
530 
531     this.abortRequested = false;
532     this.stopped = false;
533 
534     rpcServices = createRpcServices();
535     this.startcode = System.currentTimeMillis();
536     if (this instanceof HMaster) {
537       useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
538     } else {
539       useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
540     }
541     String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
542       rpcServices.isa.getHostName();
543     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
544 
545     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
546     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
547 
548     // login the zookeeper client principal (if using security)
549     ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
550       HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
551     // login the server principal (if using secure Hadoop)
552     login(userProvider, hostName);
553     // init superusers and add the server principal (if using security)
554     // or process owner as default super user.
555     Superusers.initialize(conf);
556 
557     regionServerAccounting = new RegionServerAccounting();
558     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
559       @Override
560       public void uncaughtException(Thread t, Throwable e) {
561         abort("Uncaught exception in service thread " + t.getName(), e);
562       }
563     };
564 
565     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
566 
567     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
568     // underlying hadoop hdfs accessors will be going against the wrong filesystem
569     // (unless all is set to defaults).
570     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
571     // Get the fs instance used by this RS.  Do we use checksum verification in hbase? If hbase
572     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
573     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
574     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
575     this.rootDir = FSUtils.getRootDir(this.conf);
576     this.tableDescriptors = new FSTableDescriptors(
577       this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
578 
579     service = new ExecutorService(getServerName().toShortString());
580     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
581 
582     // Some unit tests don't need a cluster, so no zookeeper at all
583     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
584       // Open connection to zookeeper and set primary watcher
585       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
586         rpcServices.isa.getPort(), this, canCreateBaseZNode());
587 
588       this.csm = (BaseCoordinatedStateManager) csm;
589       this.csm.initialize(this);
590       this.csm.start();
591 
592       tableLockManager = TableLockManager.createTableLockManager(
593         conf, zooKeeper, serverName);
594 
595       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
596       masterAddressTracker.start();
597 
598       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
599       clusterStatusTracker.start();
600     }
601     this.configurationManager = new ConfigurationManager();
602 
603     rpcServices.start();
604     putUpWebUI();
605     this.walRoller = new LogRoller(this, this);
606     this.choreService = new ChoreService(getServerName().toString(), true);
607 
608     if (!SystemUtils.IS_OS_WINDOWS) {
609       Signal.handle(new Signal("HUP"), new SignalHandler() {
610         public void handle(Signal signal) {
611           getConfiguration().reloadConfiguration();
612           configurationManager.notifyAllObservers(getConfiguration());
613         }
614       });
615     }
616   }
617 
618   /*
619    * Returns true if the configured hostname should be used.
620    */
621   protected boolean shouldUseThisHostnameInstead() {
622     return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
623   }
624 
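      /**
       * Log in the server principal, if security is enabled, using the configured
       * region server keytab file and kerberos principal. Subclasses may override
       * this to log in with their own configuration keys.
       */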
625   protected void login(UserProvider user, String host) throws IOException {
626     user.login("hbase.regionserver.keytab.file",
627       "hbase.regionserver.kerberos.principal", host);
628   }
629 
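      /**
       * Wait for an active master. A no-op for a plain region server; a colocated
       * master can override this so backup masters do not come up as regionservers
       * (see the caller in initializeZooKeeper).
       */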
630   protected void waitForMasterActive(){
631   }
632 
633   protected String getProcessName() {
634     return REGIONSERVER;
635   }
636 
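      /**
       * @return true if this process may create the base znode when connecting to
       * ZooKeeper; false for a plain region server (subclasses may override).
       */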
637   protected boolean canCreateBaseZNode() {
638     return false;
639   }
640 
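      /**
       * @return true if this process may update table descriptors on the filesystem;
       * false for a plain region server (see the FSTableDescriptors setup in the constructor).
       */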
641   protected boolean canUpdateTableDescriptor() {
642     return false;
643   }
644 
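      /**
       * Create the RPC services handler used by this server. Subclasses may return a
       * more specialized implementation.
       */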
645   protected RSRpcServices createRpcServices() throws IOException {
646     return new RSRpcServices(this);
647   }
648 
649   protected void configureInfoServer() {
650     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
651     infoServer.setAttribute(REGIONSERVER, this);
652   }
653 
654   protected Class<? extends HttpServlet> getDumpServlet() {
655     return RSDumpServlet.class;
656   }
657 
658   @Override
659   public boolean registerService(Service instance) {
660     /*
661      * No stacking of instances is allowed for a single service name
662      */
663     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
664     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
665       LOG.error("Coprocessor service " + serviceDesc.getFullName()
666           + " already registered, rejecting request from " + instance);
667       return false;
668     }
669 
670     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
671     if (LOG.isDebugEnabled()) {
672       LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
673     }
674     return true;
675   }
676 
677   /**
678    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
679    * the local server. Safe to use whether going to a local or a remote server.
680    * Creating this instance in a method allows it to be intercepted and mocked in tests.
681    * @throws IOException
682    */
683   @VisibleForTesting
684   protected ClusterConnection createClusterConnection() throws IOException {
685     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
686     // local server if the request is to the local server bypassing RPC. Can be used for both local
687     // and remote invocations.
688     return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
689       serverName, rpcServices, rpcServices);
690   }
691 
692   /**
693    * Run a test on the configured codecs to make sure the supporting libs are in place.
694    * @param c the configuration to check
695    * @throws IOException
696    */
697   private static void checkCodecs(final Configuration c) throws IOException {
698     // check to see if the codec list is available:
699     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
700     if (codecs == null) return;
701     for (String codec : codecs) {
702       if (!CompressionTest.testCompression(codec)) {
703         throw new IOException("Compression codec " + codec +
704           " not supported, aborting RS construction");
705       }
706     }
707   }
708 
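      /**
       * @return the id of the cluster this server belongs to, as read from ZooKeeper
       * during initialization (may be null before then).
       */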
709   public String getClusterId() {
710     return this.clusterId;
711   }
712 
713   /**
714    * Set up our cluster connection if not already initialized.
715    * @throws IOException
716    */
717   protected synchronized void setupClusterConnection() throws IOException {
718     if (clusterConnection == null) {
719       clusterConnection = createClusterConnection();
720       metaTableLocator = new MetaTableLocator();
721     }
722   }
723 
724   /**
725    * All initialization needed before we register with the Master.
726    *
727    * @throws IOException
728    * @throws InterruptedException
729    */
730   private void preRegistrationInitialization(){
731     try {
732       setupClusterConnection();
733 
734       // Health checker thread.
735       if (isHealthCheckerConfigured()) {
736         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
737           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
738         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
739       }
740       this.pauseMonitor = new JvmPauseMonitor(conf);
741       pauseMonitor.start();
742 
743       initializeZooKeeper();
744       if (!isStopped() && !isAborted()) {
745         initializeThreads();
746       }
747     } catch (Throwable t) {
748       // Call stop on error, or the process will stick around forever since the server
749       // puts up non-daemon threads.
750       this.rpcServices.stop();
751       abort("Initialization of RS failed.  Hence aborting RS.", t);
752     }
753   }
754 
755   /**
756    * Bring up the connection to the zk ensemble, then wait until a master is available for this
757    * cluster, and after that wait until the cluster 'up' flag has been set.
758    * This is the order in which the master does things.
759    * Finally, open the long-lived server short-circuit connection.
760    * @throws IOException
761    * @throws InterruptedException
762    */
763   private void initializeZooKeeper() throws IOException, InterruptedException {
764     // Create the master address tracker, register with zk, and start it.  Then
765     // block until a master is available.  No point in starting up if no master
766     // running.
767     blockAndCheckIfStopped(this.masterAddressTracker);
768 
769     // Wait on cluster being up.  Master will set this flag up in zookeeper
770     // when ready.
771     blockAndCheckIfStopped(this.clusterStatusTracker);
772 
773     // Retrieve clusterId
774     // Since cluster status is now up
775     // ID should have already been set by HMaster
776     try {
777       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
778       if (clusterId == null) {
779         this.abort("Cluster ID has not been set");
780       }
781       LOG.info("ClusterId : "+clusterId);
782     } catch (KeeperException e) {
783       this.abort("Failed to retrieve Cluster ID",e);
784     }
785 
786     // In case of a colocated master, wait here till it's active,
787     // so backup masters won't start as regionservers.
788     // This is to avoid showing backup masters as regionservers
789     // in the master web UI, or assigning any regions to them.
790     waitForMasterActive();
791     if (isStopped() || isAborted()) {
792       return; // No need for further initialization
793     }
794 
795     // watch for snapshots and other procedures
796     try {
797       rspmHost = new RegionServerProcedureManagerHost();
798       rspmHost.loadProcedures(conf);
799       rspmHost.initialize(this);
800     } catch (KeeperException e) {
801       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
802     }
803     // register watcher for recovering regions
804     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
805   }
806 
807   /**
808    * Utility method to wait indefinitely for a znode to become available, while
809    * checking whether the region server has been shut down.
810    * @param tracker znode tracker to use
811    * @throws IOException any IO exception, plus if the RS is stopped
812    * @throws InterruptedException
813    */
814   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
815       throws IOException, InterruptedException {
816     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
817       if (this.stopped) {
818         throw new IOException("Received the shutdown message while waiting.");
819       }
820     }
821   }
822 
823   /**
824    * @return false if a cluster shutdown is in progress
825    */
826   private boolean isClusterUp() {
827     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
828   }
829 
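      /**
       * Create (but do not yet start) the memstore flusher, compaction threads, leases,
       * periodic chores, quota manager, and the RPC client used to talk to the master,
       * then register the configuration observers.
       */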
830   private void initializeThreads() throws IOException {
831     // Cache flushing thread.
832     this.cacheFlusher = new MemStoreFlusher(conf, this);
833 
834     // Compaction thread
835     this.compactSplitThread = new CompactSplitThread(this);
836 
837     // Background thread to check for compactions; needed if a region has not gotten updates
838     // in a while. It will take care of not checking too frequently on a store-by-store basis.
839     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
840     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
841     this.leases = new Leases(this.threadWakeFrequency);
842 
843     // Create the thread to clean the moved regions list
844     movedRegionsCleaner = MovedRegionsCleaner.create(this);
845 
846     if (this.nonceManager != null) {
847       // Create the scheduled chore that cleans up nonces.
848       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
849     }
850 
851     // Setup the Quota Manager
852     rsQuotaManager = new RegionServerQuotaManager(this);
853     
854     // Setup RPC client for master communication
855     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
856         rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
857 
858     boolean onlyMetaRefresh = false;
859     int storefileRefreshPeriod = conf.getInt(
860         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
861       , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
862     if (storefileRefreshPeriod == 0) {
863       storefileRefreshPeriod = conf.getInt(
864           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
865           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
866       onlyMetaRefresh = true;
867     }
868     if (storefileRefreshPeriod > 0) {
869       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
870           onlyMetaRefresh, this, this);
871     }
872     registerConfigurationObservers();
873   }
874 
875   private void registerConfigurationObservers() {
876     // Registering the compactSplitThread object with the ConfigurationManager.
877     configurationManager.registerObserver(this.compactSplitThread);
878     configurationManager.registerObserver(this.rpcServices);
879   }
880 
881   /**
882    * The HRegionServer sticks in this loop until closed.
883    */
884   @Override
885   public void run() {
886     try {
887       // Do pre-registration initializations; zookeeper, lease threads, etc.
888       preRegistrationInitialization();
889     } catch (Throwable e) {
890       abort("Fatal exception during initialization", e);
891     }
892 
893     try {
894       if (!isStopped() && !isAborted()) {
895         ShutdownHook.install(conf, fs, this, Thread.currentThread());
896         // Set our ephemeral znode up in zookeeper now that we have a name.
897         createMyEphemeralNode();
898         // Initialize the RegionServerCoprocessorHost now that our ephemeral
899         // node was created, in case any coprocessors want to use ZooKeeper
900         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
901       }
902 
903       // Try to register with the Master; tell it we are here.  Break if the
904       // server is stopped, the cluster up flag is down, or hdfs went wacky.
905       while (keepLooping()) {
906         RegionServerStartupResponse w = reportForDuty();
907         if (w == null) {
908           LOG.warn("reportForDuty failed; sleeping and then retrying.");
909           this.sleeper.sleep();
910         } else {
911           handleReportForDutyResponse(w);
912           break;
913         }
914       }
915 
916       if (!isStopped() && isHealthy()){
917         // start the snapshot handler and other procedure handlers,
918         // since the server is ready to run
919         rspmHost.start();
920       }
921       
922       // Start the Quota Manager
923       if (this.rsQuotaManager != null) {
924         rsQuotaManager.start(getRpcServer().getScheduler());
925       }
926 
927       // We registered with the Master.  Go into run mode.
928       long lastMsg = System.currentTimeMillis();
929       long oldRequestCount = -1;
930       // The main run loop.
931       while (!isStopped() && isHealthy()) {
932         if (!isClusterUp()) {
933           if (isOnlineRegionsEmpty()) {
934             stop("Exiting; cluster shutdown set and not carrying any regions");
935           } else if (!this.stopping) {
936             this.stopping = true;
937             LOG.info("Closing user regions");
938             closeUserRegions(this.abortRequested);
939           } else if (this.stopping) {
940             boolean allUserRegionsOffline = areAllUserRegionsOffline();
941             if (allUserRegionsOffline) {
942               // Set stopped if no more write requests to meta tables
943               // since last time we went around the loop.  Any open
944               // meta regions will be closed on our way out.
945               if (oldRequestCount == getWriteRequestCount()) {
946                 stop("Stopped; only catalog regions remaining online");
947                 break;
948               }
949               oldRequestCount = getWriteRequestCount();
950             } else {
951               // Make sure all regions have been closed -- some regions may
952               // have not got it because we were splitting at the time of
953               // not have gotten it because we were splitting at the time of
954               closeUserRegions(this.abortRequested);
955             }
956             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
957           }
958         }
959         long now = System.currentTimeMillis();
960         if ((now - lastMsg) >= msgInterval) {
961           tryRegionServerReport(lastMsg, now);
962           lastMsg = System.currentTimeMillis();
963         }
964         if (!isStopped() && !isAborted()) {
965           this.sleeper.sleep();
966         }
967       } // while
968     } catch (Throwable t) {
969       if (!rpcServices.checkOOME(t)) {
970         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
971         abort(prefix + t.getMessage(), t);
972       }
973     }
974     // Run shutdown.
975     if (mxBean != null) {
976       MBeanUtil.unregisterMBean(mxBean);
977       mxBean = null;
978     }
979     if (this.leases != null) this.leases.closeAfterLeasesExpire();
980     if (this.splitLogWorker != null) {
981       splitLogWorker.stop();
982     }
983     if (this.infoServer != null) {
984       LOG.info("Stopping infoServer");
985       try {
986         this.infoServer.stop();
987       } catch (Exception e) {
988         LOG.error("Failed to stop infoServer", e);
989       }
990     }
991     // Send cache a shutdown.
992     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
993       cacheConfig.getBlockCache().shutdown();
994     }
995 
996     if (movedRegionsCleaner != null) {
997       movedRegionsCleaner.stop("Region Server stopping");
998     }
999 
1000     // Send interrupts to wake up threads if sleeping so they notice shutdown.
1001     // TODO: Should we check they are alive? If they hit an OOME, they could have exited already.
1002     if (this.hMemManager != null) this.hMemManager.stop();
1003     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1004     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1005     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1006     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1007     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1008     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1009     sendShutdownInterrupt();
1010 
1011     // Stop the quota manager
1012     if (rsQuotaManager != null) {
1013       rsQuotaManager.stop();
1014     }
1015     
1016     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1017     if (rspmHost != null) {
1018       rspmHost.stop(this.abortRequested || this.killed);
1019     }
1020 
1021     if (this.killed) {
1022       // Just skip out w/o closing regions.  Used when testing.
1023     } else if (abortRequested) {
1024       if (this.fsOk) {
1025         closeUserRegions(abortRequested); // Don't leave any open file handles
1026       }
1027       LOG.info("aborting server " + this.serverName);
1028     } else {
1029       closeUserRegions(abortRequested);
1030       LOG.info("stopping server " + this.serverName);
1031     }
1032 
1033     // so callers waiting for meta without timeout can stop
1034     if (this.metaTableLocator != null) this.metaTableLocator.stop();
1035     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1036       try {
1037         this.clusterConnection.close();
1038       } catch (IOException e) {
1039         // Although the {@link Closeable} interface throws an {@link
1040         // IOException}, in reality, the implementation would never do that.
1041         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1042       }
1043     }
1044 
1045     // Closing the compactSplit thread before closing meta regions
1046     if (!this.killed && containsMetaTableRegions()) {
1047       if (!abortRequested || this.fsOk) {
1048         if (this.compactSplitThread != null) {
1049           this.compactSplitThread.join();
1050           this.compactSplitThread = null;
1051         }
1052         closeMetaTableRegions(abortRequested);
1053       }
1054     }
1055 
1056     if (!this.killed && this.fsOk) {
1057       waitOnAllRegionsToClose(abortRequested);
1058       LOG.info("stopping server " + this.serverName +
1059         "; all regions closed.");
1060     }
1061 
1062     // The fsOk flag may be changed when closing regions throws an exception.
1063     if (this.fsOk) {
1064       shutdownWAL(!abortRequested);
1065     }
1066 
1067     // Make sure the proxy is down.
1068     if (this.rssStub != null) {
1069       this.rssStub = null;
1070     }
1071     if (this.rpcClient != null) {
1072       this.rpcClient.close();
1073     }
1074     if (this.leases != null) {
1075       this.leases.close();
1076     }
1077     if (this.pauseMonitor != null) {
1078       this.pauseMonitor.stop();
1079     }
1080 
1081     if (!killed) {
1082       stopServiceThreads();
1083     }
1084 
1085     if (this.rpcServices != null) {
1086       this.rpcServices.stop();
1087     }
1088 
1089     try {
1090       deleteMyEphemeralNode();
1091     } catch (KeeperException.NoNodeException nn) {
1092     } catch (KeeperException e) {
1093       LOG.warn("Failed deleting my ephemeral node", e);
1094     }
1095     // We may have failed to delete the znode at the previous step, but
1096     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1097     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1098 
1099     if (this.zooKeeper != null) {
1100       this.zooKeeper.close();
1101     }
1102     LOG.info("stopping server " + this.serverName +
1103       "; zookeeper connection closed.");
1104 
1105     LOG.info(Thread.currentThread().getName() + " exiting");
1106   }
1107 
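      /**
       * @return true if this region server is currently carrying hbase:meta.
       */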
1108   private boolean containsMetaTableRegions() {
1109     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1110   }
1111 
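      /**
       * @return true if the only regions still online on this server are meta regions.
       */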
1112   private boolean areAllUserRegionsOffline() {
1113     if (getNumberOfOnlineRegions() > 2) return false;
1114     boolean allUserRegionsOffline = true;
1115     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1116       if (!e.getValue().getRegionInfo().isMetaTable()) {
1117         allUserRegionsOffline = false;
1118         break;
1119       }
1120     }
1121     return allUserRegionsOffline;
1122   }
1123 
1124   /**
1125    * @return Current write count for all online regions.
1126    */
1127   private long getWriteRequestCount() {
1128     long writeCount = 0;
1129     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1130       writeCount += e.getValue().getWriteRequestsCount();
1131     }
1132     return writeCount;
1133   }
1134 
1135   @VisibleForTesting
1136   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1137   throws IOException {
1138     RegionServerStatusService.BlockingInterface rss = rssStub;
1139     if (rss == null) {
1140       // the current server could be stopping.
1141       return;
1142     }
1143     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1144     try {
1145       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1146       ServerName sn = ServerName.parseVersionedServerName(
1147         this.serverName.getVersionedBytes());
1148       request.setServer(ProtobufUtil.toServerName(sn));
1149       request.setLoad(sl);
1150       rss.regionServerReport(null, request.build());
1151     } catch (ServiceException se) {
1152       IOException ioe = ProtobufUtil.getRemoteException(se);
1153       if (ioe instanceof YouAreDeadException) {
1154         // This will be caught and handled as a fatal error in run()
1155         throw ioe;
1156       }
1157       if (rssStub == rss) {
1158         rssStub = null;
1159       }
1160       // Couldn't connect to the master, get location from zk and reconnect
1161       // Method blocks until new master is found or we are stopped
1162       createRegionServerStatusStub(true);
1163     }
1164   }
1165 
1166   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1167       throws IOException {
1168     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1169     // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
1170     // the wrapper to compute those numbers in one place.
1171     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1172     // Instead they should be stored in an HBase table so that external visibility into HBase is
1173     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1174     // history.
1175     MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1176     Collection<Region> regions = getOnlineRegionsLocalContext();
1177     MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1178 
1179     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1180       ClusterStatusProtos.ServerLoad.newBuilder();
1181     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1182     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1183     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1184     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1185     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1186     Builder coprocessorBuilder = Coprocessor.newBuilder();
1187     for (String coprocessor : coprocessors) {
1188       serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1189     }
1190     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1191     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1192     for (Region region : regions) {
1193       if (region.getCoprocessorHost() != null) {
1194         Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1195         Iterator<String> iterator = regionCoprocessors.iterator();
1196         while (iterator.hasNext()) {
1197           serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1198         }
1199       }
1200       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1201       for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1202           .getCoprocessors()) {
1203         serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1204       }
1205     }
1206     serverLoad.setReportStartTime(reportStartTime);
1207     serverLoad.setReportEndTime(reportEndTime);
1208     if (this.infoServer != null) {
1209       serverLoad.setInfoServerPort(this.infoServer.getPort());
1210     } else {
1211       serverLoad.setInfoServerPort(-1);
1212     }
1213 
1214     // For the replicationLoad purpose we only need to get it from one service;
1215     // either source or sink will return the same info.
1216     ReplicationSourceService rsources = getReplicationSourceService();
1217 
1218     if (rsources != null) {
1219       // always refresh first to get the latest value
1220       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1221       if (rLoad != null) {
1222         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1223         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1224           serverLoad.addReplLoadSource(rLS);
1225         }
1226       }
1227     }
1228 
1229     return serverLoad.build();
1230   }
1231 
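      /**
       * @return a comma-separated list of the encoded names of the regions currently online.
       */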
1232   String getOnlineRegionsAsPrintableString() {
1233     StringBuilder sb = new StringBuilder();
1234     for (Region r: this.onlineRegions.values()) {
1235       if (sb.length() > 0) sb.append(", ");
1236       sb.append(r.getRegionInfo().getEncodedName());
1237     }
1238     return sb.toString();
1239   }
1240 
1241   /**
1242    * Wait on regions to close.
1243    */
1244   private void waitOnAllRegionsToClose(final boolean abort) {
1245     // Wait till all regions are closed before going out.
1246     int lastCount = -1;
1247     long previousLogTime = 0;
1248     Set<String> closedRegions = new HashSet<String>();
1249     boolean interrupted = false;
1250     try {
1251       while (!isOnlineRegionsEmpty()) {
1252         int count = getNumberOfOnlineRegions();
1253         // Only print a message if the count of regions has changed.
1254         if (count != lastCount) {
1255           // Log every second at most
1256           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1257             previousLogTime = System.currentTimeMillis();
1258             lastCount = count;
1259             LOG.info("Waiting on " + count + " regions to close");
1260             // Only print out regions still closing if there are only a few, else we will
1261             // swamp the log.
1262             if (count < 10 && LOG.isDebugEnabled()) {
1263               LOG.debug(this.onlineRegions);
1264             }
1265           }
1266         }
1267         // Ensure all user regions have been sent a close. Use this to
1268         // protect against the case where an open comes in after we start the
1269         // iterator of onlineRegions to close all user regions.
1270         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1271           HRegionInfo hri = e.getValue().getRegionInfo();
1272           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1273               && !closedRegions.contains(hri.getEncodedName())) {
1274             closedRegions.add(hri.getEncodedName());
1275             // Don't update zk with this close transition; pass false.
1276             closeRegionIgnoreErrors(hri, abort);
1277           }
1278         }
1279         // No regions in RIT, we could stop waiting now.
1280         if (this.regionsInTransitionInRS.isEmpty()) {
1281           if (!isOnlineRegionsEmpty()) {
1282             LOG.info("We were exiting though online regions are not empty," +
1283                 " because some regions failed closing");
1284           }
1285           break;
1286         }
1287         if (sleep(200)) {
1288           interrupted = true;
1289         }
1290       }
1291     } finally {
1292       if (interrupted) {
1293         Thread.currentThread().interrupt();
1294       }
1295     }
1296   }
1297 
1298   private boolean sleep(long millis) {
1299     boolean interrupted = false;
1300     try {
1301       Thread.sleep(millis);
1302     } catch (InterruptedException e) {
1303       LOG.warn("Interrupted while sleeping");
1304       interrupted = true;
1305     }
1306     return interrupted;
1307   }
1308 
1309   private void shutdownWAL(final boolean close) {
1310     if (this.walFactory != null) {
1311       try {
1312         if (close) {
1313           walFactory.close();
1314         } else {
1315           walFactory.shutdown();
1316         }
1317       } catch (Throwable e) {
1318         e = RemoteExceptionHandler.checkThrowable(e);
1319         LOG.error("Shutdown / close of WAL failed: " + e);
1320         LOG.debug("Shutdown / close exception details:", e);
1321       }
1322     }
1323   }
1324 
1325   /*
1326    * Run init. Sets up wal and starts up all server threads.
1327    *
1328    * @param c Extra configuration.
1329    */
1330   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1331   throws IOException {
1332     try {
1333       for (NameStringPair e : c.getMapEntriesList()) {
1334         String key = e.getName();
1335         // The hostname the master sees us as.
1336         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1337           String hostnameFromMasterPOV = e.getValue();
1338           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1339             rpcServices.isa.getPort(), this.startcode);
1340           if (shouldUseThisHostnameInstead() &&
1341               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1342             String msg = "Master passed us a different hostname to use; was=" +
1343                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1344             LOG.error(msg);
1345             throw new IOException(msg);
1346           }
1347           if (!shouldUseThisHostnameInstead() &&
1348               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1349             String msg = "Master passed us a different hostname to use; was=" +
1350                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1351             LOG.error(msg);
1352           }
1353           continue;
1354         }
1355         String value = e.getValue();
1356         if (LOG.isDebugEnabled()) {
1357           LOG.debug("Config from master: " + key + "=" + value);
1358         }
1359         this.conf.set(key, value);
1360       }
1361 
1362       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1363       // config param for task trackers, but we can piggyback off of it.
1364       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1365         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1366           this.serverName.toString());
1367       }
1368 
1369       // Save it to a file; this lets us detect later whether we crashed.
1370       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1371 
1372       this.cacheConfig = new CacheConfig(conf);
1373       this.walFactory = setupWALAndReplication();
1374       // Init in here rather than in constructor after thread name has been set
1375       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1376 
1377       startServiceThreads();
1378       startHeapMemoryManager();
1379       LOG.info("Serving as " + this.serverName +
1380         ", RpcServer on " + rpcServices.isa +
1381         ", sessionid=0x" +
1382         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1383 
1384       // Wake up anyone waiting for this server to online
1385       synchronized (online) {
1386         online.set(true);
1387         online.notifyAll();
1388       }
1389     } catch (Throwable e) {
1390       stop("Failed initialization");
1391       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1392           "Region server startup failed");
1393     } finally {
1394       sleeper.skipSleepCycle();
1395     }
1396   }
1397 
1398   private void startHeapMemoryManager() {
1399     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1400         this, this.regionServerAccounting);
1401     if (this.hMemManager != null) {
1402       this.hMemManager.start(getChoreService());
1403     }
1404   }
1405 
1406   private void createMyEphemeralNode() throws KeeperException, IOException {
1407     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1408     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1409     rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1410     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1411     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1412       getMyEphemeralNodePath(), data);
1413   }
1414 
1415   private void deleteMyEphemeralNode() throws KeeperException {
1416     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1417   }
1418 
1419   @Override
1420   public RegionServerAccounting getRegionServerAccounting() {
1421     return regionServerAccounting;
1422   }
1423 
1424   @Override
1425   public TableLockManager getTableLockManager() {
1426     return tableLockManager;
1427   }
1428 
1429   /*
1430    * @param r Region to get RegionLoad for.
1431    * @param regionLoadBldr the RegionLoad.Builder, can be null
1432    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1433    * @return RegionLoad instance.
1434    *
1435    * @throws IOException
1436    */
1437   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1438       RegionSpecifier.Builder regionSpecifier) throws IOException {
1439     byte[] name = r.getRegionInfo().getRegionName();
1440     int stores = 0;
1441     int storefiles = 0;
1442     int storeUncompressedSizeMB = 0;
1443     int storefileSizeMB = 0;
1444     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1445     int storefileIndexSizeMB = 0;
1446     int rootIndexSizeKB = 0;
1447     int totalStaticIndexSizeKB = 0;
1448     int totalStaticBloomSizeKB = 0;
1449     long totalCompactingKVs = 0;
1450     long currentCompactedKVs = 0;
1451     List<Store> storeList = r.getStores();
1452     stores += storeList.size();
1453     for (Store store : storeList) {
1454       storefiles += store.getStorefilesCount();
1455       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1456       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1457       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1458       CompactionProgress progress = store.getCompactionProgress();
1459       if (progress != null) {
1460         totalCompactingKVs += progress.totalCompactingKVs;
1461         currentCompactedKVs += progress.currentCompactedKVs;
1462       }
1463       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1464       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1465       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1466     }
1467 
1468     float dataLocality =
1469         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1470     if (regionLoadBldr == null) {
1471       regionLoadBldr = RegionLoad.newBuilder();
1472     }
1473     if (regionSpecifier == null) {
1474       regionSpecifier = RegionSpecifier.newBuilder();
1475     }
1476     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1477     regionSpecifier.setValue(ByteStringer.wrap(name));
1478     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1479       .setStores(stores)
1480       .setStorefiles(storefiles)
1481       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1482       .setStorefileSizeMB(storefileSizeMB)
1483       .setMemstoreSizeMB(memstoreSizeMB)
1484       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1485       .setRootIndexSizeKB(rootIndexSizeKB)
1486       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1487       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1488       .setReadRequestsCount(r.getReadRequestsCount())
1489       .setWriteRequestsCount(r.getWriteRequestsCount())
1490       .setTotalCompactingKVs(totalCompactingKVs)
1491       .setCurrentCompactedKVs(currentCompactedKVs)
1492       .setDataLocality(dataLocality)
1493       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1494     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1495 
1496     return regionLoadBldr.build();
1497   }
1498 
1499   /**
1500    * @param encodedRegionName
1501    * @return An instance of RegionLoad.
1502    */
1503   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1504     Region r = onlineRegions.get(encodedRegionName);
1505     return r != null ? createRegionLoad(r, null, null) : null;
1506   }
1507 
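  // --- Illustrative sketch, not part of the original source. Shows how the per-region
  // load assembled above could be sampled by code running in the same JVM (for example
  // a test); the method name below is an assumption, not an existing HBase API.
  private void logOnlineRegionLoadsForDebug() throws IOException {
    for (Region region : getOnlineRegionsLocalContext()) {
      RegionLoad load = createRegionLoad(region.getRegionInfo().getEncodedName());
      if (load != null) {
        LOG.info(region.getRegionInfo().getRegionNameAsString()
            + " storefiles=" + load.getStorefiles()
            + " memstoreSizeMB=" + load.getMemstoreSizeMB());
      }
    }
  }
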
1508   /*
1509    * Inner class that runs on a long period checking if regions need compaction.
1510    */
1511   private static class CompactionChecker extends ScheduledChore {
1512     private final HRegionServer instance;
1513     private final int majorCompactPriority;
1514     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1515     private long iteration = 0;
1516 
1517     CompactionChecker(final HRegionServer h, final int sleepTime,
1518         final Stoppable stopper) {
1519       super("CompactionChecker", stopper, sleepTime);
1520       this.instance = h;
1521       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1522 
1523       /* MajorCompactPriority is configurable.
1524        * If not set, the compaction will use default priority.
1525        */
1526       this.majorCompactPriority = this.instance.conf.
1527         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1528         DEFAULT_PRIORITY);
1529     }
1530 
1531     @Override
1532     protected void chore() {
1533       for (Region r : this.instance.onlineRegions.values()) {
1534         if (r == null)
1535           continue;
1536         for (Store s : r.getStores()) {
1537           try {
1538             long multiplier = s.getCompactionCheckMultiplier();
1539             assert multiplier > 0;
1540             if (iteration % multiplier != 0) continue;
1541             if (s.needsCompaction()) {
1542               // Queue a compaction. Will recognize if major is needed.
1543               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1544                   + " requests compaction");
1545             } else if (s.isMajorCompaction()) {
1546               if (majorCompactPriority == DEFAULT_PRIORITY
1547                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1548                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1549                     + " requests major compaction; use default priority", null);
1550               } else {
1551                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1552                     + " requests major compaction; use configured priority",
1553                   this.majorCompactPriority, null, null);
1554               }
1555             }
1556           } catch (IOException e) {
1557             LOG.warn("Failed major compaction check on " + r, e);
1558           }
1559         }
1560       }
1561       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1562     }
1563   }
1564 
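  // --- Illustrative sketch, not part of the original source. The priority consulted by the
  // CompactionChecker above is plain configuration; a deployment or test could set it as
  // below (the value 10 and the helper name are examples only, not HBase APIs).
  private static Configuration exampleCompactionCheckerConf() {
    Configuration c = HBaseConfiguration.create();
    // Read by the chore above; when unset it stays at DEFAULT_PRIORITY
    // (Integer.MAX_VALUE), i.e. major compactions use the default priority.
    c.setInt("hbase.regionserver.compactionChecker.majorCompactPriority", 10);
    return c;
  }
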
1565   static class PeriodicMemstoreFlusher extends ScheduledChore {
1566     final HRegionServer server;
1567     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1568     final static int MIN_DELAY_TIME = 0; // millisec
1569     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1570       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1571       this.server = server;
1572     }
1573 
1574     @Override
1575     protected void chore() {
1576       final StringBuffer whyFlush = new StringBuffer();
1577       for (Region r : this.server.onlineRegions.values()) {
1578         if (r == null) continue;
1579         if (((HRegion)r).shouldFlush(whyFlush)) {
1580           FlushRequester requester = server.getFlushRequester();
1581           if (requester != null) {
1582             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1583             LOG.info(getName() + " requesting flush of " +
1584               r.getRegionInfo().getRegionNameAsString() + " because " +
1585               whyFlush.toString() +
1586               " after random delay " + randomDelay + "ms");
1587             // Throttle the flushes by putting a delay. If we don't throttle, and there
1588             // is a balanced write-load on the regions in a table, we might end up
1589             // overwhelming the filesystem with too many flushes at once.
1590             requester.requestDelayedFlush(r, randomDelay, false);
1591           }
1592         }
1593       }
1594     }
1595   }
1596 
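  // --- Illustrative sketch, not part of the original source. The same throttling idea the
  // chore above relies on -- delaying a flush by a random amount -- expressed directly
  // against the FlushRequester; the method name here is an assumption.
  void exampleRequestThrottledFlush(Region region) {
    // A random delay in [0, RANGE_OF_DELAY) spreads flushes out over time so that a
    // balanced write load does not trigger them all at once.
    long delay = RandomUtils.nextInt(PeriodicMemstoreFlusher.RANGE_OF_DELAY);
    FlushRequester requester = getFlushRequester();
    if (requester != null) {
      requester.requestDelayedFlush(region, delay, false);
    }
  }
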
1597   /**
1598    * Report the status of the server. A server is online once all the startup is
1599    * completed (setting up filesystem, starting service threads, etc.). This
1600    * method is designed mostly to be useful in tests.
1601    *
1602    * @return true if online, false if not.
1603    */
1604   public boolean isOnline() {
1605     return online.get();
1606   }
1607 
1608   /**
1609    * Setup WAL log and replication if enabled.
1610    * Replication setup is done in here because it wants to be hooked up to WAL.
1611    * @return A WAL instance.
1612    * @throws IOException
1613    */
1614   private WALFactory setupWALAndReplication() throws IOException {
1615     // TODO Replication make assumptions here based on the default filesystem impl
1616     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1617     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1618 
1619     Path logdir = new Path(rootDir, logName);
1620     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1621     if (this.fs.exists(logdir)) {
1622       throw new RegionServerRunningException("Region server has already " +
1623         "created directory at " + this.serverName.toString());
1624     }
1625 
1626     // Instantiate replication manager if replication enabled.  Pass it the
1627     // log directories.
1628     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1629 
1630     // listeners the wal factory will add to wals it creates.
1631     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1632     listeners.add(new MetricsWAL());
1633     if (this.replicationSourceHandler != null &&
1634         this.replicationSourceHandler.getWALActionsListener() != null) {
1635       // Replication handler is an implementation of WALActionsListener.
1636       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1637     }
1638 
1639     return new WALFactory(conf, listeners, serverName.toString());
1640   }
1641 
1642   /**
1643    * We initialize the roller for the wal that handles meta lazily
1644    * since we don't know if this regionserver will handle it. All calls to
1645    * this method return a reference to that same roller. As newly referenced
1646    * meta regions are brought online, they will be offered to the roller for maintenance.
1647    * As a part of that registration process, the roller will add itself as a
1648    * listener on the wal.
1649    */
1650   protected LogRoller ensureMetaWALRoller() {
1651     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1652     // null
1653     LogRoller roller = metawalRoller.get();
1654     if (null == roller) {
1655       LogRoller tmpLogRoller = new LogRoller(this, this);
1656       String n = Thread.currentThread().getName();
1657       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1658           n + "-MetaLogRoller", uncaughtExceptionHandler);
1659       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1660         roller = tmpLogRoller;
1661       } else {
1662         // Another thread won starting the roller
1663         Threads.shutdown(tmpLogRoller.getThread());
1664         roller = metawalRoller.get();
1665       }
1666     }
1667     return roller;
1668   }
1669 
1670   public MetricsRegionServer getRegionServerMetrics() {
1671     return this.metricsRegionServer;
1672   }
1673 
1674   /**
1675    * @return Master address tracker instance.
1676    */
1677   public MasterAddressTracker getMasterAddressTracker() {
1678     return this.masterAddressTracker;
1679   }
1680 
1681   /*
1682    * Start maintenance threads, Server, Worker and lease-checker threads.
1683    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1684    * get an unhandled exception. We cannot set the handler on all threads.
1685    * Server's internal Listener thread is off limits. For Server, if it hits an
1686    * OOME, it waits a while and then retries. Meantime, a flush or a compaction
1687    * that tries to run should trigger the same critical condition and the
1688    * shutdown will run. On its way out, this server will shut down Server.
1689    * Leases sits somewhere in between: it has an internal thread and, though it
1690    * inherits from Chore, it keeps its own stop mechanism, so it needs to be
1691    * stopped by this hosting server. Worker logs the exception and exits.
1692    */
1693   private void startServiceThreads() throws IOException {
1694     // Start executor services
1695     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1696       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1697     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1698       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1699     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1700       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1701     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1702       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1703     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1704       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1705         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1706     }
1707     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1708        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1709 
1710     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1711       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1712         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1713           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1714     }
1715 
1716     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1717         uncaughtExceptionHandler);
1718     this.cacheFlusher.start(uncaughtExceptionHandler);
1719 
1720     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1721     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1722     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1723     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1724     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1725     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1726 
1727     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1728     // an unhandled exception, it will just exit.
1729     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1730       uncaughtExceptionHandler);
1731 
1732     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1733         this.replicationSourceHandler != null) {
1734       this.replicationSourceHandler.startReplicationService();
1735     } else {
1736       if (this.replicationSourceHandler != null) {
1737         this.replicationSourceHandler.startReplicationService();
1738       }
1739       if (this.replicationSinkHandler != null) {
1740         this.replicationSinkHandler.startReplicationService();
1741       }
1742     }
1743 
1744     // Create the log splitting worker and start it
1745     // Use a small retry count so we fail fast; otherwise the SplitLogWorker could be blocked
1746     // for quite a while inside the HConnection layer. The worker would not be available for
1747     // other tasks even after the current task is preempted once a split task times out.
1748     Configuration sinkConf = HBaseConfiguration.create(conf);
1749     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1750       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1751     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1752       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1753     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1754     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1755     splitLogWorker.start();
1756   }
1757 
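  // --- Illustrative sketch, not part of the original source. startServiceThreads() above
  // reads these keys verbatim when sizing its executor pools; the values shown are examples,
  // not recommendations, and the helper name is an assumption.
  private static void exampleTuneRegionServerExecutors(Configuration conf) {
    conf.setInt("hbase.regionserver.executor.openregion.threads", 6);
    conf.setInt("hbase.regionserver.executor.closeregion.threads", 6);
    // Parallel seek is off by default; enabling it also activates the pool sized below.
    conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);
    conf.setInt("hbase.storescanner.parallel.seek.threads", 16);
  }
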
1758   /**
1759    * Puts up the webui.
1760    * @return Returns final port -- maybe different from what we started with.
1761    * @throws IOException
1762    */
1763   private int putUpWebUI() throws IOException {
1764     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1765       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1766     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1767 
1768     if(this instanceof HMaster) {
1769       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1770           HConstants.DEFAULT_MASTER_INFOPORT);
1771       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1772     }
1773     // -1 is for disabling info server
1774     if (port < 0) return port;
1775 
1776     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1777       String msg =
1778           "Failed to start http info server. Address " + addr
1779               + " does not belong to this host. Correct configuration parameter: "
1780               + "hbase.regionserver.info.bindAddress";
1781       LOG.error(msg);
1782       throw new IOException(msg);
1783     }
1784     // check if auto port bind enabled
1785     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1786         false);
1787     while (true) {
1788       try {
1789         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1790         infoServer.addServlet("dump", "/dump", getDumpServlet());
1791         configureInfoServer();
1792         this.infoServer.start();
1793         break;
1794       } catch (BindException e) {
1795         if (!auto) {
1796           // auto bind disabled throw BindException
1797           LOG.error("Failed binding http info server to port: " + port);
1798           throw e;
1799         }
1800         // auto bind enabled, try to use another port
1801         LOG.info("Failed binding http info server to port: " + port);
1802         port++;
1803       }
1804     }
1805     port = this.infoServer.getPort();
1806     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1807     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1808       HConstants.DEFAULT_MASTER_INFOPORT);
1809     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1810     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1811     return port;
1812   }
1813 
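  // --- Illustrative sketch, not part of the original source. putUpWebUI() above honors these
  // settings: with auto bind enabled it walks forward from the configured port instead of
  // failing with a BindException; port 16030 and the helper name are only examples.
  private static void exampleConfigureInfoServerPort(Configuration conf) {
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 16030);
    conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
  }
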
1814   /*
1815    * Verify that server is healthy
1816    */
1817   private boolean isHealthy() {
1818     if (!fsOk) {
1819       // File system problem
1820       return false;
1821     }
1822     // Verify that all threads are alive
1823     if (!(leases.isAlive()
1824         && cacheFlusher.isAlive() && walRoller.isAlive()
1825         && this.compactionChecker.isScheduled()
1826         && this.periodicFlusher.isScheduled())) {
1827       stop("One or more threads are no longer alive -- stop");
1828       return false;
1829     }
1830     final LogRoller metawalRoller = this.metawalRoller.get();
1831     if (metawalRoller != null && !metawalRoller.isAlive()) {
1832       stop("Meta WAL roller thread is no longer alive -- stop");
1833       return false;
1834     }
1835     return true;
1836   }
1837 
1838   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1839 
1840   @Override
1841   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1842     WAL wal;
1843     LogRoller roller = walRoller;
1844     //_ROOT_ and hbase:meta regions have separate WAL.
1845     if (regionInfo != null && regionInfo.isMetaTable() &&
1846         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1847       roller = ensureMetaWALRoller();
1848       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1849     } else if (regionInfo == null) {
1850       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1851     } else {
1852       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1853     }
1854     roller.addWAL(wal);
1855     return wal;
1856   }
1857 
1858   @Override
1859   public ClusterConnection getConnection() {
1860     return this.clusterConnection;
1861   }
1862 
1863   @Override
1864   public MetaTableLocator getMetaTableLocator() {
1865     return this.metaTableLocator;
1866   }
1867 
1868   @Override
1869   public void stop(final String msg) {
1870     if (!this.stopped) {
1871       try {
1872         if (this.rsHost != null) {
1873           this.rsHost.preStop(msg);
1874         }
1875         this.stopped = true;
1876         LOG.info("STOPPED: " + msg);
1877         // Wakes run() if it is sleeping
1878         sleeper.skipSleepCycle();
1879       } catch (IOException exp) {
1880         LOG.warn("The region server did not stop", exp);
1881       }
1882     }
1883   }
1884 
1885   public void waitForServerOnline(){
1886     while (!isStopped() && !isOnline()) {
1887       synchronized (online) {
1888         try {
1889           online.wait(msgInterval);
1890         } catch (InterruptedException ie) {
1891           Thread.currentThread().interrupt();
1892           break;
1893         }
1894       }
1895     }
1896   }
1897 
1898   @Override
1899   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1900     postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1901   }
1902 
1903   @Override
1904   public void postOpenDeployTasks(final PostOpenDeployContext context)
1905       throws KeeperException, IOException {
1906     Region r = context.getRegion();
1907     long masterSystemTime = context.getMasterSystemTime();
1908     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1909     rpcServices.checkOpen();
1910     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1911     // Do checks to see if we need to compact (references or too many files)
1912     for (Store s : r.getStores()) {
1913       if (s.hasReferences() || s.needsCompaction()) {
1914        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1915       }
1916     }
1917     long openSeqNum = r.getOpenSeqNum();
1918     if (openSeqNum == HConstants.NO_SEQNUM) {
1919       // If we opened a region, we should have read some sequence number from it.
1920       LOG.error("No sequence number found when opening " +
1921         r.getRegionInfo().getRegionNameAsString());
1922       openSeqNum = 0;
1923     }
1924 
1925     // Update flushed sequence id of a recovering region in ZK
1926     updateRecoveringRegionLastFlushedSequenceId(r);
1927 
1928     // Update ZK, or META
1929     if (r.getRegionInfo().isMetaRegion()) {
1930       MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1931          State.OPEN);
1932     } else if (useZKForAssignment) {
1933       MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1934         this.serverName, openSeqNum, masterSystemTime);
1935     }
1936     if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1937         TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1938       throw new IOException("Failed to report opened region to master: "
1939         + r.getRegionInfo().getRegionNameAsString());
1940     }
1941 
1942     triggerFlushInPrimaryRegion((HRegion)r);
1943 
1944     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1945   }
1946 
1947   @Override
1948   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1949     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1950   }
1951 
1952   @Override
1953   public boolean reportRegionStateTransition(
1954       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1955     return reportRegionStateTransition(
1956       new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1957   }
1958 
1959   @Override
1960   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1961     TransitionCode code = context.getCode();
1962     long openSeqNum = context.getOpenSeqNum();
1963     HRegionInfo[] hris = context.getHris();
1964 
1965     ReportRegionStateTransitionRequest.Builder builder =
1966       ReportRegionStateTransitionRequest.newBuilder();
1967     builder.setServer(ProtobufUtil.toServerName(serverName));
1968     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1969     transition.setTransitionCode(code);
1970     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1971       transition.setOpenSeqNum(openSeqNum);
1972     }
1973     for (HRegionInfo hri: hris) {
1974       transition.addRegionInfo(HRegionInfo.convert(hri));
1975     }
1976     ReportRegionStateTransitionRequest request = builder.build();
1977     while (keepLooping()) {
1978       RegionServerStatusService.BlockingInterface rss = rssStub;
1979       try {
1980         if (rss == null) {
1981           createRegionServerStatusStub();
1982           continue;
1983         }
1984         ReportRegionStateTransitionResponse response =
1985           rss.reportRegionStateTransition(null, request);
1986         if (response.hasErrorMessage()) {
1987           LOG.info("Failed to transition " + hris[0]
1988             + " to " + code + ": " + response.getErrorMessage());
1989           return false;
1990         }
1991         return true;
1992       } catch (ServiceException se) {
1993         IOException ioe = ProtobufUtil.getRemoteException(se);
1994         LOG.info("Failed to report region transition, will retry", ioe);
1995         if (rssStub == rss) {
1996           rssStub = null;
1997         }
1998       }
1999     }
2000     return false;
2001   }
2002 
2003   /**
2004    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2005    * block this thread. See RegionReplicaFlushHandler for details.
2006    */
2007   void triggerFlushInPrimaryRegion(final HRegion region) {
2008     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2009       return;
2010     }
2011     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2012         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2013           region.conf)) {
2014       region.setReadsEnabled(true);
2015       return;
2016     }
2017 
2018     region.setReadsEnabled(false); // disable reads before marking the region as opened.
2019     // RegionReplicaFlushHandler might reset this.
2020 
2021     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2022     this.service.submit(
2023       new RegionReplicaFlushHandler(this, clusterConnection,
2024         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2025   }
2026 
2027   @Override
2028   public RpcServerInterface getRpcServer() {
2029     return rpcServices.rpcServer;
2030   }
2031 
2032   @VisibleForTesting
2033   public RSRpcServices getRSRpcServices() {
2034     return rpcServices;
2035   }
2036 
2037   /**
2038    * Cause the server to exit without closing the regions it is serving, the log
2039    * it is using and without notifying the master. Used for unit testing and on
2040    * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
2041    *
2042    * @param reason
2043    *          the reason we are aborting
2044    * @param cause
2045    *          the exception that caused the abort, or null
2046    */
2047   @Override
2048   public void abort(String reason, Throwable cause) {
2049     String msg = "ABORTING region server " + this + ": " + reason;
2050     if (cause != null) {
2051       LOG.fatal(msg, cause);
2052     } else {
2053       LOG.fatal(msg);
2054     }
2055     this.abortRequested = true;
2056     // HBASE-4014: show list of coprocessors that were loaded to help debug
2057     // regionserver crashes. Note that we're implicitly using
2058     // java.util.HashSet's toString() method to print the coprocessor names.
2059     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2060         CoprocessorHost.getLoadedCoprocessors());
2061     // Try and dump metrics if abort -- might give clue as to how fatal came about....
2062     try {
2063       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2064     } catch (MalformedObjectNameException | IOException e) {
2065       LOG.warn("Failed dumping metrics", e);
2066     }
2067 
2068     // Do our best to report our abort to the master, but this may not work
2069     try {
2070       if (cause != null) {
2071         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2072       }
2073       // Report to the master but only if we have already registered with the master.
2074       if (rssStub != null && this.serverName != null) {
2075         ReportRSFatalErrorRequest.Builder builder =
2076           ReportRSFatalErrorRequest.newBuilder();
2077         ServerName sn =
2078           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2079         builder.setServer(ProtobufUtil.toServerName(sn));
2080         builder.setErrorMessage(msg);
2081         rssStub.reportRSFatalError(null, builder.build());
2082       }
2083     } catch (Throwable t) {
2084       LOG.warn("Unable to report fatal error to master", t);
2085     }
2086     stop(reason);
2087   }
2088 
2089   /**
2090    * @see HRegionServer#abort(String, Throwable)
2091    */
2092   public void abort(String reason) {
2093     abort(reason, null);
2094   }
2095 
2096   @Override
2097   public boolean isAborted() {
2098     return this.abortRequested;
2099   }
2100 
2101   /*
2102    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2103    * logs, but it does close the socket in case we want to bring up a server on the
2104    * old hostname+port immediately.
2105    */
2106   protected void kill() {
2107     this.killed = true;
2108     abort("Simulated kill");
2109   }
2110 
2111   /**
2112    * Called on stop/abort before closing the cluster connection and meta locator.
2113    */
2114   protected void sendShutdownInterrupt() {
2115   }
2116 
2117   /**
2118    * Wait on all threads to finish. Presumption is that all closes and stops
2119    * have already been called.
2120    */
2121   protected void stopServiceThreads() {
2122     // clean up the scheduled chores
2123     if (this.choreService != null) choreService.shutdown();
2124     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2125     if (this.compactionChecker != null) compactionChecker.cancel(true);
2126     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2127     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2128     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2129     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2130 
2131     if (this.cacheFlusher != null) {
2132       this.cacheFlusher.join();
2133     }
2134 
2135     if (this.spanReceiverHost != null) {
2136       this.spanReceiverHost.closeReceivers();
2137     }
2138     if (this.walRoller != null) {
2139       Threads.shutdown(this.walRoller.getThread());
2140     }
2141     final LogRoller metawalRoller = this.metawalRoller.get();
2142     if (metawalRoller != null) {
2143       Threads.shutdown(metawalRoller.getThread());
2144     }
2145     if (this.compactSplitThread != null) {
2146       this.compactSplitThread.join();
2147     }
2148     if (this.service != null) this.service.shutdown();
2149     if (this.replicationSourceHandler != null &&
2150         this.replicationSourceHandler == this.replicationSinkHandler) {
2151       this.replicationSourceHandler.stopReplicationService();
2152     } else {
2153       if (this.replicationSourceHandler != null) {
2154         this.replicationSourceHandler.stopReplicationService();
2155       }
2156       if (this.replicationSinkHandler != null) {
2157         this.replicationSinkHandler.stopReplicationService();
2158       }
2159     }
2160   }
2161 
2162   /**
2163    * @return Return the object that implements the replication
2164    * source service.
2165    */
2166   ReplicationSourceService getReplicationSourceService() {
2167     return replicationSourceHandler;
2168   }
2169 
2170   /**
2171    * @return Return the object that implements the replication
2172    * sink service.
2173    */
2174   ReplicationSinkService getReplicationSinkService() {
2175     return replicationSinkHandler;
2176   }
2177 
2178   /**
2179    * Get the current master from ZooKeeper and open the RPC connection to it.
2180    * To get a fresh connection, the current rssStub must be null.
2181    * Method will block until a master is available. You can break from this
2182    * block by requesting the server stop.
2183    *
2184    * @return master + port, or null if server has been stopped
2185    */
2186   @VisibleForTesting
2187   protected synchronized ServerName createRegionServerStatusStub() {
2188     // Create RS stub without refreshing the master node from ZK, use cached data
2189     return createRegionServerStatusStub(false);
2190   }
2191 
2192   /**
2193    * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2194    * connection, the current rssStub must be null. Method will block until a master is available.
2195    * You can break from this block by requesting the server stop.
2196    * @param refresh If true then master address will be read from ZK, otherwise use cached data
2197    * @return master + port, or null if server has been stopped
2198    */
2199   @VisibleForTesting
2200   protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2201     if (rssStub != null) {
2202       return masterAddressTracker.getMasterAddress();
2203     }
2204     ServerName sn = null;
2205     long previousLogTime = 0;
2206     RegionServerStatusService.BlockingInterface intf = null;
2207     boolean interrupted = false;
2208     try {
2209       while (keepLooping()) {
2210         sn = this.masterAddressTracker.getMasterAddress(refresh);
2211         if (sn == null) {
2212           if (!keepLooping()) {
2213             // give up with no connection.
2214             LOG.debug("No master found and cluster is stopped; bailing out");
2215             return null;
2216           }
2217           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2218             LOG.debug("No master found; retry");
2219             previousLogTime = System.currentTimeMillis();
2220           }
2221           refresh = true; // let's try to pull it from ZK directly
2222           if (sleep(200)) {
2223             interrupted = true;
2224           }
2225           continue;
2226         }
2227 
2228         // If we are on the active master, use the shortcut
2229         if (this instanceof HMaster && sn.equals(getServerName())) {
2230           intf = ((HMaster)this).getMasterRpcServices();
2231           break;
2232         }
2233         try {
2234           BlockingRpcChannel channel =
2235             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2236               shortOperationTimeout);
2237           intf = RegionServerStatusService.newBlockingStub(channel);
2238           break;
2239         } catch (IOException e) {
2240           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2241             e = e instanceof RemoteException ?
2242               ((RemoteException)e).unwrapRemoteException() : e;
2243             if (e instanceof ServerNotRunningYetException) {
2244               LOG.info("Master isn't available yet, retrying");
2245             } else {
2246               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2247             }
2248             previousLogTime = System.currentTimeMillis();
2249           }
2250           if (sleep(200)) {
2251             interrupted = true;
2252           }
2253         }
2254       }
2255     } finally {
2256       if (interrupted) {
2257         Thread.currentThread().interrupt();
2258       }
2259     }
2260     rssStub = intf;
2261     return sn;
2262   }
2263 
2264   /**
2265    * @return True if we should keep looping; returns false once the cluster is going
2266    * down or this server has been stopped or hdfs has gone bad.
2267    */
2268   private boolean keepLooping() {
2269     return !this.stopped && isClusterUp();
2270   }
2271 
2272   /*
2273    * Let the master know we're here. Run initialization using parameters passed
2274    * to us by the master.
2275    * @return A Map of key/value configurations we got from the Master else
2276    * null if we failed to register.
2277    * @throws IOException
2278    */
2279   private RegionServerStartupResponse reportForDuty() throws IOException {
2280     ServerName masterServerName = createRegionServerStatusStub(true);
2281     if (masterServerName == null) return null;
2282     RegionServerStartupResponse result = null;
2283     try {
2284       rpcServices.requestCount.set(0);
2285       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2286         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2287       long now = EnvironmentEdgeManager.currentTime();
2288       int port = rpcServices.isa.getPort();
2289       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2290       if (shouldUseThisHostnameInstead()) {
2291         request.setUseThisHostnameInstead(useThisHostnameInstead);
2292       }
2293       request.setPort(port);
2294       request.setServerStartCode(this.startcode);
2295       request.setServerCurrentTime(now);
2296       result = this.rssStub.regionServerStartup(null, request.build());
2297     } catch (ServiceException se) {
2298       IOException ioe = ProtobufUtil.getRemoteException(se);
2299       if (ioe instanceof ClockOutOfSyncException) {
2300         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2301         // Re-throw IOE will cause RS to abort
2302         throw ioe;
2303       } else if (ioe instanceof ServerNotRunningYetException) {
2304         LOG.debug("Master is not running yet");
2305       } else {
2306         LOG.warn("error telling master we are up", se);
2307       }
2308       rssStub = null;
2309     }
2310     return result;
2311   }
2312 
2313   @Override
2314   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2315     try {
2316       GetLastFlushedSequenceIdRequest req =
2317           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2318       RegionServerStatusService.BlockingInterface rss = rssStub;
2319       if (rss == null) { // Try to connect one more time
2320         createRegionServerStatusStub();
2321         rss = rssStub;
2322         if (rss == null) {
2323           // Still no luck, we tried
2324           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2325           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2326               .build();
2327         }
2328       }
2329       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2330       return RegionStoreSequenceIds.newBuilder()
2331           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2332           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2333     } catch (ServiceException e) {
2334       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2335       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2336           .build();
2337     }
2338   }
2339 
2340   /**
2341    * Closes all regions.  Called on our way out.
2342    * Assumes that it's not possible for new regions to be added to onlineRegions
2343    * while this method runs.
2344    */
2345   protected void closeAllRegions(final boolean abort) {
2346     closeUserRegions(abort);
2347     closeMetaTableRegions(abort);
2348   }
2349 
2350   /**
2351    * Close meta region if we carry it
2352    * @param abort Whether we're running an abort.
2353    */
2354   void closeMetaTableRegions(final boolean abort) {
2355     Region meta = null;
2356     this.lock.writeLock().lock();
2357     try {
2358       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2359         HRegionInfo hri = e.getValue().getRegionInfo();
2360         if (hri.isMetaRegion()) {
2361           meta = e.getValue();
2362         }
2363         if (meta != null) break;
2364       }
2365     } finally {
2366       this.lock.writeLock().unlock();
2367     }
2368     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2369   }
2370 
2371   /**
2372    * Schedule closes on all user regions.
2373    * Should be safe to call multiple times because it won't close regions
2374    * that are already closed or that are closing.
2375    * @param abort Whether we're running an abort.
2376    */
2377   void closeUserRegions(final boolean abort) {
2378     this.lock.writeLock().lock();
2379     try {
2380       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2381         Region r = e.getValue();
2382         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2383           // Don't update zk with this close transition; pass false.
2384           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2385         }
2386       }
2387     } finally {
2388       this.lock.writeLock().unlock();
2389     }
2390   }
2391 
2392   /** @return the info server */
2393   public InfoServer getInfoServer() {
2394     return infoServer;
2395   }
2396 
2397   /**
2398    * @return true if a stop has been requested.
2399    */
2400   @Override
2401   public boolean isStopped() {
2402     return this.stopped;
2403   }
2404 
2405   @Override
2406   public boolean isStopping() {
2407     return this.stopping;
2408   }
2409 
2410   @Override
2411   public Map<String, Region> getRecoveringRegions() {
2412     return this.recoveringRegions;
2413   }
2414 
2415   /**
2416    *
2417    * @return the configuration
2418    */
2419   @Override
2420   public Configuration getConfiguration() {
2421     return conf;
2422   }
2423 
2424   /** @return the write lock for the server */
2425   ReentrantReadWriteLock.WriteLock getWriteLock() {
2426     return lock.writeLock();
2427   }
2428 
2429   public int getNumberOfOnlineRegions() {
2430     return this.onlineRegions.size();
2431   }
2432 
2433   boolean isOnlineRegionsEmpty() {
2434     return this.onlineRegions.isEmpty();
2435   }
2436 
2437   /**
2438    * For tests, web ui and metrics.
2439    * This method will only work if HRegionServer is in the same JVM as client;
2440    * HRegion cannot be serialized to cross an rpc.
2441    */
2442   public Collection<Region> getOnlineRegionsLocalContext() {
2443     Collection<Region> regions = this.onlineRegions.values();
2444     return Collections.unmodifiableCollection(regions);
2445   }
2446 
2447   @Override
2448   public void addToOnlineRegions(Region region) {
2449     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2450     configurationManager.registerObserver(region);
2451   }
2452 
2453   /**
2454    * @return A new Map of online regions sorted by region size with the first entry being the
2455    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2456    * may NOT return all regions.
2457    */
2458   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2459     // we'll sort the regions in reverse
2460     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2461         new Comparator<Long>() {
2462           @Override
2463           public int compare(Long a, Long b) {
2464             return -1 * a.compareTo(b);
2465           }
2466         });
2467     // Copy over all regions. Regions are sorted by size with biggest first.
2468     for (Region region : this.onlineRegions.values()) {
2469       sortedRegions.put(region.getMemstoreSize(), region);
2470     }
2471     return sortedRegions;
2472   }
2473 
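  // --- Illustrative sketch, not part of the original source: using the reverse-sorted map
  // built above to find the region with the largest memstore. The method name is an
  // assumption, not an existing HBase API.
  Region exampleBiggestMemstoreRegion() {
    SortedMap<Long, Region> bySize = getCopyOfOnlineRegionsSortedBySize();
    // Sorted biggest-first, so the first key is the largest memstore size.
    return bySize.isEmpty() ? null : bySize.get(bySize.firstKey());
  }
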
2474   /**
2475    * @return time stamp in millis of when this region server was started
2476    */
2477   public long getStartcode() {
2478     return this.startcode;
2479   }
2480 
2481   /** @return reference to FlushRequester */
2482   @Override
2483   public FlushRequester getFlushRequester() {
2484     return this.cacheFlusher;
2485   }
2486 
2487   /**
2488    * Get the top N most loaded regions this server is serving so we can tell the
2489    * master which regions it can reallocate if we're overloaded. TODO: actually
2490    * calculate which regions are most loaded. (Right now, we're just grabbing
2491    * the first N regions being served regardless of load.)
2492    */
2493   protected HRegionInfo[] getMostLoadedRegions() {
2494     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2495     for (Region r : onlineRegions.values()) {
2496       if (!r.isAvailable()) {
2497         continue;
2498       }
2499       if (regions.size() < numRegionsToReport) {
2500         regions.add(r.getRegionInfo());
2501       } else {
2502         break;
2503       }
2504     }
2505     return regions.toArray(new HRegionInfo[regions.size()]);
2506   }
2507 
2508   @Override
2509   public Leases getLeases() {
2510     return leases;
2511   }
2512 
2513   /**
2514    * @return Return the rootDir.
2515    */
2516   protected Path getRootDir() {
2517     return rootDir;
2518   }
2519 
2520   /**
2521    * @return Return the fs.
2522    */
2523   @Override
2524   public FileSystem getFileSystem() {
2525     return fs;
2526   }
2527 
2528   @Override
2529   public String toString() {
2530     return getServerName().toString();
2531   }
2532 
2533   /**
2534    * Interval at which threads should run
2535    *
2536    * @return the interval
2537    */
2538   public int getThreadWakeFrequency() {
2539     return threadWakeFrequency;
2540   }
2541 
2542   @Override
2543   public ZooKeeperWatcher getZooKeeper() {
2544     return zooKeeper;
2545   }
2546 
2547   @Override
2548   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2549     return csm;
2550   }
2551 
2552   @Override
2553   public ServerName getServerName() {
2554     return serverName;
2555   }
2556 
2557   @Override
2558   public CompactionRequestor getCompactionRequester() {
2559     return this.compactSplitThread;
2560   }
2561 
2562   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2563     return this.rsHost;
2564   }
2565 
2566   @Override
2567   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2568     return this.regionsInTransitionInRS;
2569   }
2570 
2571   @Override
2572   public ExecutorService getExecutorService() {
2573     return service;
2574   }
2575 
2576   @Override
2577   public ChoreService getChoreService() {
2578     return choreService;
2579   }
2580   
2581   @Override
2582   public RegionServerQuotaManager getRegionServerQuotaManager() {
2583     return rsQuotaManager;
2584   }
2585 
2586   //
2587   // Main program and support routines
2588   //
2589 
2590   /**
2591    * Load the replication service objects, if any
2592    */
2593   static private void createNewReplicationInstance(Configuration conf,
2594     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2595 
2596     // If replication is not enabled, then return immediately.
2597     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2598         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2599       return;
2600     }
2601 
2602     // read in the name of the source replication class from the config file.
2603     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2604                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2605 
2606     // read in the name of the sink replication class from the config file.
2607     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2608                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2609 
2610     // If both the sink and the source class names are the same, then instantiate
2611     // only one object.
2612     if (sourceClassname.equals(sinkClassname)) {
2613       server.replicationSourceHandler = (ReplicationSourceService)
2614                                          newReplicationInstance(sourceClassname,
2615                                          conf, server, fs, logDir, oldLogDir);
2616       server.replicationSinkHandler = (ReplicationSinkService)
2617                                          server.replicationSourceHandler;
2618     } else {
2619       server.replicationSourceHandler = (ReplicationSourceService)
2620                                          newReplicationInstance(sourceClassname,
2621                                          conf, server, fs, logDir, oldLogDir);
2622       server.replicationSinkHandler = (ReplicationSinkService)
2623                                          newReplicationInstance(sinkClassname,
2624                                          conf, server, fs, logDir, oldLogDir);
2625     }
2626   }
2627 
2628   static private ReplicationService newReplicationInstance(String classname,
2629     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2630     Path oldLogDir) throws IOException{
2631 
2632     Class<?> clazz = null;
2633     try {
2634       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2635       clazz = Class.forName(classname, true, classLoader);
2636     } catch (java.lang.ClassNotFoundException nfe) {
2637       throw new IOException("Could not find class for " + classname);
2638     }
2639 
2640     // create an instance of the replication object.
2641     ReplicationService service = (ReplicationService)
2642                               ReflectionUtils.newInstance(clazz, conf);
2643     service.initialize(server, fs, logDir, oldLogDir);
2644     return service;
2645   }
2646 
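  // --- Illustrative sketch, not part of the original source. Shows the configuration that
  // createNewReplicationInstance() above consumes, pointing both roles at the shipped default
  // implementation; the helper name is an assumption.
  private static void exampleConfigureReplicationServices(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    // Same class for source and sink means a single object serves both roles.
    conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
    conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
  }
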
2647   /**
2648    * Utility for constructing an instance of the passed HRegionServer class.
2649    *
2650    * @param regionServerClass
2651    * @param conf2
2652    * @return HRegionServer instance.
2653    */
2654   public static HRegionServer constructRegionServer(
2655       Class<? extends HRegionServer> regionServerClass,
2656       final Configuration conf2, CoordinatedStateManager cp) {
2657     try {
2658       Constructor<? extends HRegionServer> c = regionServerClass
2659           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2660       return c.newInstance(conf2, cp);
2661     } catch (Exception e) {
2662       throw new RuntimeException("Failed construction of RegionServer: "
2663           + regionServerClass.toString(), e);
2664     }
2665   }
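  // Usage sketch (illustrative): constructing a custom server subclass reflectively.
  // MyRegionServer is a hypothetical subclass of HRegionServer that exposes the expected
  // (Configuration, CoordinatedStateManager) constructor.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
  //   HRegionServer rs = HRegionServer.constructRegionServer(MyRegionServer.class, conf, csm);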
2666 
2667   /**
2668    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2669    */
2670   public static void main(String[] args) throws Exception {
2671     VersionInfo.logVersion();
2672     Configuration conf = HBaseConfiguration.create();
2673     @SuppressWarnings("unchecked")
2674     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2675         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2676 
2677     new HRegionServerCommandLine(regionServerClass).doMain(args);
2678   }
2679 
2680   /**
2681    * Gets the online regions of the specified table.
2682    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2683    * Only returns <em>online</em> regions.  If a region on this table has been
2684    * closed during a disable, etc., it will not be included in the returned list.
2685    * So, the returned list may not necessarily include ALL regions of this table; it
2686    * contains only the ONLINE regions of the table.
2687    * @param tableName table whose online regions to return
2688    * @return Online regions from <code>tableName</code>
2689    */
2690   @Override
2691   public List<Region> getOnlineRegions(TableName tableName) {
2692     List<Region> tableRegions = new ArrayList<Region>();
2693     synchronized (this.onlineRegions) {
2694       for (Region region: this.onlineRegions.values()) {
2695         HRegionInfo regionInfo = region.getRegionInfo();
2696         if (regionInfo.getTable().equals(tableName)) {
2697           tableRegions.add(region);
2698         }
2699       }
2700     }
2701     return tableRegions;
2702   }
2703 
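  // Illustrative usage (names below are hypothetical): listing this server's online regions
  // for one table.
  //
  //   List<Region> regions = rs.getOnlineRegions(TableName.valueOf("my_table"));
  //   LOG.info("Serving " + regions.size() + " online region(s) of my_table");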
2704   /**
2705    * Gets the online tables in this RS.
2706    * This method looks at the in-memory onlineRegions.
2707    * @return all the online tables in this RS
2708    */
2709   @Override
2710   public Set<TableName> getOnlineTables() {
2711     Set<TableName> tables = new HashSet<TableName>();
2712     synchronized (this.onlineRegions) {
2713       for (Region region: this.onlineRegions.values()) {
2714         tables.add(region.getTableDesc().getTableName());
2715       }
2716     }
2717     return tables;
2718   }
2719 
2720   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2721   public String[] getRegionServerCoprocessors() {
2722     TreeSet<String> coprocessors = new TreeSet<String>();
2723     try {
2724       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2725     } catch (IOException exception) {
2726       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2727           "skipping.");
2728       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2729     }
2730     Collection<Region> regions = getOnlineRegionsLocalContext();
2731     for (Region region: regions) {
2732       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2733       try {
2734         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2735       } catch (IOException exception) {
2736         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2737             "; skipping.");
2738         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2739       }
2740     }
2741     return coprocessors.toArray(new String[coprocessors.size()]);
2742   }
2743 
2744   /**
2745    * Try to close the region; logs a warning on failure but continues.
2746    * @param region Region to close
2747    */
2748   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2749     try {
2750       CloseRegionCoordination.CloseRegionDetails details =
2751         csm.getCloseRegionCoordination().getDetaultDetails();
2752       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2753         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2754             " - ignoring and continuing");
2755       }
2756     } catch (IOException e) {
2757       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2758           " - ignoring and continuing", e);
2759     }
2760   }
2761 
2762   /**
2763    * Closes a region asynchronously. Can be called from the master or internally by the
2764    * regionserver when stopping. If called from the master, the region will update the znode status.
2765    *
2766    * <p>
2767    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2768    * coprocessors are not called in this case, and a NotServingRegionException is thrown.
2769    * </p>
2770    *
2771    * <p>
2772    * If a close was in progress, this new request will be ignored, and an exception is thrown.
2773    * </p>
2774    *
2775    * @param encodedName Region to close
2776    * @param abort True if we are aborting
2777    * @param crd details of the close-region coordination task
2778    * @return True if closed a region.
2779    * @throws NotServingRegionException if the region is not online
2780    * @throws RegionAlreadyInTransitionException if the region is already closing
2781    */
2782   protected boolean closeRegion(String encodedName, final boolean abort,
2783       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2784       throws NotServingRegionException, RegionAlreadyInTransitionException {
2785     //Check for permissions to close.
2786     Region actualRegion = this.getFromOnlineRegions(encodedName);
2787     // Can be null if we're calling close on a region that's not online
2788     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2789       try {
2790         actualRegion.getCoprocessorHost().preClose(false);
2791       } catch (IOException exp) {
2792         LOG.warn("Unable to close region: the coprocessor threw an error", exp);
2793         return false;
2794       }
2795     }
2796 
2797     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2798         Boolean.FALSE);
2799 
2800     if (Boolean.TRUE.equals(previous)) {
2801       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2802           "trying to OPEN. Cancelling OPENING.");
2803       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2804         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2805         // We're going to try to do a standard close then.
2806         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2807             " Doing a standard close now");
2808         return closeRegion(encodedName, abort, crd, sn);
2809       }
2810       // Let's get the region from the online region list again
2811       actualRegion = this.getFromOnlineRegions(encodedName);
2812       if (actualRegion == null) { // If already online, we still need to close it.
2813         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2814         // The master deletes the znode when it receives this exception.
2815         throw new RegionAlreadyInTransitionException("The region " + encodedName +
2816           " was opening but not yet served. Opening is cancelled.");
2817       }
2818     } else if (Boolean.FALSE.equals(previous)) {
2819       LOG.info("Received CLOSE for the region: " + encodedName +
2820         ", which we are already trying to CLOSE, but not completed yet");
2821       // The master will retry till the region is closed. We need to do this since
2822       // the region could fail to close somehow. If we mark the region closed in master
2823       // while it is not, there could be data loss.
2824       // If the region stuck in closing for a while, and master runs out of retries,
2825       // master will move the region to failed_to_close. Later on, if the region
2826       // is indeed closed, master can properly re-assign it.
2827       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2828         " was already closing. New CLOSE request is ignored.");
2829     }
2830 
2831     if (actualRegion == null) {
2832       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2833       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2834       // The master deletes the znode when it receives this exception.
2835       throw new NotServingRegionException("The region " + encodedName +
2836           " is not online, and is not opening.");
2837     }
2838 
2839     CloseRegionHandler crh;
2840     final HRegionInfo hri = actualRegion.getRegionInfo();
2841     if (hri.isMetaRegion()) {
2842       crh = new CloseMetaHandler(this, this, hri, abort,
2843         csm.getCloseRegionCoordination(), crd);
2844     } else {
2845       crh = new CloseRegionHandler(this, this, hri, abort,
2846         csm.getCloseRegionCoordination(), crd, sn);
2847     }
2848     this.service.submit(crh);
2849     return true;
2850   }
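  // Note on the in-transition markers used above: regionsInTransitionInRS maps an encoded
  // region name to Boolean.TRUE while an OPEN is in progress and to Boolean.FALSE while a
  // CLOSE is in progress; no entry means the region is not in transition on this server.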
2851 
2852   /**
2853    * @param regionName region name in binary form
2854    * @return Region for the passed binary <code>regionName</code>, or null if the
2855    *         named region is not a member of the online regions.
2856    */
2857   public Region getOnlineRegion(final byte[] regionName) {
2858     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2859     return this.onlineRegions.get(encodedRegionName);
2860   }
2861 
2862   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2863     return this.regionFavoredNodesMap.get(encodedRegionName);
2864   }
2865 
2866   @Override
2867   public Region getFromOnlineRegions(final String encodedRegionName) {
2868     return this.onlineRegions.get(encodedRegionName);
2869   }
2870 
2871 
2872   @Override
2873   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2874     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2875     if (destination != null) {
2876       long closeSeqNum = r.getMaxFlushedSeqId();
2877       if (closeSeqNum == HConstants.NO_SEQNUM) {
2878         // No edits in WAL for this region; get the sequence number when the region was opened.
2879         closeSeqNum = r.getOpenSeqNum();
2880         if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2881       }
2882       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2883     }
2884     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2885     return toReturn != null;
2886   }
2887 
2888   /**
2889    * Protected utility method for safely obtaining an HRegion handle.
2890    *
2891    * @param regionName
2892    *          Name of online {@link Region} to return
2893    * @return {@link Region} for <code>regionName</code>
2894    * @throws NotServingRegionException if the named region is not online
2895    */
2896   protected Region getRegion(final byte[] regionName)
2897       throws NotServingRegionException {
2898     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2899     return getRegionByEncodedName(regionName, encodedRegionName);
2900   }
2901 
2902   public Region getRegionByEncodedName(String encodedRegionName)
2903       throws NotServingRegionException {
2904     return getRegionByEncodedName(null, encodedRegionName);
2905   }
2906 
2907   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2908     throws NotServingRegionException {
2909     Region region = this.onlineRegions.get(encodedRegionName);
2910     if (region == null) {
2911       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2912       if (moveInfo != null) {
2913         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2914       }
2915       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2916       String regionNameStr = regionName == null?
2917         encodedRegionName: Bytes.toStringBinary(regionName);
2918       if (isOpening != null && isOpening.booleanValue()) {
2919         throw new RegionOpeningException("Region " + regionNameStr +
2920           " is opening on " + this.serverName);
2921       }
2922       throw new NotServingRegionException("Region " + regionNameStr +
2923         " is not online on " + this.serverName);
2924     }
2925     return region;
2926   }
2927 
2928   /*
2929    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
2930    * IOE if it isn't already.
2931    *
2932    * @param t Throwable
2933    *
2934    * @param msg Message to log in error. Can be null.
2935    *
2936    * @return Throwable converted to an IOE; methods can only let out IOEs.
2937    */
2938   private Throwable cleanup(final Throwable t, final String msg) {
2939     // Don't log as error if NSRE; NSRE is 'normal' operation.
2940     if (t instanceof NotServingRegionException) {
2941       LOG.debug("NotServingRegionException; " + t.getMessage());
2942       return t;
2943     }
2944     if (msg == null) {
2945       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2946     } else {
2947       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2948     }
2949     if (!rpcServices.checkOOME(t)) {
2950       checkFileSystem();
2951     }
2952     return t;
2953   }
2954 
2955   /*
2956    * @param t Throwable to convert
2957    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2958    * @return <code>t</code> converted to an IOE if it isn't one already
2961    */
2962   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2963     return (t instanceof IOException ? (IOException) t : msg == null
2964         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2965   }
2966 
2967   /**
2968    * Checks to see if the file system is still accessible. If not, sets
2969    * abortRequested and stopRequested.
2970    *
2971    * @return false if file system is not available
2972    */
2973   public boolean checkFileSystem() {
2974     if (this.fsOk && this.fs != null) {
2975       try {
2976         FSUtils.checkFileSystemAvailable(this.fs);
2977       } catch (IOException e) {
2978         abort("File System not available", e);
2979         this.fsOk = false;
2980       }
2981     }
2982     return this.fsOk;
2983   }
2984 
2985   @Override
2986   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2987       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2988     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2989     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2990     // it is a map of region name to InetSocketAddress[]
2991     for (int i = 0; i < favoredNodes.size(); i++) {
2992       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2993           favoredNodes.get(i).getPort());
2994     }
2995     regionFavoredNodesMap.put(encodedRegionName, addr);
2996   }
2997 
2998   /**
2999    * Return the favored nodes for a region given its encoded name. Look at the
3000    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3001    * @param encodedRegionName
3002    * @return array of favored locations
3003    */
3004   @Override
3005   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3006     return regionFavoredNodesMap.get(encodedRegionName);
3007   }
3008 
3009   @Override
3010   public ServerNonceManager getNonceManager() {
3011     return this.nonceManager;
3012   }
3013 
3014   private static class MovedRegionInfo {
3015     private final ServerName serverName;
3016     private final long seqNum;
3017     private final long ts;
3018 
3019     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3020       this.serverName = serverName;
3021       this.seqNum = closeSeqNum;
3022       ts = EnvironmentEdgeManager.currentTime();
3023      }
3024 
3025     public ServerName getServerName() {
3026       return serverName;
3027     }
3028 
3029     public long getSeqNum() {
3030       return seqNum;
3031     }
3032 
3033     public long getMoveTime() {
3034       return ts;
3035     }
3036   }
3037 
3038   // This map contains all the regions that we closed for a move.
3039   // We record the time of the move so we don't keep stale information around.
3040   protected Map<String, MovedRegionInfo> movedRegions =
3041       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3042 
3043   // We need a timeout; otherwise there is a risk of handing out stale information, which would
3044   // double the number of network calls instead of reducing them.
3045   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
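  // How these two fields are used (for readers): when a region is closed for a move,
  // removeFromOnlineRegions() records its destination here via addToMovedRegions(); a later
  // request for that region in getRegionByEncodedName() then fails fast with a
  // RegionMovedException carrying the new location, until the entry expires after
  // TIMEOUT_REGION_MOVED or is removed by the MovedRegionsCleaner chore.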
3046 
3047   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3048     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3049       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3050       return;
3051     }
3052     LOG.info("Adding moved region record: "
3053       + encodedName + " to " + destination + " as of " + closeSeqNum);
3054     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3055   }
3056 
3057   void removeFromMovedRegions(String encodedName) {
3058     movedRegions.remove(encodedName);
3059   }
3060 
3061   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3062     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3063 
3064     long now = EnvironmentEdgeManager.currentTime();
3065     if (dest != null) {
3066       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3067         return dest;
3068       } else {
3069         movedRegions.remove(encodedRegionName);
3070       }
3071     }
3072 
3073     return null;
3074   }
3075 
3076   /**
3077    * Remove the expired entries from the moved regions list.
3078    */
3079   protected void cleanMovedRegions() {
3080     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3081     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3082 
3083     while (it.hasNext()){
3084       Map.Entry<String, MovedRegionInfo> e = it.next();
3085       if (e.getValue().getMoveTime() < cutOff) {
3086         it.remove();
3087       }
3088     }
3089   }
3090 
3091   /*
3092    * Use this to allow tests to override and schedule more frequently.
3093    */
3094   protected int movedRegionCleanerPeriod() {
3095     return TIMEOUT_REGION_MOVED;
3096   }
3098 
3099   /**
3100    * A chore that periodically cleans the moved-region cache.
3101    */
3102   protected static final class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3104     private HRegionServer regionServer;
3105     Stoppable stoppable;
3106 
3107     private MovedRegionsCleaner(
3108       HRegionServer regionServer, Stoppable stoppable){
3109       super("MovedRegionsCleaner for region " + regionServer, stoppable,
3110           regionServer.movedRegionCleanerPeriod());
3111       this.regionServer = regionServer;
3112       this.stoppable = stoppable;
3113     }
3114 
3115     static MovedRegionsCleaner create(HRegionServer rs){
3116       Stoppable stoppable = new Stoppable() {
3117         private volatile boolean isStopped = false;
3118         @Override public void stop(String why) { isStopped = true;}
3119         @Override public boolean isStopped() {return isStopped;}
3120       };
3121 
3122       return new MovedRegionsCleaner(rs, stoppable);
3123     }
3124 
3125     @Override
3126     protected void chore() {
3127       regionServer.cleanMovedRegions();
3128     }
3129 
3130     @Override
3131     public void stop(String why) {
3132       stoppable.stop(why);
3133     }
3134 
3135     @Override
3136     public boolean isStopped() {
3137       return stoppable.isStopped();
3138     }
3139   }
3140 
3141   private String getMyEphemeralNodePath() {
3142     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3143   }
3144 
3145   private boolean isHealthCheckerConfigured() {
3146     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3147     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3148   }
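  // Illustrative sketch only: enabling the health check chore. The script path below is
  // hypothetical; HConstants.HEALTH_SCRIPT_LOC is the configuration key consulted above.
  //
  //   conf.set(HConstants.HEALTH_SCRIPT_LOC, "/opt/hbase/bin/healthcheck.sh");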
3149 
3150   /**
3151    * @return the underlying {@link CompactSplitThread} for the server
3152    */
3153   public CompactSplitThread getCompactSplitThread() {
3154     return this.compactSplitThread;
3155   }
3156 
3157   /**
3158    * A helper function to store the last flushed sequence id with the previously failed RS for a
3159    * recovering region. The id is used to skip WAL edits that have already been flushed. Since the
3160    * flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
3161    * @throws KeeperException
3162    * @throws IOException
3163    */
3164   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3165       IOException {
3166     if (!r.isRecovering()) {
3167       // return immediately for non-recovering regions
3168       return;
3169     }
3170 
3171     HRegionInfo regionInfo = r.getRegionInfo();
3172     ZooKeeperWatcher zkw = getZooKeeper();
3173     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3174     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3175     long minSeqIdForLogReplay = -1;
3176     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3177       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3178         minSeqIdForLogReplay = storeSeqIdForReplay;
3179       }
3180     }
3181 
3182     try {
3183       long lastRecordedFlushedSequenceId = -1;
3184       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3185         regionInfo.getEncodedName());
3186       // recovering-region level
3187       byte[] data;
3188       try {
3189         data = ZKUtil.getData(zkw, nodePath);
3190       } catch (InterruptedException e) {
3191         throw new InterruptedIOException();
3192       }
3193       if (data != null) {
3194         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3195       }
3196       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3197         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3198       }
3199       if (previousRSName != null) {
3200         // one level deeper for the failed RS
3201         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3202         ZKUtil.setData(zkw, nodePath,
3203           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3204         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3205           " for " + previousRSName);
3206       } else {
3207         LOG.warn("Can't find failed region server for recovering region " +
3208             regionInfo.getEncodedName());
3209       }
3210     } catch (NoNodeException ignore) {
3211       LOG.debug("Region " + regionInfo.getEncodedName() +
3212         " must have completed recovery because its recovery znode has been removed", ignore);
3213     }
3214   }
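  // Znode layout written by the method above (paths are relative to the cluster's
  // recovering-regions znode):
  //   <recoveringRegionsZNode>/<encodedRegionName>               -> min last-flushed sequence id for the region
  //   <recoveringRegionsZNode>/<encodedRegionName>/<failedRSName> -> per-store sequence ids for that failed RS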
3215 
3216   /**
3217    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3218    * @param encodedRegionName
3219    * @throws KeeperException
3220    */
3221   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3222     String result = null;
3223     long maxZxid = 0;
3224     ZooKeeperWatcher zkw = this.getZooKeeper();
3225     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3226     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3227     if (failedServers == null || failedServers.isEmpty()) {
3228       return result;
3229     }
3230     for (String failedServer : failedServers) {
3231       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3232       Stat stat = new Stat();
3233       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3234       if (maxZxid < stat.getCzxid()) {
3235         maxZxid = stat.getCzxid();
3236         result = failedServer;
3237       }
3238     }
3239     return result;
3240   }
3241 
3242   public CoprocessorServiceResponse execRegionServerService(
3243       @SuppressWarnings("UnusedParameters") final RpcController controller,
3244       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3245     try {
3246       ServerRpcController serviceController = new ServerRpcController();
3247       CoprocessorServiceCall call = serviceRequest.getCall();
3248       String serviceName = call.getServiceName();
3249       String methodName = call.getMethodName();
3250       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3251         throw new UnknownProtocolException(null,
3252             "No registered coprocessor service found for name " + serviceName);
3253       }
3254       Service service = coprocessorServiceHandlers.get(serviceName);
3255       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3256       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3257       if (methodDesc == null) {
3258         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3259             + " called on service " + serviceName);
3260       }
3261       Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3262       ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3263       Message request = builderForType.build();
3264       final Message.Builder responseBuilder =
3265           service.getResponsePrototype(methodDesc).newBuilderForType();
3266       service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
3267         @Override
3268         public void run(Message message) {
3269           if (message != null) {
3270             responseBuilder.mergeFrom(message);
3271           }
3272         }
3273       });
3274       IOException exception = ResponseConverter.getControllerException(serviceController);
3275       if (exception != null) {
3276         throw exception;
3277       }
3278       Message execResult = responseBuilder.build();
3279       ClientProtos.CoprocessorServiceResponse.Builder builder =
3280           ClientProtos.CoprocessorServiceResponse.newBuilder();
3281       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3282         HConstants.EMPTY_BYTE_ARRAY));
3283       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3284           .setValue(execResult.toByteString()));
3285       return builder.build();
3286     } catch (IOException ie) {
3287       throw new ServiceException(ie);
3288     }
3289   }
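  // Client-side sketch (illustrative; assumes a protobuf-generated endpoint stub named MyService
  // that has been registered with this regionserver as a coprocessor service, and an Admin handle
  // that exposes coprocessorService(ServerName) as in recent 1.x clients):
  //
  //   CoprocessorRpcChannel channel = admin.coprocessorService(rs.getServerName());
  //   MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //   MyResponse resp = stub.myMethod(null, MyRequest.getDefaultInstance());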
3290 
3291   /**
3292    * @return The cache config instance used by the regionserver.
3293    */
3294   public CacheConfig getCacheConfig() {
3295     return this.cacheConfig;
3296   }
3297 
3298   /**
3299    * @return the ConfigurationManager object, exposed for testing purposes.
3300    */
3301   protected ConfigurationManager getConfigurationManager() {
3302     return configurationManager;
3303   }
3304 
3305   /**
3306    * @return the table descriptors implementation.
3307    */
3308   public TableDescriptors getTableDescriptors() {
3309     return this.tableDescriptors;
3310   }
3311 
3312   /**
3313    * Reload the configuration from disk.
3314    */
3315   public void updateConfiguration() {
3316     LOG.info("Reloading the configuration from disk.");
3317     // Reload the configuration from disk.
3318     conf.reloadConfiguration();
3319     configurationManager.notifyAllObservers(conf);
3320   }
3321 
3322   @Override
3323   public HeapMemoryManager getHeapMemoryManager() {
3324     return hMemManager;
3325   }
3326 
3327   @Override
3328   public double getCompactionPressure() {
3329     double max = 0;
3330     for (Region region : onlineRegions.values()) {
3331       for (Store store : region.getStores()) {
3332         double normCount = store.getCompactionPressure();
3333         if (normCount > max) {
3334           max = normCount;
3335         }
3336       }
3337     }
3338     return max;
3339   }
3340 
3341   /**
3342    * For testing
3343    * @return whether all WAL roll requests have finished for this regionserver
3344    */
3345   @VisibleForTesting
3346   public boolean walRollRequestFinished() {
3347     return this.walRoller.walRollFinished();
3348   }
3349 }