
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.TreeSet;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import javax.management.MalformedObjectNameException;
52  import javax.management.ObjectName;
53  import javax.servlet.http.HttpServlet;
54  
55  import org.apache.commons.lang.math.RandomUtils;
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileSystem;
60  import org.apache.hadoop.fs.Path;
61  import org.apache.hadoop.hbase.ChoreService;
62  import org.apache.hadoop.hbase.ClockOutOfSyncException;
63  import org.apache.hadoop.hbase.CoordinatedStateManager;
64  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HealthCheckChore;
70  import org.apache.hadoop.hbase.MetaTableAccessor;
71  import org.apache.hadoop.hbase.NotServingRegionException;
72  import org.apache.hadoop.hbase.ScheduledChore;
73  import org.apache.hadoop.hbase.ServerName;
74  import org.apache.hadoop.hbase.Stoppable;
75  import org.apache.hadoop.hbase.TableDescriptors;
76  import org.apache.hadoop.hbase.TableName;
77  import org.apache.hadoop.hbase.YouAreDeadException;
78  import org.apache.hadoop.hbase.ZNodeClearer;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.client.ClusterConnection;
81  import org.apache.hadoop.hbase.client.ConnectionFactory;
82  import org.apache.hadoop.hbase.client.ConnectionUtils;
83  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
84  import org.apache.hadoop.hbase.conf.ConfigurationManager;
85  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
86  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
87  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
88  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
89  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
90  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
91  import org.apache.hadoop.hbase.executor.ExecutorService;
92  import org.apache.hadoop.hbase.executor.ExecutorType;
93  import org.apache.hadoop.hbase.fs.HFileSystem;
94  import org.apache.hadoop.hbase.http.InfoServer;
95  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
96  import org.apache.hadoop.hbase.ipc.RpcClient;
97  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
98  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
99  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
101 import org.apache.hadoop.hbase.ipc.ServerRpcController;
102 import org.apache.hadoop.hbase.master.HMaster;
103 import org.apache.hadoop.hbase.master.RegionState.State;
104 import org.apache.hadoop.hbase.master.TableLockManager;
105 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
106 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
107 import org.apache.hadoop.hbase.protobuf.RequestConverter;
108 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
109 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
110 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
113 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
114 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
115 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
116 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
117 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
119 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
129 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
130 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
131 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
132 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
133 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
134 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
135 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
136 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
137 import org.apache.hadoop.hbase.security.UserProvider;
138 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
139 import org.apache.hadoop.hbase.util.Addressing;
140 import org.apache.hadoop.hbase.util.ByteStringer;
141 import org.apache.hadoop.hbase.util.Bytes;
142 import org.apache.hadoop.hbase.util.CompressionTest;
143 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
144 import org.apache.hadoop.hbase.util.FSTableDescriptors;
145 import org.apache.hadoop.hbase.util.FSUtils;
146 import org.apache.hadoop.hbase.util.HasThread;
147 import org.apache.hadoop.hbase.util.JSONBean;
148 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
149 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
150 import org.apache.hadoop.hbase.util.Sleeper;
151 import org.apache.hadoop.hbase.util.Threads;
152 import org.apache.hadoop.hbase.util.VersionInfo;
153 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
154 import org.apache.hadoop.hbase.wal.WAL;
155 import org.apache.hadoop.hbase.wal.WALFactory;
156 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
157 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
158 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
159 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
160 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
161 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
162 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
163 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
164 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
165 import org.apache.hadoop.ipc.RemoteException;
166 import org.apache.hadoop.metrics.util.MBeanUtil;
167 import org.apache.hadoop.util.ReflectionUtils;
168 import org.apache.hadoop.util.StringUtils;
169 import org.apache.zookeeper.KeeperException;
170 import org.apache.zookeeper.KeeperException.NoNodeException;
171 import org.apache.zookeeper.data.Stat;
172 
173 import com.google.common.annotations.VisibleForTesting;
174 import com.google.common.base.Preconditions;
175 import com.google.common.collect.Maps;
176 import com.google.protobuf.BlockingRpcChannel;
177 import com.google.protobuf.Descriptors;
178 import com.google.protobuf.Message;
179 import com.google.protobuf.RpcCallback;
180 import com.google.protobuf.RpcController;
181 import com.google.protobuf.Service;
182 import com.google.protobuf.ServiceException;
183 
184 /**
185  * HRegionServer makes a set of HRegions available to clients. It checks in with
186  * the HMaster. There are many HRegionServers in a single HBase deployment.
187  */
188 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
189 @SuppressWarnings("deprecation")
190 public class HRegionServer extends HasThread implements
191     RegionServerServices, LastSequenceId {
192 
193   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
194 
195   /**
196    * For testing only!  Set to true to skip notifying region assignment to the master.
197    */
198   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
199   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
200 
201   /*
202    * Strings to be used in forming the exception message for
203    * RegionsAlreadyInTransitionException.
204    */
205   protected static final String OPEN = "OPEN";
206   protected static final String CLOSE = "CLOSE";
207 
208   //RegionName vs current action in progress
209   //true - if open region action in progress
210   //false - if close region action in progress
211   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
212     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
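  // Descriptive note: the map is keyed by byte[] (the region name bytes), so a skip list
  // with Bytes.BYTES_COMPARATOR is used; a plain hash-based map would compare the byte[]
  // keys by identity rather than by content.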
213 
214   // Cache flushing
215   protected MemStoreFlusher cacheFlusher;
216 
217   protected HeapMemoryManager hMemManager;
218 
219   /**
220    * Cluster connection to be shared by services.
221    * Initialized at server startup and closed when server shuts down.
222    * Clients must never close it explicitly.
223    */
224   protected ClusterConnection clusterConnection;
225 
226   /*
227    * Long-living meta table locator, which is created when the server is started and stopped
228    * when the server shuts down. EventHandlers should use this shared instance for their
229    * meta-related operations; the primary reason for this design is to make the locator
230    * mockable for tests.
231    */
232   protected MetaTableLocator metaTableLocator;
233 
234   // Watch if a region is out of recovering state from ZooKeeper
235   @SuppressWarnings("unused")
236   private RecoveringRegionWatcher recoveringRegionWatcher;
237 
238   /**
239    * Go here to get table descriptors.
240    */
241   protected TableDescriptors tableDescriptors;
242 
243   // Replication services. If no replication, this handler will be null.
244   protected ReplicationSourceService replicationSourceHandler;
245   protected ReplicationSinkService replicationSinkHandler;
246 
247   // Compactions
248   public CompactSplitThread compactSplitThread;
249 
250   /**
251    * Map of regions currently being served by this region server. Key is the
252    * encoded region name.  All access should be synchronized.
253    */
254   protected final Map<String, HRegion> onlineRegions =
255     new ConcurrentHashMap<String, HRegion>();
256 
257   /**
258    * Map of encoded region names to the DataNode locations they should be hosted on.
259    * We store the value as InetSocketAddress since this is used only in HDFS
260    * API (create() that takes favored nodes as hints for placing file blocks).
261    * We could have used ServerName here as the value class, but we'd need to
262    * convert it to InetSocketAddress at some point before the HDFS API call, and
263    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
264    * and here we really mean DataNode locations.
265    */
266   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
267       new ConcurrentHashMap<String, InetSocketAddress[]>();
268 
269   /**
270    * Set of regions currently in recovering state, which means they can accept writes (edits
271    * from a previously failed region server) but not reads. A recovering region is also an online region.
272    */
273   protected final Map<String, HRegion> recoveringRegions = Collections
274       .synchronizedMap(new HashMap<String, HRegion>());
275 
276   // Leases
277   protected Leases leases;
278 
279   // Instance of the hbase executor service.
280   protected ExecutorService service;
281 
282   // If false, the file system has become unavailable
283   protected volatile boolean fsOk;
284   protected HFileSystem fs;
285 
286   // Set when a report to the master comes back with a message asking us to
287   // shut down. Also set by a call to stop when debugging or running unit tests
288   // of HRegionServer in isolation.
289   private volatile boolean stopped = false;
290 
291   // Go down hard. Used if file system becomes unavailable and also in
292   // debugging and unit tests.
293   private volatile boolean abortRequested;
294 
295   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
296 
297   // A state before we go into stopped state.  At this stage we're closing user
298   // space regions.
299   private boolean stopping = false;
300 
301   private volatile boolean killed = false;
302 
303   protected final Configuration conf;
304 
305   private Path rootDir;
306 
307   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
308 
309   final int numRetries;
310   protected final int threadWakeFrequency;
311   protected final int msgInterval;
312 
313   protected final int numRegionsToReport;
314 
315   // Stub to do region server status calls against the master.
316   private volatile RegionServerStatusService.BlockingInterface rssStub;
317   // RPC client. Used to make the stub above that does region server status checking.
318   RpcClient rpcClient;
319 
320   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
321   private RpcControllerFactory rpcControllerFactory;
322 
323   private UncaughtExceptionHandler uncaughtExceptionHandler;
324 
325   // Info server. Default access so can be used by unit tests. REGIONSERVER
326   // is the name of the webapp and the attribute name used when stuffing this instance
327   // into the web context.
328   protected InfoServer infoServer;
329   private JvmPauseMonitor pauseMonitor;
330 
331   /** region server process name */
332   public static final String REGIONSERVER = "regionserver";
333 
334   MetricsRegionServer metricsRegionServer;
335   private SpanReceiverHost spanReceiverHost;
336 
337   /**
338    * ChoreService used to schedule tasks that we want to run periodically
339    */
340   private final ChoreService choreService;
341 
342   /*
343    * Check for compactions requests.
344    */
345   ScheduledChore compactionChecker;
346 
347   /*
348    * Check for flushes
349    */
350   ScheduledChore periodicFlusher;
351 
352   protected volatile WALFactory walFactory;
353 
354   // WAL roller. Package-private rather than private to avoid an
355   // eclipse warning when accessed by inner classes
356   final LogRoller walRoller;
357   // Lazily initialized if this RegionServer hosts a meta table.
358   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
359 
360   // flag set after we're done setting up server threads
361   final AtomicBoolean online = new AtomicBoolean(false);
362 
363   // zookeeper connection and watcher
364   protected ZooKeeperWatcher zooKeeper;
365 
366   // master address tracker
367   private MasterAddressTracker masterAddressTracker;
368 
369   // Cluster Status Tracker
370   protected ClusterStatusTracker clusterStatusTracker;
371 
372   // Log Splitting Worker
373   private SplitLogWorker splitLogWorker;
374 
375   // A sleeper that sleeps for msgInterval.
376   protected final Sleeper sleeper;
377 
378   private final int operationTimeout;
379   private final int shortOperationTimeout;
380 
381   private final RegionServerAccounting regionServerAccounting;
382 
383   // Cache configuration and block cache reference
384   protected CacheConfig cacheConfig;
385 
386   /** The health check chore. */
387   private HealthCheckChore healthCheckChore;
388 
389   /** The nonce manager chore. */
390   private ScheduledChore nonceManagerChore;
391 
392   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
393 
394   /**
395    * The server name the Master sees us as.  It's made from the hostname the
396    * master passes us, our port, and the server startcode. Gets set after registration
397    * against the Master.
398    */
399   protected ServerName serverName;
400 
401   /**
402    * This server's startcode.
403    */
404   protected final long startcode;
405 
406   /**
407    * Unique identifier for the cluster we are a part of.
408    */
409   private String clusterId;
410 
411   /**
412    * MX Bean for RegionServerInfo
413    */
414   private ObjectName mxBean = null;
415 
416   /**
417    * Chore to periodically clean the moved region list
418    */
419   private MovedRegionsCleaner movedRegionsCleaner;
420 
421   // chore for refreshing store files for secondary regions
422   private StorefileRefresherChore storefileRefresher;
423 
424   private RegionServerCoprocessorHost rsHost;
425 
426   private RegionServerProcedureManagerHost rspmHost;
427 
428   private RegionServerQuotaManager rsQuotaManager;
429 
430   // Table-level lock manager for region operations
431   protected TableLockManager tableLockManager;
432 
433   /**
434    * Nonce manager. Nonces are used to make operations like increment and append idempotent
435    * in the case where client doesn't receive the response from a successful operation and
436    * retries. We track the successful ops for some time via a nonce sent by client and handle
437    * duplicate operations (currently, by failing them; in future we might use MVCC to return
438    * result). Nonces are also recovered from the WAL during recovery; however, the caveats (from
439    * HBASE-3787) are:
440    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
441    *   of past records. If we don't read the records, we don't read and recover the nonces.
442    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
443    * - There's no WAL recovery during normal region move, so nonces will not be transferred.
444    * We can have a separate, additional "Nonce WAL". It will just contain a bunch of numbers and
445    * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
446    * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
447    * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
448    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
449    * latest nonce in it expired. It can also be recovered during move.
450    */
451   final ServerNonceManager nonceManager;
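  // Illustrative sketch only (not the actual ServerNonceManager API): nonce-based idempotence
  // conceptually amounts to remembering recently completed nonces and short-circuiting retries
  // that present a nonce already seen, e.g.:
  //
  //   ConcurrentMap<Long, Long> seenNonces = new ConcurrentHashMap<Long, Long>();
  //   boolean firstAttempt = seenNonces.putIfAbsent(nonce, System.currentTimeMillis()) == null;
  //   if (!firstAttempt) { /* duplicate retry: fail it or return the recorded result */ }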
452 
453   private UserProvider userProvider;
454 
455   protected final RSRpcServices rpcServices;
456 
457   protected BaseCoordinatedStateManager csm;
458 
459   /**
460    * Configuration manager is used to register/deregister and notify the configuration observers
461    * when the regionserver is notified that there was a change in the on disk configs.
462    */
463   protected final ConfigurationManager configurationManager;
464 
465   /**
466    * Starts a HRegionServer at the default location.
467    * @param conf
468    * @throws IOException
469    * @throws InterruptedException
470    */
471   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
472     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
473   }
474 
475   /**
476    * Starts a HRegionServer at the default location
477    * @param conf
478    * @param csm implementation of CoordinatedStateManager to be used
479    * @throws IOException
480    */
481   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
482       throws IOException {
483     this.fsOk = true;
484     this.conf = conf;
485     checkCodecs(this.conf);
486     this.userProvider = UserProvider.instantiate(conf);
487     FSUtils.setupShortCircuitRead(this.conf);
488     // Disable usage of meta replicas in the regionserver
489     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
490 
491     // Config'ed params
492     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
493         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
494     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
495     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
496 
497     this.sleeper = new Sleeper(this.msgInterval, this);
498 
499     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
500     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
501 
502     this.numRegionsToReport = conf.getInt(
503       "hbase.regionserver.numregionstoreport", 10);
504 
505     this.operationTimeout = conf.getInt(
506       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
507       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
508 
509     this.shortOperationTimeout = conf.getInt(
510       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
511       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
512 
513     this.abortRequested = false;
514     this.stopped = false;
515 
516     rpcServices = createRpcServices();
517     this.startcode = System.currentTimeMillis();
518     String hostName = rpcServices.isa.getHostName();
519     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
520 
521     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
522     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
523 
524     // login the zookeeper client principal (if using security)
525     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
526       "hbase.zookeeper.client.kerberos.principal", hostName);
527     // login the server principal (if using secure Hadoop)
528     login(userProvider, hostName);
529 
530     regionServerAccounting = new RegionServerAccounting();
531     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
532       @Override
533       public void uncaughtException(Thread t, Throwable e) {
534         abort("Uncaught exception in service thread " + t.getName(), e);
535       }
536     };
537 
538     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir, else the
539     // underlying hadoop hdfs accessors will be going against the wrong filesystem
540     // (unless all is set to defaults).
541     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
542     // Get fs instance used by this RS.  Do we use checksum verification in HBase? If HBase
543     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
544     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
545     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
546     this.rootDir = FSUtils.getRootDir(this.conf);
547     this.tableDescriptors = getFsTableDescriptors();
548 
549     service = new ExecutorService(getServerName().toShortString());
550     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
551 
552     // Some unit tests don't need a cluster, so no zookeeper at all
553     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
554       // Open connection to zookeeper and set primary watcher
555       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
556         rpcServices.isa.getPort(), this, canCreateBaseZNode());
557 
558       this.csm = (BaseCoordinatedStateManager) csm;
559       this.csm.initialize(this);
560       this.csm.start();
561 
562       tableLockManager = TableLockManager.createTableLockManager(
563         conf, zooKeeper, serverName);
564 
565       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
566       masterAddressTracker.start();
567 
568       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
569       clusterStatusTracker.start();
570     }
571     this.configurationManager = new ConfigurationManager();
572 
573     rpcServices.start();
574     putUpWebUI();
575     this.walRoller = new LogRoller(this, this);
576     this.choreService = new ChoreService(getServerName().toString());
577   }
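  /*
   * Illustrative hbase-site.xml overrides for a few of the settings read in the constructor
   * above (example values only, assuming the usual key names behind the HConstants fields):
   *
   *   <property><name>hbase.regionserver.msginterval</name><value>3000</value></property>
   *   <property><name>hbase.client.retries.number</name><value>35</value></property>
   *   <property><name>hbase.client.operation.timeout</name><value>1200000</value></property>
   */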
578 
579   protected TableDescriptors getFsTableDescriptors() throws IOException {
580     return new FSTableDescriptors(this.conf,
581       this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
582   }
583 
584   protected void login(UserProvider user, String host) throws IOException {
585     user.login("hbase.regionserver.keytab.file",
586       "hbase.regionserver.kerberos.principal", host);
587   }
588 
589   protected void waitForMasterActive(){
590   }
591 
592   protected String getProcessName() {
593     return REGIONSERVER;
594   }
595 
596   protected boolean canCreateBaseZNode() {
597     return false;
598   }
599 
600   protected boolean canUpdateTableDescriptor() {
601     return false;
602   }
603 
604   protected RSRpcServices createRpcServices() throws IOException {
605     return new RSRpcServices(this);
606   }
607 
608   protected void configureInfoServer() {
609     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
610     infoServer.setAttribute(REGIONSERVER, this);
611   }
612 
613   protected Class<? extends HttpServlet> getDumpServlet() {
614     return RSDumpServlet.class;
615   }
616 
617   protected void doMetrics() {
618   }
619 
620   @Override
621   public boolean registerService(Service instance) {
622     /*
623      * No stacking of instances is allowed for a single service name
624      */
625     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
626     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
627       LOG.error("Coprocessor service " + serviceDesc.getFullName()
628           + " already registered, rejecting request from " + instance);
629       return false;
630     }
631 
632     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
633     if (LOG.isDebugEnabled()) {
634       LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName());
635     }
636     return true;
637   }
638 
639   /**
640    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
641    * the local server.  Safe to use going to local or remote server.
642    * Creating this instance in a method allows it to be intercepted and mocked in tests.
643    * @throws IOException
644    */
645   @VisibleForTesting
646   protected ClusterConnection createClusterConnection() throws IOException {
647     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
648     // local server if the request is to the local server bypassing RPC. Can be used for both local
649     // and remote invocations.
650     return ConnectionUtils.createShortCircuitHConnection(
651       ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
652   }
653 
654   /**
655    * Run test on configured codecs to make sure supporting libs are in place.
656    * @param c
657    * @throws IOException
658    */
659   private static void checkCodecs(final Configuration c) throws IOException {
660     // check to see if the codec list is available:
661     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
662     if (codecs == null) return;
663     for (String codec : codecs) {
664       if (!CompressionTest.testCompression(codec)) {
665         throw new IOException("Compression codec " + codec +
666           " not supported, aborting RS construction");
667       }
668     }
669   }
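  /*
   * Example (illustrative only): operators can make the server refuse to start unless the
   * native libraries for particular codecs are present, e.g. in hbase-site.xml:
   *
   *   <property><name>hbase.regionserver.codecs</name><value>snappy,lz4</value></property>
   */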
670 
671   public String getClusterId() {
672     return this.clusterId;
673   }
674 
675   /**
676    * Setup our cluster connection if not already initialized.
677    * @throws IOException
678    */
679   protected synchronized void setupClusterConnection() throws IOException {
680     if (clusterConnection == null) {
681       clusterConnection = createClusterConnection();
682       metaTableLocator = new MetaTableLocator();
683     }
684   }
685 
686   /**
687    * All initialization needed before we go register with Master.
688    *
689    * @throws IOException
690    * @throws InterruptedException
691    */
692   private void preRegistrationInitialization(){
693     try {
694       setupClusterConnection();
695 
696       // Health checker thread.
697       if (isHealthCheckerConfigured()) {
698         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
699           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
700         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
701       }
702       this.pauseMonitor = new JvmPauseMonitor(conf);
703       pauseMonitor.start();
704 
705       initializeZooKeeper();
706       if (!isStopped() && !isAborted()) {
707         initializeThreads();
708       }
709     } catch (Throwable t) {
710       // Call stop on error, or the process will stick around forever, since the server
711       // puts up non-daemon threads.
712       this.rpcServices.stop();
713       abort("Initialization of RS failed.  Hence aborting RS.", t);
714     }
715   }
716 
717   /**
718    * Bring up the connection to the zk ensemble, then wait until a master is available for this
719    * cluster, and after that, wait until the cluster 'up' flag has been set.
720    * This is the order in which master does things.
721    * Finally open long-living server short-circuit connection.
722    * @throws IOException
723    * @throws InterruptedException
724    */
725   private void initializeZooKeeper() throws IOException, InterruptedException {
726     // Create the master address tracker, register with zk, and start it.  Then
727     // block until a master is available.  No point in starting up if no master
728     // running.
729     blockAndCheckIfStopped(this.masterAddressTracker);
730 
731     // Wait on cluster being up.  Master will set this flag up in zookeeper
732     // when ready.
733     blockAndCheckIfStopped(this.clusterStatusTracker);
734 
735     // Retrieve clusterId
736     // Since cluster status is now up
737     // ID should have already been set by HMaster
738     try {
739       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
740       if (clusterId == null) {
741         this.abort("Cluster ID has not been set");
742       }
743       LOG.info("ClusterId : "+clusterId);
744     } catch (KeeperException e) {
745       this.abort("Failed to retrieve Cluster ID",e);
746     }
747 
748     // In the case of a colocated master, wait here till it's active,
749     // so backup masters won't start as regionservers.
750     // This is to avoid showing backup masters as regionservers
751     // in master web UI, or assigning any region to them.
752     waitForMasterActive();
753     if (isStopped() || isAborted()) {
754       return; // No need for further initialization
755     }
756 
757     // watch for snapshots and other procedures
758     try {
759       rspmHost = new RegionServerProcedureManagerHost();
760       rspmHost.loadProcedures(conf);
761       rspmHost.initialize(this);
762     } catch (KeeperException e) {
763       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
764     }
765     // register watcher for recovering regions
766     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
767   }
768 
769   /**
770    * Utility method to wait indefinitely on a znode's availability while checking
771    * whether the region server is shut down
772    * @param tracker znode tracker to use
773    * @throws IOException any IO exception, also thrown if the RS is stopped while waiting
774    * @throws InterruptedException
775    */
776   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
777       throws IOException, InterruptedException {
778     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
779       if (this.stopped) {
780         throw new IOException("Received the shutdown message while waiting.");
781       }
782     }
783   }
784 
785   /**
786    * @return False if cluster shutdown in progress
787    */
788   private boolean isClusterUp() {
789     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
790   }
791 
792   private void initializeThreads() throws IOException {
793     // Cache flushing thread.
794     this.cacheFlusher = new MemStoreFlusher(conf, this);
795 
796     // Compaction thread
797     this.compactSplitThread = new CompactSplitThread(this);
798 
799     // Background thread to check for compactions; needed if region has not gotten updates
800     // in a while. It will take care of not checking too frequently on a store-by-store basis.
801     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
802     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
803     this.leases = new Leases(this.threadWakeFrequency);
804 
805     // Create the thread to clean the moved regions list
806     movedRegionsCleaner = MovedRegionsCleaner.create(this);
807 
808     if (this.nonceManager != null) {
809       // Create the scheduled chore that cleans up nonces.
810       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
811     }
812 
813     // Setup the Quota Manager
814     rsQuotaManager = new RegionServerQuotaManager(this);
815 
816     // Setup RPC client for master communication
817     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
818         rpcServices.isa.getAddress(), 0));
819 
820     boolean onlyMetaRefresh = false;
821     int storefileRefreshPeriod = conf.getInt(
822         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
823         StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
824     if (storefileRefreshPeriod == 0) {
825       storefileRefreshPeriod = conf.getInt(
826           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
827           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
828       onlyMetaRefresh = true;
829     }
830     if (storefileRefreshPeriod > 0) {
831       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
832           onlyMetaRefresh, this, this);
833     }
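    // Note (illustrative reading of the logic above): if the general storefile refresh period
    // is 0, the meta-specific period is used and only hbase:meta replicas are refreshed
    // (onlyMetaRefresh == true); any positive general period schedules the chore for all
    // secondary region replicas instead.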
834     registerConfigurationObservers();
835   }
836 
837   private void registerConfigurationObservers() {
838     // Registering the compactSplitThread object with the ConfigurationManager.
839     configurationManager.registerObserver(this.compactSplitThread);
840   }
841 
842   /**
843    * The HRegionServer sticks in this loop until closed.
844    */
845   @Override
846   public void run() {
847     try {
848       // Do pre-registration initializations; zookeeper, lease threads, etc.
849       preRegistrationInitialization();
850     } catch (Throwable e) {
851       abort("Fatal exception during initialization", e);
852     }
853 
854     try {
855       if (!isStopped() && !isAborted()) {
856         ShutdownHook.install(conf, fs, this, Thread.currentThread());
857         // Set our ephemeral znode up in zookeeper now that we have a name.
858         createMyEphemeralNode();
859         // Initialize the RegionServerCoprocessorHost now that our ephemeral
860         // node was created, in case any coprocessors want to use ZooKeeper
861         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
862       }
863 
864       // Try and register with the Master; tell it we are here.  Break if
865       // server is stopped or the clusterup flag is down or hdfs went wacky.
866       while (keepLooping()) {
867         RegionServerStartupResponse w = reportForDuty();
868         if (w == null) {
869           LOG.warn("reportForDuty failed; sleeping and then retrying.");
870           this.sleeper.sleep();
871         } else {
872           handleReportForDutyResponse(w);
873           break;
874         }
875       }
876 
877       if (!isStopped() && isHealthy()){
878         // start the snapshot handler and other procedure handlers,
879         // since the server is ready to run
880         rspmHost.start();
881 
882         // Start the Quota Manager
883         rsQuotaManager.start(getRpcServer().getScheduler());
884       }
885 
886       // We registered with the Master.  Go into run mode.
887       long lastMsg = System.currentTimeMillis();
888       long oldRequestCount = -1;
889       // The main run loop.
890       while (!isStopped() && isHealthy()) {
891         if (!isClusterUp()) {
892           if (isOnlineRegionsEmpty()) {
893             stop("Exiting; cluster shutdown set and not carrying any regions");
894           } else if (!this.stopping) {
895             this.stopping = true;
896             LOG.info("Closing user regions");
897             closeUserRegions(this.abortRequested);
898           } else if (this.stopping) {
899             boolean allUserRegionsOffline = areAllUserRegionsOffline();
900             if (allUserRegionsOffline) {
901               // Set stopped if no more write requests to meta tables
902               // since last time we went around the loop.  Any open
903               // meta regions will be closed on our way out.
904               if (oldRequestCount == getWriteRequestCount()) {
905                 stop("Stopped; only catalog regions remaining online");
906                 break;
907               }
908               oldRequestCount = getWriteRequestCount();
909             } else {
910               // Make sure all regions have been closed -- some regions may
911               // not have gotten the close because we were splitting at the time of
912               // the call to closeUserRegions.
913               closeUserRegions(this.abortRequested);
914             }
915             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
916           }
917         }
918         long now = System.currentTimeMillis();
919         if ((now - lastMsg) >= msgInterval) {
920           tryRegionServerReport(lastMsg, now);
921           lastMsg = System.currentTimeMillis();
922           doMetrics();
923         }
924         if (!isStopped() && !isAborted()) {
925           this.sleeper.sleep();
926         }
927       } // while
928     } catch (Throwable t) {
929       if (!rpcServices.checkOOME(t)) {
930         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
931         abort(prefix + t.getMessage(), t);
932       }
933     }
934     // Run shutdown.
935     if (mxBean != null) {
936       MBeanUtil.unregisterMBean(mxBean);
937       mxBean = null;
938     }
939     if (this.leases != null) this.leases.closeAfterLeasesExpire();
940     if (this.splitLogWorker != null) {
941       splitLogWorker.stop();
942     }
943     if (this.infoServer != null) {
944       LOG.info("Stopping infoServer");
945       try {
946         this.infoServer.stop();
947       } catch (Exception e) {
948         LOG.error("Failed to stop infoServer", e);
949       }
950     }
951     // Send cache a shutdown.
952     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
953       cacheConfig.getBlockCache().shutdown();
954     }
955 
956     if (movedRegionsCleaner != null) {
957       movedRegionsCleaner.stop("Region Server stopping");
958     }
959 
960     // Send interrupts to wake up threads if sleeping so they notice shutdown.
961     // TODO: Should we check they are alive? If they OOME'd they could have exited already
962     if(this.hMemManager != null) this.hMemManager.stop();
963     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
964     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
965     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
966     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
967     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
968     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
969 
970     // Stop the quota manager
971     if (rsQuotaManager != null) {
972       rsQuotaManager.stop();
973     }
974 
975     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
976     if (rspmHost != null) {
977       rspmHost.stop(this.abortRequested || this.killed);
978     }
979 
980     if (this.killed) {
981       // Just skip out w/o closing regions.  Used when testing.
982     } else if (abortRequested) {
983       if (this.fsOk) {
984         closeUserRegions(abortRequested); // Don't leave any open file handles
985       }
986       LOG.info("aborting server " + this.serverName);
987     } else {
988       closeUserRegions(abortRequested);
989       LOG.info("stopping server " + this.serverName);
990     }
991 
992     // so callers waiting for meta without timeout can stop
993     if (this.metaTableLocator != null) this.metaTableLocator.stop();
994     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
995       try {
996         this.clusterConnection.close();
997       } catch (IOException e) {
998         // Although the {@link Closeable} interface throws an {@link
999         // IOException}, in reality, the implementation would never do that.
1000         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1001       }
1002     }
1003 
1004     // Closing the compactSplit thread before closing meta regions
1005     if (!this.killed && containsMetaTableRegions()) {
1006       if (!abortRequested || this.fsOk) {
1007         if (this.compactSplitThread != null) {
1008           this.compactSplitThread.join();
1009           this.compactSplitThread = null;
1010         }
1011         closeMetaTableRegions(abortRequested);
1012       }
1013     }
1014 
1015     if (!this.killed && this.fsOk) {
1016       waitOnAllRegionsToClose(abortRequested);
1017       LOG.info("stopping server " + this.serverName +
1018         "; all regions closed.");
1019     }
1020 
1021     // fsOk flag may be changed when closing regions throws an exception.
1022     if (this.fsOk) {
1023       shutdownWAL(!abortRequested);
1024     }
1025 
1026     // Make sure the proxy is down.
1027     if (this.rssStub != null) {
1028       this.rssStub = null;
1029     }
1030     if (this.rpcClient != null) {
1031       this.rpcClient.close();
1032     }
1033     if (this.leases != null) {
1034       this.leases.close();
1035     }
1036     if (this.pauseMonitor != null) {
1037       this.pauseMonitor.stop();
1038     }
1039 
1040     if (!killed) {
1041       stopServiceThreads();
1042     }
1043 
1044     if (this.rpcServices != null) {
1045       this.rpcServices.stop();
1046     }
1047 
1048     try {
1049       deleteMyEphemeralNode();
1050     } catch (KeeperException.NoNodeException nn) {
1051     } catch (KeeperException e) {
1052       LOG.warn("Failed deleting my ephemeral node", e);
1053     }
1054     // We may have failed to delete the znode at the previous step, but
1055     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1056     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1057 
1058     if (this.zooKeeper != null) {
1059       this.zooKeeper.close();
1060     }
1061     LOG.info("stopping server " + this.serverName +
1062       "; zookeeper connection closed.");
1063 
1064     LOG.info(Thread.currentThread().getName() + " exiting");
1065   }
1066 
1067   private boolean containsMetaTableRegions() {
1068     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1069   }
1070 
1071   private boolean areAllUserRegionsOffline() {
1072     if (getNumberOfOnlineRegions() > 2) return false;
1073     boolean allUserRegionsOffline = true;
1074     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1075       if (!e.getValue().getRegionInfo().isMetaTable()) {
1076         allUserRegionsOffline = false;
1077         break;
1078       }
1079     }
1080     return allUserRegionsOffline;
1081   }
1082 
1083   /**
1084    * @return Current write count for all online regions.
1085    */
1086   private long getWriteRequestCount() {
1087     long writeCount = 0;
1088     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1089       writeCount += e.getValue().getWriteRequestsCount();
1090     }
1091     return writeCount;
1092   }
1093 
1094   @VisibleForTesting
1095   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1096   throws IOException {
1097     RegionServerStatusService.BlockingInterface rss = rssStub;
1098     if (rss == null) {
1099       // the current server could be stopping.
1100       return;
1101     }
1102     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1103     try {
1104       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1105       ServerName sn = ServerName.parseVersionedServerName(
1106         this.serverName.getVersionedBytes());
1107       request.setServer(ProtobufUtil.toServerName(sn));
1108       request.setLoad(sl);
1109       rss.regionServerReport(null, request.build());
1110     } catch (ServiceException se) {
1111       IOException ioe = ProtobufUtil.getRemoteException(se);
1112       if (ioe instanceof YouAreDeadException) {
1113         // This will be caught and handled as a fatal error in run()
1114         throw ioe;
1115       }
1116       if (rssStub == rss) {
1117         rssStub = null;
1118       }
1119       // Couldn't connect to the master, get location from zk and reconnect
1120       // Method blocks until new master is found or we are stopped
1121       createRegionServerStatusStub();
1122     }
1123   }
1124 
1125   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1126       throws IOException {
1127     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1128     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1129     // the wrapper to compute those numbers in one place.
1130     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1131     // Instead they should be stored in an HBase table so that external visibility into HBase is
1132     // improved; Additionally the load balancer will be able to take advantage of a more complete
1133     // history.
1134     MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
1135     Collection<HRegion> regions = getOnlineRegionsLocalContext();
1136     MemoryUsage memory =
1137       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1138 
1139     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1140       ClusterStatusProtos.ServerLoad.newBuilder();
1141     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1142     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1143     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1144     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1145     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1146     for (String coprocessor : coprocessors) {
1147       serverLoad.addCoprocessors(
1148         Coprocessor.newBuilder().setName(coprocessor).build());
1149     }
1150     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1151     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1152     for (HRegion region : regions) {
1153       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1154       for (String coprocessor :
1155           getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
1156         serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
1157       }
1158     }
1159     serverLoad.setReportStartTime(reportStartTime);
1160     serverLoad.setReportEndTime(reportEndTime);
1161     if (this.infoServer != null) {
1162       serverLoad.setInfoServerPort(this.infoServer.getPort());
1163     } else {
1164       serverLoad.setInfoServerPort(-1);
1165     }
1166 
1167     // For the replicationLoad purpose we only need to get it from one service;
1168     // either source or sink will give the same info
1169     ReplicationSourceService rsources = getReplicationSourceService();
1170 
1171     if (rsources != null) {
1172       // always refresh first to get the latest value
1173       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1174       if (rLoad != null) {
1175         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1176         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1177           serverLoad.addReplLoadSource(rLS);
1178         }
1179       }
1180     }
1181 
1182     return serverLoad.build();
1183   }
1184 
1185   String getOnlineRegionsAsPrintableString() {
1186     StringBuilder sb = new StringBuilder();
1187     for (HRegion r: this.onlineRegions.values()) {
1188       if (sb.length() > 0) sb.append(", ");
1189       sb.append(r.getRegionInfo().getEncodedName());
1190     }
1191     return sb.toString();
1192   }
1193 
1194   /**
1195    * Wait on regions to close.
1196    */
1197   private void waitOnAllRegionsToClose(final boolean abort) {
1198     // Wait till all regions are closed before going out.
1199     int lastCount = -1;
1200     long previousLogTime = 0;
1201     Set<String> closedRegions = new HashSet<String>();
1202     boolean interrupted = false;
1203     try {
1204       while (!isOnlineRegionsEmpty()) {
1205         int count = getNumberOfOnlineRegions();
1206         // Only print a message if the count of regions has changed.
1207         if (count != lastCount) {
1208           // Log every second at most
1209           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1210             previousLogTime = System.currentTimeMillis();
1211             lastCount = count;
1212             LOG.info("Waiting on " + count + " regions to close");
1213             // Only print out regions still closing if there are just a few, else we will
1214             // swamp the log.
1215             if (count < 10 && LOG.isDebugEnabled()) {
1216               LOG.debug(this.onlineRegions);
1217             }
1218           }
1219         }
1220         // Ensure all user regions have been sent a close. Use this to
1221         // protect against the case where an open comes in after we start the
1222         // iterator of onlineRegions to close all user regions.
1223         for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1224           HRegionInfo hri = e.getValue().getRegionInfo();
1225           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1226               && !closedRegions.contains(hri.getEncodedName())) {
1227             closedRegions.add(hri.getEncodedName());
1228             // Don't update zk with this close transition; pass false.
1229             closeRegionIgnoreErrors(hri, abort);
1230           }
1231         }
1232         // No regions in RIT, we could stop waiting now.
1233         if (this.regionsInTransitionInRS.isEmpty()) {
1234           if (!isOnlineRegionsEmpty()) {
1235             LOG.info("We were exiting though online regions are not empty," +
1236                 " because some regions failed closing");
1237           }
1238           break;
1239         }
1240         if (sleep(200)) {
1241           interrupted = true;
1242         }
1243       }
1244     } finally {
1245       if (interrupted) {
1246         Thread.currentThread().interrupt();
1247       }
1248     }
1249   }
1250 
1251   private boolean sleep(long millis) {
1252     boolean interrupted = false;
1253     try {
1254       Thread.sleep(millis);
1255     } catch (InterruptedException e) {
1256       LOG.warn("Interrupted while sleeping");
1257       interrupted = true;
1258     }
1259     return interrupted;
1260   }
1261 
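  /*
   * Descriptive note: callers above pass close = !abortRequested, so on a clean stop the
   * factory is close()d (orderly shutdown of the WAL providers), while on abort it is merely
   * shutdown() without the extra finishing work.
   */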
1262   private void shutdownWAL(final boolean close) {
1263     if (this.walFactory != null) {
1264       try {
1265         if (close) {
1266           walFactory.close();
1267         } else {
1268           walFactory.shutdown();
1269         }
1270       } catch (Throwable e) {
1271         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1272         LOG.error("Shutdown / close of WAL failed: " + e);
1273         LOG.debug("Shutdown / close exception details:", e);
1274       }
1275     }
1276   }
1277 
1278   /*
1279    * Run init. Sets up wal and starts up all server threads.
1280    *
1281    * @param c Extra configuration.
1282    */
1283   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1284   throws IOException {
1285     try {
1286       for (NameStringPair e : c.getMapEntriesList()) {
1287         String key = e.getName();
1288         // The hostname the master sees us as.
1289         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1290           String hostnameFromMasterPOV = e.getValue();
1291           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1292             rpcServices.isa.getPort(), this.startcode);
1293           if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1294             LOG.info("Master passed us a different hostname to use; was=" +
1295               rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
1296           }
1297           continue;
1298         }
1299         String value = e.getValue();
1300         if (LOG.isDebugEnabled()) {
1301           LOG.info("Config from master: " + key + "=" + value);
1302         }
1303         this.conf.set(key, value);
1304       }
1305 
1306       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1307       // config param for task trackers, but we can piggyback off of it.
1308       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1309         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1310           this.serverName.toString());
1311       }
1312 
1313       // Save it in a file; this will allow us to see if we crashed
1314       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1315 
1316       this.cacheConfig = new CacheConfig(conf);
1317       this.walFactory = setupWALAndReplication();
1318       // Init in here rather than in constructor after thread name has been set
1319       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1320 
1321       startServiceThreads();
1322       startHeapMemoryManager();
1323       LOG.info("Serving as " + this.serverName +
1324         ", RpcServer on " + rpcServices.isa +
1325         ", sessionid=0x" +
1326         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1327 
1328       // Wake up anyone waiting for this server to come online
1329       synchronized (online) {
1330         online.set(true);
1331         online.notifyAll();
1332       }
1333     } catch (Throwable e) {
1334       stop("Failed initialization");
1335       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1336           "Region server startup failed");
1337     } finally {
1338       sleeper.skipSleepCycle();
1339     }
1340   }
1341 
1342   private void startHeapMemoryManager() {
1343     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1344     if (this.hMemManager != null) {
1345       this.hMemManager.start(getChoreService());
1346     }
1347   }
1348 
1349   private void createMyEphemeralNode() throws KeeperException, IOException {
1350     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1351     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1352     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1353     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1354       getMyEphemeralNodePath(), data);
1355   }
1356 
1357   private void deleteMyEphemeralNode() throws KeeperException {
1358     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1359   }
1360 
1361   @Override
1362   public RegionServerAccounting getRegionServerAccounting() {
1363     return regionServerAccounting;
1364   }
1365 
1366   @Override
1367   public TableLockManager getTableLockManager() {
1368     return tableLockManager;
1369   }
1370 
1371   /*
1372    * @param r Region to get RegionLoad for.
1373    * @param regionLoadBldr the RegionLoad.Builder, can be null
1374    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1375    * @return RegionLoad instance.
1376    *
1377    * @throws IOException
1378    */
1379   private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1380       RegionSpecifier.Builder regionSpecifier) throws IOException {
1381     byte[] name = r.getRegionName();
1382     int stores = 0;
1383     int storefiles = 0;
1384     int storeUncompressedSizeMB = 0;
1385     int storefileSizeMB = 0;
1386     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1387     int storefileIndexSizeMB = 0;
1388     int rootIndexSizeKB = 0;
1389     int totalStaticIndexSizeKB = 0;
1390     int totalStaticBloomSizeKB = 0;
1391     long totalCompactingKVs = 0;
1392     long currentCompactedKVs = 0;
1393     synchronized (r.stores) {
1394       stores += r.stores.size();
1395       for (Store store : r.stores.values()) {
1396         storefiles += store.getStorefilesCount();
1397         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1398             / 1024 / 1024);
1399         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1400         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1401         CompactionProgress progress = store.getCompactionProgress();
1402         if (progress != null) {
1403           totalCompactingKVs += progress.totalCompactingKVs;
1404           currentCompactedKVs += progress.currentCompactedKVs;
1405         }
1406 
1407         rootIndexSizeKB +=
1408             (int) (store.getStorefilesIndexSize() / 1024);
1409 
1410         totalStaticIndexSizeKB +=
1411           (int) (store.getTotalStaticIndexSize() / 1024);
1412 
1413         totalStaticBloomSizeKB +=
1414           (int) (store.getTotalStaticBloomSize() / 1024);
1415       }
1416     }
1417     float dataLocality =
1418         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1419     if (regionLoadBldr == null) {
1420       regionLoadBldr = RegionLoad.newBuilder();
1421     }
1422     if (regionSpecifier == null) {
1423       regionSpecifier = RegionSpecifier.newBuilder();
1424     }
1425     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1426     regionSpecifier.setValue(ByteStringer.wrap(name));
1427     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1428       .setStores(stores)
1429       .setStorefiles(storefiles)
1430       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1431       .setStorefileSizeMB(storefileSizeMB)
1432       .setMemstoreSizeMB(memstoreSizeMB)
1433       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1434       .setRootIndexSizeKB(rootIndexSizeKB)
1435       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1436       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1437       .setReadRequestsCount(r.readRequestsCount.get())
1438       .setWriteRequestsCount(r.writeRequestsCount.get())
1439       .setTotalCompactingKVs(totalCompactingKVs)
1440       .setCurrentCompactedKVs(currentCompactedKVs)
1441       .setCompleteSequenceId(r.maxFlushedSeqId)
1442       .setDataLocality(dataLocality)
1443       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1444     return regionLoadBldr.build();
1445   }
1446 
1447   /**
1448    * @param encodedRegionName encoded name of the region to look up
1449    * @return An instance of RegionLoad, or null if the region is not online.
1450    */
1451   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1452     HRegion r = this.onlineRegions.get(encodedRegionName);
1454     return r != null ? createRegionLoad(r, null, null) : null;
1455   }
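
       // Added usage sketch, not part of the original source: a caller (for example a
       // status or metrics reporter) is assumed to look a region up by its encoded name
       // and tolerate the null returned when the region has gone offline. The variable
       // name and encoded region name below are purely illustrative.
       //
       //   RegionLoad load = regionServer.createRegionLoad("1588230740");
       //   if (load != null) {
       //     LOG.debug("stores=" + load.getStores()
       //         + ", storefiles=" + load.getStorefiles()
       //         + ", memstoreSizeMB=" + load.getMemstoreSizeMB());
       //   }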
1456 
1457   /*
1458    * Inner class that runs on a long interval, checking whether regions need compaction.
1459    */
1460   private static class CompactionChecker extends ScheduledChore {
1461     private final HRegionServer instance;
1462     private final int majorCompactPriority;
1463     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1464     private long iteration = 0;
1465 
1466     CompactionChecker(final HRegionServer h, final int sleepTime,
1467         final Stoppable stopper) {
1468       super("CompactionChecker", stopper, sleepTime);
1469       this.instance = h;
1470       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1471 
1472       /* MajorCompactPriority is configurable.
1473        * If not set, the compaction will use default priority.
1474        */
1475       this.majorCompactPriority = this.instance.conf.
1476         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1477         DEFAULT_PRIORITY);
1478     }
1479 
1480     @Override
1481     protected void chore() {
1482       for (HRegion r : this.instance.onlineRegions.values()) {
1483         if (r == null)
1484           continue;
1485         for (Store s : r.getStores().values()) {
1486           try {
1487             long multiplier = s.getCompactionCheckMultiplier();
1488             assert multiplier > 0;
1489             if (iteration % multiplier != 0) continue;
1490             if (s.needsCompaction()) {
1491               // Queue a compaction. Will recognize if major is needed.
1492               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1493                   + " requests compaction");
1494             } else if (s.isMajorCompaction()) {
1495               if (majorCompactPriority == DEFAULT_PRIORITY
1496                   || majorCompactPriority > r.getCompactPriority()) {
1497                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1498                     + " requests major compaction; use default priority", null);
1499               } else {
1500                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1501                     + " requests major compaction; use configured priority",
1502                   this.majorCompactPriority, null);
1503               }
1504             }
1505           } catch (IOException e) {
1506             LOG.warn("Failed major compaction check on " + r, e);
1507           }
1508         }
1509       }
1510       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1511     }
1512   }
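
       // Added note and sketch (not in the original source): the multiplier check above
       // means a store whose getCompactionCheckMultiplier() returns N is only examined on
       // every Nth run of this chore. The priority below is an assumed example value;
       // when the key is unset, the checker falls back to DEFAULT_PRIORITY.
       //
       //   conf.setInt("hbase.regionserver.compactionChecker.majorCompactPriority", 10);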
1513 
1514   static class PeriodicMemstoreFlusher extends ScheduledChore {
1515     final HRegionServer server;
1516     final static int RANGE_OF_DELAY = 20000; //millisec
1517     final static int MIN_DELAY_TIME = 3000; //millisec
1518     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1519       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1520       this.server = server;
1521     }
1522 
1523     @Override
1524     protected void chore() {
1525       for (HRegion r : this.server.onlineRegions.values()) {
1526         if (r == null)
1527           continue;
1528         if (r.shouldFlush()) {
1529           FlushRequester requester = server.getFlushRequester();
1530           if (requester != null) {
1531             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1532             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1533                 " after a delay of " + randomDelay);
1534             //Throttle the flushes by putting a delay. If we don't throttle, and there
1535             //is a balanced write-load on the regions in a table, we might end up
1536             //overwhelming the filesystem with too many flushes at once.
1537             requester.requestDelayedFlush(r, randomDelay, false);
1538           }
1539         }
1540       }
1541     }
1542   }
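
       // Added note (not in the original source): with RANGE_OF_DELAY=20000 and
       // MIN_DELAY_TIME=3000, the requested delay is a uniform random value in
       // [3000, 23000) milliseconds, so flushes of equally-aged regions are spread
       // over roughly a twenty second window rather than hitting the filesystem at once.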
1543 
1544   /**
1545    * Report the status of the server. A server is online once all of its startup has
1546    * completed (setting up filesystem, starting service threads, etc.). This
1547    * method is designed mostly to be useful in tests.
1548    *
1549    * @return true if online, false if not.
1550    */
1551   public boolean isOnline() {
1552     return online.get();
1553   }
1554 
1555   /**
1556    * Setup WAL log and replication if enabled.
1557    * Replication setup is done in here because it wants to be hooked up to WAL.
1558    * @return A WAL instance.
1559    * @throws IOException
1560    */
1561   private WALFactory setupWALAndReplication() throws IOException {
1562     // TODO Replication make assumptions here based on the default filesystem impl
1563     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1564     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1565 
1566     Path logdir = new Path(rootDir, logName);
1567     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1568     if (this.fs.exists(logdir)) {
1569       throw new RegionServerRunningException("Region server has already " +
1570         "created directory at " + this.serverName.toString());
1571     }
1572 
1573     // Instantiate replication manager if replication enabled.  Pass it the
1574     // log directories.
1575     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1576 
1577     // listeners the wal factory will add to wals it creates.
1578     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1579     listeners.add(new MetricsWAL());
1580     if (this.replicationSourceHandler != null &&
1581         this.replicationSourceHandler.getWALActionsListener() != null) {
1582       // Replication handler is an implementation of WALActionsListener.
1583       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1584     }
1585 
1586     return new WALFactory(conf, listeners, serverName.toString());
1587   }
1588 
1589   /**
1590    * We initialize the roller for the wal that handles meta lazily
1591    * since we don't know if this regionserver will handle it. All calls to
1592    * this method return a reference to that same roller. As newly referenced
1593    * meta regions are brought online, they will be offered to the roller for maintenance.
1594    * As a part of that registration process, the roller will add itself as a
1595    * listener on the wal.
1596    */
1597   protected LogRoller ensureMetaWALRoller() {
1598     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1599     // null
1600     LogRoller roller = metawalRoller.get();
1601     if (null == roller) {
1602       LogRoller tmpLogRoller = new LogRoller(this, this);
1603       String n = Thread.currentThread().getName();
1604       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1605           n + "-MetaLogRoller", uncaughtExceptionHandler);
1606       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1607         roller = tmpLogRoller;
1608       } else {
1609         // Another thread won starting the roller
1610         Threads.shutdown(tmpLogRoller.getThread());
1611         roller = metawalRoller.get();
1612       }
1613     }
1614     return roller;
1615   }
1616 
1617   public MetricsRegionServer getRegionServerMetrics() {
1618     return this.metricsRegionServer;
1619   }
1620 
1621   /**
1622    * @return Master address tracker instance.
1623    */
1624   public MasterAddressTracker getMasterAddressTracker() {
1625     return this.masterAddressTracker;
1626   }
1627 
1628   /*
1629    * Start maintenance Threads, Server, Worker and lease checker threads.
1630    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1631    * get an unhandled exception. We cannot set the handler on all threads.
1632    * Server's internal Listener thread is off limits. For Server, if there is an OOME,
1633    * it waits a while then retries. Meantime, a flush or a compaction that tries to
1634    * run should trigger the same critical condition and the shutdown will run. On
1635    * its way out, this server will shut down Server. Leases is somewhere in between:
1636    * it has an internal thread and, though it inherits from Chore, it keeps its own
1637    * stop mechanism, so it needs to be stopped by this hosting server. Worker logs
1638    * the exception and exits.
1639    */
1640   private void startServiceThreads() throws IOException {
1641     // Start executor services
1642     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1643       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1644     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1645       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1646     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1647       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1648     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1649       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1650     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1651       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1652         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1653     }
1654     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1655        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1656 
1657     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1658       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1659         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1660           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1661     }
1662 
1663     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1664         uncaughtExceptionHandler);
1665     this.cacheFlusher.start(uncaughtExceptionHandler);
1666 
1667     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1668     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1669     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1670     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1671     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1672     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1673 
1674     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1675     // an unhandled exception, it will just exit.
1676     this.leases.setName(getName() + ".leaseChecker");
1677     this.leases.start();
1678 
1679     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1680         this.replicationSourceHandler != null) {
1681       this.replicationSourceHandler.startReplicationService();
1682     } else {
1683       if (this.replicationSourceHandler != null) {
1684         this.replicationSourceHandler.startReplicationService();
1685       }
1686       if (this.replicationSinkHandler != null) {
1687         this.replicationSinkHandler.startReplicationService();
1688       }
1689     }
1690 
1691     // Create the log splitting worker and start it
1692     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1693     // quite a while inside HConnection layer. The worker won't be available for other
1694     // tasks even after current task is preempted after a split task times out.
1695     Configuration sinkConf = HBaseConfiguration.create(conf);
1696     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1697       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1698     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1699       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1700     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1701     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1702     splitLogWorker.start();
1703   }
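
       // Added tuning sketch (assumption, not from the original source): the executor pool
       // sizes above are driven by the configuration keys shown in the code, with the
       // defaults given as the second argument to conf.getInt(). For example:
       //
       //   conf.setInt("hbase.regionserver.executor.openregion.threads", 6);
       //   conf.setInt("hbase.regionserver.executor.closeregion.threads", 6);
       //   // Only consulted when StoreScanner parallel seek is enabled:
       //   conf.setInt("hbase.storescanner.parallel.seek.threads", 10);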
1704 
1705   /**
1706    * Puts up the webui.
1707    * @return The final port -- maybe different from what we started with.
1708    * @throws IOException
1709    */
1710   private int putUpWebUI() throws IOException {
1711     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1712       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1713     // -1 is for disabling info server
1714     if (port < 0) return port;
1715     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1716     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1717       String msg =
1718           "Failed to start http info server. Address " + addr
1719               + " does not belong to this host. Correct configuration parameter: "
1720               + "hbase.regionserver.info.bindAddress";
1721       LOG.error(msg);
1722       throw new IOException(msg);
1723     }
1724     // check if auto port bind enabled
1725     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1726         false);
1727     while (true) {
1728       try {
1729         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1730         infoServer.addServlet("dump", "/dump", getDumpServlet());
1731         configureInfoServer();
1732         this.infoServer.start();
1733         break;
1734       } catch (BindException e) {
1735         if (!auto) {
1736           // auto bind disabled throw BindException
1737           LOG.error("Failed binding http info server to port: " + port);
1738           throw e;
1739         }
1740         // auto bind enabled, try to use another port
1741         LOG.info("Failed binding http info server to port: " + port);
1742         port++;
1743       }
1744     }
1745     port = this.infoServer.getPort();
1746     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1747     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1748       HConstants.DEFAULT_MASTER_INFOPORT);
1749     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1750     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1751     return port;
1752   }
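
       // Added configuration sketch (assumption, not from the original source) for the web
       // UI logic above; the HConstants keys are the ones read at the top of the method and
       // a negative port disables the info server entirely.
       //
       //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 16030);
       //   conf.set("hbase.regionserver.info.bindAddress", "0.0.0.0");
       //   // When true, a BindException makes the server try port+1, port+2, ... instead of failing.
       //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);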
1753 
1754   /*
1755    * Verify that server is healthy
1756    */
1757   private boolean isHealthy() {
1758     if (!fsOk) {
1759       // File system problem
1760       return false;
1761     }
1762     // Verify that all threads are alive
1763     if (!(leases.isAlive()
1764         && cacheFlusher.isAlive() && walRoller.isAlive()
1765         && this.compactionChecker.isScheduled()
1766         && this.periodicFlusher.isScheduled())) {
1767       stop("One or more threads are no longer alive -- stop");
1768       return false;
1769     }
1770     final LogRoller metawalRoller = this.metawalRoller.get();
1771     if (metawalRoller != null && !metawalRoller.isAlive()) {
1772       stop("Meta WAL roller thread is no longer alive -- stop");
1773       return false;
1774     }
1775     return true;
1776   }
1777 
1778   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1779 
1780   @Override
1781   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1782     WAL wal;
1783     LogRoller roller = walRoller;
1784     //_ROOT_ and hbase:meta regions have separate WAL.
1785     if (regionInfo != null && regionInfo.isMetaTable() &&
1786         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1787       roller = ensureMetaWALRoller();
1788       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1789     } else if (regionInfo == null) {
1790       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1791     } else {
1792       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1793     }
1794     roller.addWAL(wal);
1795     return wal;
1796   }
1797 
1798   @Override
1799   public ClusterConnection getConnection() {
1800     return this.clusterConnection;
1801   }
1802 
1803   @Override
1804   public MetaTableLocator getMetaTableLocator() {
1805     return this.metaTableLocator;
1806   }
1807 
1808   @Override
1809   public void stop(final String msg) {
1810     if (!this.stopped) {
1811       try {
1812         if (this.rsHost != null) {
1813           this.rsHost.preStop(msg);
1814         }
1815         this.stopped = true;
1816         LOG.info("STOPPED: " + msg);
1817         // Wakes run() if it is sleeping
1818         sleeper.skipSleepCycle();
1819       } catch (IOException exp) {
1820         LOG.warn("The region server did not stop", exp);
1821       }
1822     }
1823   }
1824 
1825   public void waitForServerOnline(){
1826     while (!isStopped() && !isOnline()) {
1827       synchronized (online) {
1828         try {
1829           online.wait(msgInterval);
1830         } catch (InterruptedException ie) {
1831           Thread.currentThread().interrupt();
1832           break;
1833         }
1834       }
1835     }
1836   }
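
       // Added test-style sketch (assumption; the cluster helper below is hypothetical and
       // not part of this class): isOnline()/waitForServerOnline() are mostly meant for
       // callers such as tests that need to block until startup has finished.
       //
       //   HRegionServer rs = cluster.getRegionServer(0);
       //   rs.waitForServerOnline();   // returns once online, or once the server is stopped
       //   assert rs.isOnline() || rs.isStopped();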
1837 
1838   @Override
1839   public void postOpenDeployTasks(final HRegion r)
1840   throws KeeperException, IOException {
1841     rpcServices.checkOpen();
1842     LOG.info("Post open deploy tasks for " + r.getRegionNameAsString());
1843     // Do checks to see if we need to compact (references or too many files)
1844     for (Store s : r.getStores().values()) {
1845       if (s.hasReferences() || s.needsCompaction()) {
1846        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1847       }
1848     }
1849     long openSeqNum = r.getOpenSeqNum();
1850     if (openSeqNum == HConstants.NO_SEQNUM) {
1851       // If we opened a region, we should have read some sequence number from it.
1852       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1853       openSeqNum = 0;
1854     }
1855 
1856     // Update flushed sequence id of a recovering region in ZK
1857     updateRecoveringRegionLastFlushedSequenceId(r);
1858 
1859     // Notify master
1860     if (!reportRegionStateTransition(
1861         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1862       throw new IOException("Failed to report opened region to master: "
1863         + r.getRegionNameAsString());
1864     }
1865 
1866     triggerFlushInPrimaryRegion(r);
1867 
1868     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
1869   }
1870 
1871   @Override
1872   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1873     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1874   }
1875 
1876   @Override
1877   public boolean reportRegionStateTransition(
1878       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1879     if (TEST_SKIP_REPORTING_TRANSITION) {
1880       // This is for testing only in case there is no master
1881       // to handle the region transition report at all.
1882       if (code == TransitionCode.OPENED) {
1883         Preconditions.checkArgument(hris != null && hris.length == 1);
1884         if (hris[0].isMetaRegion()) {
1885           try {
1886             MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
1887                 hris[0].getReplicaId(),State.OPEN);
1888           } catch (KeeperException e) {
1889             LOG.info("Failed to update meta location", e);
1890             return false;
1891           }
1892         } else {
1893           try {
1894             MetaTableAccessor.updateRegionLocation(clusterConnection,
1895               hris[0], serverName, openSeqNum);
1896           } catch (IOException e) {
1897             LOG.info("Failed to update meta", e);
1898             return false;
1899           }
1900         }
1901       }
1902       return true;
1903     }
1904 
1905     ReportRegionStateTransitionRequest.Builder builder =
1906       ReportRegionStateTransitionRequest.newBuilder();
1907     builder.setServer(ProtobufUtil.toServerName(serverName));
1908     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1909     transition.setTransitionCode(code);
1910     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1911       transition.setOpenSeqNum(openSeqNum);
1912     }
1913     for (HRegionInfo hri: hris) {
1914       transition.addRegionInfo(HRegionInfo.convert(hri));
1915     }
1916     ReportRegionStateTransitionRequest request = builder.build();
1917     while (keepLooping()) {
1918       RegionServerStatusService.BlockingInterface rss = rssStub;
1919       try {
1920         if (rss == null) {
1921           createRegionServerStatusStub();
1922           continue;
1923         }
1924         ReportRegionStateTransitionResponse response =
1925           rss.reportRegionStateTransition(null, request);
1926         if (response.hasErrorMessage()) {
1927           LOG.info("Failed to transition " + hris[0]
1928             + " to " + code + ": " + response.getErrorMessage());
1929           return false;
1930         }
1931         return true;
1932       } catch (ServiceException se) {
1933         IOException ioe = ProtobufUtil.getRemoteException(se);
1934         LOG.info("Failed to report region transition, will retry", ioe);
1935         if (rssStub == rss) {
1936           rssStub = null;
1937         }
1938       }
1939     }
1940     return false;
1941   }
1942 
1943   /**
1944    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
1945    * block this thread. See RegionReplicaFlushHandler for details.
1946    */
1947   void triggerFlushInPrimaryRegion(final HRegion region) {
1948     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
1949       return;
1950     }
1951     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(getConfiguration()) ||
1952         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
1953           getConfiguration())) {
1954       region.setReadsEnabled(true);
1955       return;
1956     }
1957 
1958     region.setReadsEnabled(false); // disable reads before marking the region as opened.
1959     // RegionReplicaFlushHandler might reset this.
1960 
1961     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
1962     this.service.submit(
1963       new RegionReplicaFlushHandler(this, clusterConnection,
1964         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
1965   }
1966 
1967   @Override
1968   public RpcServerInterface getRpcServer() {
1969     return rpcServices.rpcServer;
1970   }
1971 
1972   @VisibleForTesting
1973   public RSRpcServices getRSRpcServices() {
1974     return rpcServices;
1975   }
1976 
1977   /**
1978    * Cause the server to exit without closing the regions it is serving, without
1979    * closing the log it is using and without notifying the master. Used in unit testing
1980    * and on catastrophic events such as HDFS being yanked out from under hbase or an OOME.
1981    *
1982    * @param reason
1983    *          the reason we are aborting
1984    * @param cause
1985    *          the exception that caused the abort, or null
1986    */
1987   @Override
1988   public void abort(String reason, Throwable cause) {
1989     String msg = "ABORTING region server " + this + ": " + reason;
1990     if (cause != null) {
1991       LOG.fatal(msg, cause);
1992     } else {
1993       LOG.fatal(msg);
1994     }
1995     this.abortRequested = true;
1996     // HBASE-4014: show list of coprocessors that were loaded to help debug
1997     // regionserver crashes.Note that we're implicitly using
1998     // regionserver crashes. Note that we're implicitly using
1999     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2000         CoprocessorHost.getLoadedCoprocessors());
2001     // Try and dump metrics if abort -- might give clue as to how fatal came about....
2002     try {
2003       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2004     } catch (MalformedObjectNameException | IOException e) {
2005       LOG.warn("Failed dumping metrics", e);
2006     }
2007 
2008     // Do our best to report our abort to the master, but this may not work
2009     try {
2010       if (cause != null) {
2011         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2012       }
2013       // Report to the master but only if we have already registered with the master.
2014       if (rssStub != null && this.serverName != null) {
2015         ReportRSFatalErrorRequest.Builder builder =
2016           ReportRSFatalErrorRequest.newBuilder();
2017         ServerName sn =
2018           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2019         builder.setServer(ProtobufUtil.toServerName(sn));
2020         builder.setErrorMessage(msg);
2021         rssStub.reportRSFatalError(null, builder.build());
2022       }
2023     } catch (Throwable t) {
2024       LOG.warn("Unable to report fatal error to master", t);
2025     }
2026     stop(reason);
2027   }
2028 
2029   /**
2030    * @see HRegionServer#abort(String, Throwable)
2031    */
2032   public void abort(String reason) {
2033     abort(reason, null);
2034   }
2035 
2036   @Override
2037   public boolean isAborted() {
2038     return this.abortRequested;
2039   }
2040 
2041   /*
2042    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2043    * logs, but it does close the socket in case we want to bring up a server on the old
2044    * hostname+port immediately.
2045    */
2046   protected void kill() {
2047     this.killed = true;
2048     abort("Simulated kill");
2049   }
2050 
2051   /**
2052    * Wait on all threads to finish. Presumption is that all closes and stops
2053    * have already been called.
2054    */
2055   protected void stopServiceThreads() {
2056     // clean up the scheduled chores
2057     if (this.choreService != null) choreService.shutdown();
2058     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2059     if (this.compactionChecker != null) compactionChecker.cancel(true);
2060     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2061     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2062     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2063     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2064 
2065     if (this.cacheFlusher != null) {
2066       this.cacheFlusher.join();
2067     }
2068 
2069     if (this.spanReceiverHost != null) {
2070       this.spanReceiverHost.closeReceivers();
2071     }
2072     if (this.walRoller != null) {
2073       Threads.shutdown(this.walRoller.getThread());
2074     }
2075     final LogRoller metawalRoller = this.metawalRoller.get();
2076     if (metawalRoller != null) {
2077       Threads.shutdown(metawalRoller.getThread());
2078     }
2079     if (this.compactSplitThread != null) {
2080       this.compactSplitThread.join();
2081     }
2082     if (this.service != null) this.service.shutdown();
2083     if (this.replicationSourceHandler != null &&
2084         this.replicationSourceHandler == this.replicationSinkHandler) {
2085       this.replicationSourceHandler.stopReplicationService();
2086     } else {
2087       if (this.replicationSourceHandler != null) {
2088         this.replicationSourceHandler.stopReplicationService();
2089       }
2090       if (this.replicationSinkHandler != null) {
2091         this.replicationSinkHandler.stopReplicationService();
2092       }
2093     }
2094   }
2095 
2096   /**
2097    * @return Return the object that implements the replication
2098    * source service.
2099    */
2100   ReplicationSourceService getReplicationSourceService() {
2101     return replicationSourceHandler;
2102   }
2103 
2104   /**
2105    * @return Return the object that implements the replication
2106    * sink service.
2107    */
2108   ReplicationSinkService getReplicationSinkService() {
2109     return replicationSinkHandler;
2110   }
2111 
2112   /**
2113    * Get the current master from ZooKeeper and open the RPC connection to it.
2114    *
2115    * Method will block until a master is available. You can break from this
2116    * block by requesting the server stop.
2117    *
2118    * @return master + port, or null if server has been stopped
2119    */
2120   private synchronized ServerName createRegionServerStatusStub() {
2121     if (rssStub != null) {
2122       return masterAddressTracker.getMasterAddress();
2123     }
2124     ServerName sn = null;
2125     long previousLogTime = 0;
2126     boolean refresh = false; // for the first time, use cached data
2127     RegionServerStatusService.BlockingInterface intf = null;
2128     boolean interrupted = false;
2129     try {
2130       while (keepLooping()) {
2131         sn = this.masterAddressTracker.getMasterAddress(refresh);
2132         if (sn == null) {
2133           if (!keepLooping()) {
2134             // give up with no connection.
2135             LOG.debug("No master found and cluster is stopped; bailing out");
2136             return null;
2137           }
2138           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2139             LOG.debug("No master found; retry");
2140             previousLogTime = System.currentTimeMillis();
2141           }
2142           refresh = true; // let's try pull it from ZK directly
2143           if (sleep(200)) {
2144             interrupted = true;
2145           }
2146           continue;
2147         }
2148 
2149         // If we are on the active master, use the shortcut
2150         if (this instanceof HMaster && sn.equals(getServerName())) {
2151           intf = ((HMaster)this).getMasterRpcServices();
2152           break;
2153         }
2154         try {
2155           BlockingRpcChannel channel =
2156             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2157               shortOperationTimeout);
2158           intf = RegionServerStatusService.newBlockingStub(channel);
2159           break;
2160         } catch (IOException e) {
2161           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2162             e = e instanceof RemoteException ?
2163               ((RemoteException)e).unwrapRemoteException() : e;
2164             if (e instanceof ServerNotRunningYetException) {
2165               LOG.info("Master isn't available yet, retrying");
2166             } else {
2167               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2168             }
2169             previousLogTime = System.currentTimeMillis();
2170           }
2171           if (sleep(200)) {
2172             interrupted = true;
2173           }
2174         }
2175       }
2176     } finally {
2177       if (interrupted) {
2178         Thread.currentThread().interrupt();
2179       }
2180     }
2181     rssStub = intf;
2182     return sn;
2183   }
2184 
2185   /**
2186    * @return True if we should break loop because cluster is going down or
2187    * this server has been stopped or hdfs has gone bad.
2188    */
2189   private boolean keepLooping() {
2190     return !this.stopped && isClusterUp();
2191   }
2192 
2193   /*
2194    * Let the master know we're here. Run initialization using parameters passed
2195    * to us by the master.
2196    * @return A Map of key/value configurations we got from the Master, else
2197    * null if we failed to register.
2198    * @throws IOException
2199    */
2200   private RegionServerStartupResponse reportForDuty() throws IOException {
2201     ServerName masterServerName = createRegionServerStatusStub();
2202     if (masterServerName == null) return null;
2203     RegionServerStartupResponse result = null;
2204     try {
2205       rpcServices.requestCount.set(0);
2206       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2207         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2208       long now = EnvironmentEdgeManager.currentTime();
2209       int port = rpcServices.isa.getPort();
2210       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2211       request.setPort(port);
2212       request.setServerStartCode(this.startcode);
2213       request.setServerCurrentTime(now);
2214       result = this.rssStub.regionServerStartup(null, request.build());
2215     } catch (ServiceException se) {
2216       IOException ioe = ProtobufUtil.getRemoteException(se);
2217       if (ioe instanceof ClockOutOfSyncException) {
2218         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2219         // Re-throw IOE will cause RS to abort
2220         throw ioe;
2221       } else if (ioe instanceof ServerNotRunningYetException) {
2222         LOG.debug("Master is not running yet");
2223       } else {
2224         LOG.warn("error telling master we are up", se);
2225       }
2226     }
2227     return result;
2228   }
2229 
2230   @Override
2231   public long getLastSequenceId(byte[] encodedRegionName) {
2232     long lastFlushedSequenceId = -1L;
2233     try {
2234       GetLastFlushedSequenceIdRequest req = RequestConverter
2235           .buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2236       RegionServerStatusService.BlockingInterface rss = rssStub;
2237       if (rss == null) { // Try to connect one more time
2238         createRegionServerStatusStub();
2239         rss = rssStub;
2240         if (rss == null) {
2241           // Still no luck, we tried
2242           LOG.warn("Unable to connect to the master to check "
2243             + "the last flushed sequence id");
2244           return -1L;
2245         }
2246       }
2247       lastFlushedSequenceId = rss.getLastFlushedSequenceId(null, req)
2248           .getLastFlushedSequenceId();
2249     } catch (ServiceException e) {
2250       lastFlushedSequenceId = -1L;
2251       LOG.warn("Unable to connect to the master to check "
2252         + "the last flushed sequence id", e);
2253     }
2254     return lastFlushedSequenceId;
2255   }
2256 
2257   /**
2258    * Closes all regions.  Called on our way out.
2259    * Assumes that it's not possible for new regions to be added to onlineRegions
2260    * while this method runs.
2261    */
2262   protected void closeAllRegions(final boolean abort) {
2263     closeUserRegions(abort);
2264     closeMetaTableRegions(abort);
2265   }
2266 
2267   /**
2268    * Close meta region if we carry it
2269    * @param abort Whether we're running an abort.
2270    */
2271   void closeMetaTableRegions(final boolean abort) {
2272     HRegion meta = null;
2273     this.lock.writeLock().lock();
2274     try {
2275       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2276         HRegionInfo hri = e.getValue().getRegionInfo();
2277         if (hri.isMetaRegion()) {
2278           meta = e.getValue();
2279         }
2280         if (meta != null) break;
2281       }
2282     } finally {
2283       this.lock.writeLock().unlock();
2284     }
2285     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2286   }
2287 
2288   /**
2289    * Schedule closes on all user regions.
2290    * Should be safe to call multiple times because it won't close regions
2291    * that are already closed or that are closing.
2292    * @param abort Whether we're running an abort.
2293    */
2294   void closeUserRegions(final boolean abort) {
2295     this.lock.writeLock().lock();
2296     try {
2297       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2298         HRegion r = e.getValue();
2299         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2300           // Don't update zk with this close transition; pass false.
2301           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2302         }
2303       }
2304     } finally {
2305       this.lock.writeLock().unlock();
2306     }
2307   }
2308 
2309   /** @return the info server */
2310   public InfoServer getInfoServer() {
2311     return infoServer;
2312   }
2313 
2314   /**
2315    * @return true if a stop has been requested.
2316    */
2317   @Override
2318   public boolean isStopped() {
2319     return this.stopped;
2320   }
2321 
2322   @Override
2323   public boolean isStopping() {
2324     return this.stopping;
2325   }
2326 
2327   @Override
2328   public Map<String, HRegion> getRecoveringRegions() {
2329     return this.recoveringRegions;
2330   }
2331 
2332   /**
2333    *
2334    * @return the configuration
2335    */
2336   @Override
2337   public Configuration getConfiguration() {
2338     return conf;
2339   }
2340 
2341   /** @return the write lock for the server */
2342   ReentrantReadWriteLock.WriteLock getWriteLock() {
2343     return lock.writeLock();
2344   }
2345 
2346   public int getNumberOfOnlineRegions() {
2347     return this.onlineRegions.size();
2348   }
2349 
2350   boolean isOnlineRegionsEmpty() {
2351     return this.onlineRegions.isEmpty();
2352   }
2353 
2354   /**
2355    * For tests, web ui and metrics.
2356    * This method will only work if HRegionServer is in the same JVM as the client;
2357    * HRegion cannot be serialized to cross an rpc.
2358    */
2359   public Collection<HRegion> getOnlineRegionsLocalContext() {
2360     Collection<HRegion> regions = this.onlineRegions.values();
2361     return Collections.unmodifiableCollection(regions);
2362   }
2363 
2364   @Override
2365   public void addToOnlineRegions(HRegion region) {
2366     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2367     configurationManager.registerObserver(region);
2368   }
2369 
2370   /**
2371    * @return A new Map of online regions sorted by region size with the first entry being the
2372    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2373    * may NOT return all regions.
2374    */
2375   SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2376     // we'll sort the regions in reverse
2377     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2378         new Comparator<Long>() {
2379           @Override
2380           public int compare(Long a, Long b) {
2381             return -1 * a.compareTo(b);
2382           }
2383         });
2384     // Copy over all regions. Regions are sorted by size with biggest first.
2385     for (HRegion region : this.onlineRegions.values()) {
2386       sortedRegions.put(region.memstoreSize.get(), region);
2387     }
2388     return sortedRegions;
2389   }
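
       // Added usage sketch (not in the original source): iterate from the biggest memstore
       // down, e.g. when picking flush candidates. Because the map key is the memstore size,
       // two regions with identical sizes collapse to a single entry, as the javadoc warns.
       //
       //   for (Map.Entry<Long, HRegion> e : getCopyOfOnlineRegionsSortedBySize().entrySet()) {
       //     LOG.debug("memstoreSize=" + e.getKey()
       //         + " region=" + e.getValue().getRegionNameAsString());
       //   }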
2390 
2391   /**
2392    * @return time stamp in millis of when this region server was started
2393    */
2394   public long getStartcode() {
2395     return this.startcode;
2396   }
2397 
2398   /** @return reference to FlushRequester */
2399   @Override
2400   public FlushRequester getFlushRequester() {
2401     return this.cacheFlusher;
2402   }
2403 
2404   /**
2405    * Get the top N most loaded regions this server is serving so we can tell the
2406    * master which regions it can reallocate if we're overloaded. TODO: actually
2407    * calculate which regions are most loaded. (Right now, we're just grabbing
2408    * the first N regions being served regardless of load.)
2409    */
2410   protected HRegionInfo[] getMostLoadedRegions() {
2411     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2412     for (HRegion r : onlineRegions.values()) {
2413       if (!r.isAvailable()) {
2414         continue;
2415       }
2416       if (regions.size() < numRegionsToReport) {
2417         regions.add(r.getRegionInfo());
2418       } else {
2419         break;
2420       }
2421     }
2422     return regions.toArray(new HRegionInfo[regions.size()]);
2423   }
2424 
2425   @Override
2426   public Leases getLeases() {
2427     return leases;
2428   }
2429 
2430   /**
2431    * @return Return the rootDir.
2432    */
2433   protected Path getRootDir() {
2434     return rootDir;
2435   }
2436 
2437   /**
2438    * @return Return the fs.
2439    */
2440   @Override
2441   public FileSystem getFileSystem() {
2442     return fs;
2443   }
2444 
2445   @Override
2446   public String toString() {
2447     return getServerName().toString();
2448   }
2449 
2450   /**
2451    * Interval at which threads should run
2452    *
2453    * @return the interval
2454    */
2455   public int getThreadWakeFrequency() {
2456     return threadWakeFrequency;
2457   }
2458 
2459   @Override
2460   public ZooKeeperWatcher getZooKeeper() {
2461     return zooKeeper;
2462   }
2463 
2464   @Override
2465   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2466     return csm;
2467   }
2468 
2469   @Override
2470   public ServerName getServerName() {
2471     return serverName;
2472   }
2473 
2474   @Override
2475   public CompactionRequestor getCompactionRequester() {
2476     return this.compactSplitThread;
2477   }
2478 
2479   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2480     return this.rsHost;
2481   }
2482 
2483   @Override
2484   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2485     return this.regionsInTransitionInRS;
2486   }
2487 
2488   @Override
2489   public ExecutorService getExecutorService() {
2490     return service;
2491   }
2492 
2493   @Override
2494   public ChoreService getChoreService() {
2495     return choreService;
2496   }
2497 
2498   @Override
2499   public RegionServerQuotaManager getRegionServerQuotaManager() {
2500     return rsQuotaManager;
2501   }
2502 
2503   //
2504   // Main program and support routines
2505   //
2506 
2507   /**
2508    * Load the replication service objects, if any
2509    */
2510   static private void createNewReplicationInstance(Configuration conf,
2511     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2512 
2513     // If replication is not enabled, then return immediately.
2514     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2515         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2516       return;
2517     }
2518 
2519     // read in the name of the source replication class from the config file.
2520     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2521                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2522 
2523     // read in the name of the sink replication class from the config file.
2524     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2525                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2526 
2527     // If both the sink and the source class names are the same, then instantiate
2528     // only one object.
2529     if (sourceClassname.equals(sinkClassname)) {
2530       server.replicationSourceHandler = (ReplicationSourceService)
2531                                          newReplicationInstance(sourceClassname,
2532                                          conf, server, fs, logDir, oldLogDir);
2533       server.replicationSinkHandler = (ReplicationSinkService)
2534                                          server.replicationSourceHandler;
2535     } else {
2536       server.replicationSourceHandler = (ReplicationSourceService)
2537                                          newReplicationInstance(sourceClassname,
2538                                          conf, server, fs, logDir, oldLogDir);
2539       server.replicationSinkHandler = (ReplicationSinkService)
2540                                          newReplicationInstance(sinkClassname,
2541                                          conf, server, fs, logDir, oldLogDir);
2542     }
2543   }
2544 
2545   static private ReplicationService newReplicationInstance(String classname,
2546     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2547     Path oldLogDir) throws IOException{
2548 
2549     Class<?> clazz = null;
2550     try {
2551       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2552       clazz = Class.forName(classname, true, classLoader);
2553     } catch (java.lang.ClassNotFoundException nfe) {
2554       throw new IOException("Could not find class for " + classname);
2555     }
2556 
2557     // create an instance of the replication object.
2558     ReplicationService service = (ReplicationService)
2559                               ReflectionUtils.newInstance(clazz, conf);
2560     service.initialize(server, fs, logDir, oldLogDir);
2561     return service;
2562   }
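
       // Added configuration sketch (assumption; the class name below is hypothetical): the
       // reflection-based loading above lets a deployment plug in its own replication
       // service. When the source and sink class names are identical, a single instance is
       // shared for both roles, as createNewReplicationInstance() shows.
       //
       //   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
       //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "com.example.MyReplicationService");
       //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "com.example.MyReplicationService");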
2563 
2564   /**
2565    * Utility for constructing an instance of the passed HRegionServer class.
2566    *
2567    * @param regionServerClass the HRegionServer implementation class to instantiate
2568    * @param conf2 configuration to pass to the constructor
2569    * @return HRegionServer instance.
2570    */
2571   public static HRegionServer constructRegionServer(
2572       Class<? extends HRegionServer> regionServerClass,
2573       final Configuration conf2, CoordinatedStateManager cp) {
2574     try {
2575       Constructor<? extends HRegionServer> c = regionServerClass
2576           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2577       return c.newInstance(conf2, cp);
2578     } catch (Exception e) {
2579       throw new RuntimeException("Failed construction of RegionServer: "
2580           + regionServerClass.toString(), e);
2581     }
2582   }
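
       // Added usage sketch (assumption, not from the original source): this mirrors what
       // main() below does through HRegionServerCommandLine when hbase.regionserver.impl
       // names a subclass of HRegionServer.
       //
       //   CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
       //   HRegionServer rs = HRegionServer.constructRegionServer(regionServerClass, conf, csm);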
2583 
2584   /**
2585    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2586    */
2587   public static void main(String[] args) throws Exception {
2588     VersionInfo.logVersion();
2589     Configuration conf = HBaseConfiguration.create();
2590     @SuppressWarnings("unchecked")
2591     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2592         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2593 
2594     new HRegionServerCommandLine(regionServerClass).doMain(args);
2595   }
2596 
2597   /**
2598    * Gets the online regions of the specified table.
2599    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2600    * Only returns <em>online</em> regions.  If a region on this table has been
2601    * closed during a disable, etc., it will not be included in the returned list.
2602    * So, the returned list is not necessarily ALL regions in this table; it is
2603    * all the ONLINE regions in the table.
2604    * @param tableName
2605    * @return Online regions from <code>tableName</code>
2606    */
2607   @Override
2608   public List<HRegion> getOnlineRegions(TableName tableName) {
2609      List<HRegion> tableRegions = new ArrayList<HRegion>();
2610      synchronized (this.onlineRegions) {
2611        for (HRegion region: this.onlineRegions.values()) {
2612          HRegionInfo regionInfo = region.getRegionInfo();
2613          if(regionInfo.getTable().equals(tableName)) {
2614            tableRegions.add(region);
2615          }
2616        }
2617      }
2618      return tableRegions;
2619    }
2620 
2621   /**
2622    * Gets the online tables in this RS.
2623    * This method looks at the in-memory onlineRegions.
2624    * @return all the online tables in this RS
2625    */
2626   @Override
2627   public Set<TableName> getOnlineTables() {
2628     Set<TableName> tables = new HashSet<TableName>();
2629     synchronized (this.onlineRegions) {
2630       for (HRegion region: this.onlineRegions.values()) {
2631         tables.add(region.getTableDesc().getTableName());
2632       }
2633     }
2634     return tables;
2635   }
2636 
2637   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2638   public String[] getRegionServerCoprocessors() {
2639     TreeSet<String> coprocessors = new TreeSet<String>();
2640     try {
2641       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2642     } catch (IOException exception) {
2643       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2644           "skipping.");
2645       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2646     }
2647     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2648     for (HRegion region: regions) {
2649       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2650       try {
2651         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2652       } catch (IOException exception) {
2653         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2654             "; skipping.");
2655         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2656       }
2657     }
2658     return coprocessors.toArray(new String[coprocessors.size()]);
2659   }
2660 
2661   /**
2662    * Try to close the region, logs a warning on failure but continues.
2663    * @param region Region to close
2664    */
2665   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2666     try {
2667       if (!closeRegion(region.getEncodedName(), abort, null)) {
2668         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2669             " - ignoring and continuing");
2670       }
2671     } catch (IOException e) {
2672       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2673           " - ignoring and continuing", e);
2674     }
2675   }
2676 
2677   /**
2678    * Close asynchronously a region, can be called from the master or internally by the regionserver
2679    * when stopping. If called from the master, the region will update the znode status.
2680    *
2681    * <p>
2682    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2683    * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
2684    * </p>
2685 
2686    * <p>
2687    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2688    * </p>
2689    *
2690    * @param encodedName Region to close
2691    * @param abort True if we are aborting
2692    * @return True if closed a region.
2693    * @throws NotServingRegionException if the region is not online
2694    */
2695   protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
2696       throws NotServingRegionException {
2697     // Check for permissions to close.
2698     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2699     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2700       try {
2701         actualRegion.getCoprocessorHost().preClose(false);
2702       } catch (IOException exp) {
2703         LOG.warn("Unable to close region: the coprocessor threw an exception", exp);
2704         return false;
2705       }
2706     }
2707 
2708     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2709         Boolean.FALSE);
2710 
2711     if (Boolean.TRUE.equals(previous)) {
2712       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2713           "trying to OPEN. Cancelling OPENING.");
2714       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2715         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2716         // We're going to try to do a standard close then.
2717         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2718             " Doing a standard close now");
2719         return closeRegion(encodedName, abort, sn);
2720       }
2721       // Let's get the region from the online region list again
2722       actualRegion = this.getFromOnlineRegions(encodedName);
2723       if (actualRegion == null) { // The cancelled open never made it online; nothing to close here.
2724         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2725         // The master deletes the znode when it receives this exception.
2726         throw new NotServingRegionException("The region " + encodedName +
2727           " was opening but not yet served. Opening is cancelled.");
2728       }
2729     } else if (Boolean.FALSE.equals(previous)) {
2730       LOG.info("Received CLOSE for the region: " + encodedName +
2731         ", which we are already trying to CLOSE, but not completed yet");
2732       return true;
2733     }
2734 
2735     if (actualRegion == null) {
2736       LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2737       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2738       // The master deletes the znode when it receives this exception.
2739       throw new NotServingRegionException("The region " + encodedName +
2740           " is not online, and is not opening.");
2741     }
2742 
2743     CloseRegionHandler crh;
2744     final HRegionInfo hri = actualRegion.getRegionInfo();
2745     if (hri.isMetaRegion()) {
2746       crh = new CloseMetaHandler(this, this, hri, abort);
2747     } else {
2748       crh = new CloseRegionHandler(this, this, hri, abort, sn);
2749     }
2750     this.service.submit(crh);
2751     return true;
2752   }
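
  // Caller sketch (illustrative; closeRegionIgnoreErrors above shows the in-class version of this
  // pattern): a false return means the close was refused (e.g. vetoed by a coprocessor's preClose),
  // while NotServingRegionException means nothing was online or opening to close:
  //   try {
  //     if (!closeRegion(encodedName, false, null)) {
  //       LOG.warn("Close of " + encodedName + " was refused");
  //     }
  //   } catch (NotServingRegionException e) {
  //     LOG.debug("Region " + encodedName + " is already offline", e);
  //   }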
2753 
2754   /**
2755    * @param regionName Binary region name
2756    * @return HRegion for the passed binary <code>regionName</code>, or null if the
2757    *         named region is not a member of the online regions.
2758    */
2759   public HRegion getOnlineRegion(final byte[] regionName) {
2760     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2761     return this.onlineRegions.get(encodedRegionName);
2762   }
2763 
2764   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2765     return this.regionFavoredNodesMap.get(encodedRegionName);
2766   }
2767 
2768   @Override
2769   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2770     return this.onlineRegions.get(encodedRegionName);
2771   }
2772 
2773 
2774   @Override
2775   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2776     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2777 
2778     if (destination != null) {
2779       try {
2780         WAL wal = getWAL(r.getRegionInfo());
2781         long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2782         if (closeSeqNum == HConstants.NO_SEQNUM) {
2783           // No edits in WAL for this region; get the sequence number when the region was opened.
2784           closeSeqNum = r.getOpenSeqNum();
2785           if (closeSeqNum == HConstants.NO_SEQNUM) {
2786             closeSeqNum = 0;
2787           }
2788         }
2789         addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2790       } catch (IOException exception) {
2791         LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
2792             "; not adding to moved regions.");
2793         LOG.debug("Exception details for failure to get wal", exception);
2794       }
2795     }
2796     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2797     return toReturn != null;
2798   }
2799 
2800   /**
2801    * Protected utility method for safely obtaining an HRegion handle.
2802    *
2803    * @param regionName
2804    *          Name of online {@link HRegion} to return
2805    * @return {@link HRegion} for <code>regionName</code>
2806    * @throws NotServingRegionException if the region is not online
2807    */
2808   protected HRegion getRegion(final byte[] regionName)
2809       throws NotServingRegionException {
2810     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2811     return getRegionByEncodedName(regionName, encodedRegionName);
2812   }
2813 
2814   public HRegion getRegionByEncodedName(String encodedRegionName)
2815       throws NotServingRegionException {
2816     return getRegionByEncodedName(null, encodedRegionName);
2817   }
2818 
2819   protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2820     throws NotServingRegionException {
2821     HRegion region = this.onlineRegions.get(encodedRegionName);
2822     if (region == null) {
2823       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2824       if (moveInfo != null) {
2825         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2826       }
2827       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2828       String regionNameStr = regionName == null ?
2829         encodedRegionName : Bytes.toStringBinary(regionName);
2830       if (isOpening != null && isOpening.booleanValue()) {
2831         throw new RegionOpeningException("Region " + regionNameStr +
2832           " is opening on " + this.serverName);
2833       }
2834       throw new NotServingRegionException("Region " + regionNameStr +
2835         " is not online on " + this.serverName);
2836     }
2837     return region;
2838   }
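
  // Outcome summary for the lookup above (descriptive note): when the region is not found in
  // onlineRegions the caller sees one of three signals:
  //   RegionMovedException      - recently closed for a move; retry against the server in the exception
  //   RegionOpeningException    - an OPEN is in flight on this server; retry after a short backoff
  //   NotServingRegionException - neither online nor opening; the client should re-locate the region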
2839 
2840   /*
2841    * Cleanup after a Throwable was caught while invoking a method: logs it and checks for OOME
2842    * and filesystem availability.
2843    *
2844    * @param t Throwable
2845    *
2846    * @param msg Message to log in error. Can be null.
2847    *
2848    * @return The passed <code>t</code>; callers convert it to an IOE since methods can only let out IOEs.
2849    */
2850   private Throwable cleanup(final Throwable t, final String msg) {
2851     // Don't log as error if NSRE; NSRE is 'normal' operation.
2852     if (t instanceof NotServingRegionException) {
2853       LOG.debug("NotServingRegionException; " + t.getMessage());
2854       return t;
2855     }
2856     Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
2857     if (msg == null) {
2858       LOG.error("", e);
2859     } else {
2860       LOG.error(msg, e);
2861     }
2862     if (!rpcServices.checkOOME(t)) {
2863       checkFileSystem();
2864     }
2865     return t;
2866   }
2867 
2868   /*
2869    * @param t Throwable to convert
2870    *
2871    * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
2872    *
2873    * @return <code>t</code> as an IOException, wrapping it in a new IOException if it isn't one already.
2874    */
2875   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2876     return (t instanceof IOException ? (IOException) t : msg == null
2877         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2878   }
2879 
2880   /**
2881    * Checks to see if the file system is still accessible. If not, sets
2882    * abortRequested and stopRequested.
2883    *
2884    * @return false if file system is not available
2885    */
2886   public boolean checkFileSystem() {
2887     if (this.fsOk && this.fs != null) {
2888       try {
2889         FSUtils.checkFileSystemAvailable(this.fs);
2890       } catch (IOException e) {
2891         abort("File System not available", e);
2892         this.fsOk = false;
2893       }
2894     }
2895     return this.fsOk;
2896   }
2897 
2898   @Override
2899   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2900       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2901     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2902     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2903     // it is a map of region name to InetSocketAddress[]
2904     for (int i = 0; i < favoredNodes.size(); i++) {
2905       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2906           favoredNodes.get(i).getPort());
2907     }
2908     regionFavoredNodesMap.put(encodedRegionName, addr);
2909   }
2910 
2911   /**
2912    * Return the favored nodes for a region given its encoded name. See the
2913    * comment around {@link #regionFavoredNodesMap} for why it is an InetSocketAddress[].
2914    * @param encodedRegionName Encoded name of the region
2915    * @return array of favored locations
2916    */
2917   @Override
2918   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2919     return regionFavoredNodesMap.get(encodedRegionName);
2920   }
2921 
2922   @Override
2923   public ServerNonceManager getNonceManager() {
2924     return this.nonceManager;
2925   }
2926 
2927   private static class MovedRegionInfo {
2928     private final ServerName serverName;
2929     private final long seqNum;
2930     private final long ts;
2931 
2932     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2933       this.serverName = serverName;
2934       this.seqNum = closeSeqNum;
2935       ts = EnvironmentEdgeManager.currentTime();
2936     }
2937 
2938     public ServerName getServerName() {
2939       return serverName;
2940     }
2941 
2942     public long getSeqNum() {
2943       return seqNum;
2944     }
2945 
2946     public long getMoveTime() {
2947       return ts;
2948     }
2949   }
2950 
2951   // This map contains all the regions that we closed for a move.
2952   //  We record the time of the move, as we don't want to keep stale information for too long.
2953   protected Map<String, MovedRegionInfo> movedRegions =
2954       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
2955 
2956   // We need a timeout; otherwise there is a risk of giving out wrong information, which would
2957   //  double the number of network calls instead of reducing them.
2958   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
2959 
2960   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
2961     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
2962       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
2963       return;
2964     }
2965     LOG.info("Adding moved region record: "
2966       + encodedName + " to " + destination + " as of " + closeSeqNum);
2967     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
2968   }
2969 
2970   void removeFromMovedRegions(String encodedName) {
2971     movedRegions.remove(encodedName);
2972   }
2973 
2974   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
2975     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
2976 
2977     long now = EnvironmentEdgeManager.currentTime();
2978     if (dest != null) {
2979       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
2980         return dest;
2981       } else {
2982         movedRegions.remove(encodedRegionName);
2983       }
2984     }
2985 
2986     return null;
2987   }
2988 
2989   /**
2990    * Remove the expired entries from the moved regions list.
2991    */
2992   protected void cleanMovedRegions() {
2993     final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
2994     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
2995 
2996     while (it.hasNext()) {
2997       Map.Entry<String, MovedRegionInfo> e = it.next();
2998       if (e.getValue().getMoveTime() < cutOff) {
2999         it.remove();
3000       }
3001     }
3002   }
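
  // Lifecycle of the movedRegions cache (summary of the methods above): entries are added by
  // addToMovedRegions(), served by getMovedRegion() only while younger than TIMEOUT_REGION_MOVED
  // (two minutes), dropped lazily when an expired entry is looked up, and swept periodically by
  // cleanMovedRegions() via the MovedRegionsCleaner chore below.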
3003 
3004   /*
3005    * Use this to allow tests to override and schedule more frequently.
3006    */
3007 
3008   protected int movedRegionCleanerPeriod() {
3009     return TIMEOUT_REGION_MOVED;
3010   }
3011 
3012   /**
3013    * Chore that periodically cleans the moved region cache.
3014    */
3015 
3016   protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3017     private HRegionServer regionServer;
3018     Stoppable stoppable;
3019 
3020     private MovedRegionsCleaner(
3021         HRegionServer regionServer, Stoppable stoppable) {
3022       super("MovedRegionsCleaner for region " + regionServer, stoppable,
3023           regionServer.movedRegionCleanerPeriod());
3024       this.regionServer = regionServer;
3025       this.stoppable = stoppable;
3026     }
3027 
3028     static MovedRegionsCleaner create(HRegionServer rs) {
3029       Stoppable stoppable = new Stoppable() {
3030         private volatile boolean isStopped = false;
3031         @Override public void stop(String why) { isStopped = true;}
3032         @Override public boolean isStopped() {return isStopped;}
3033       };
3034 
3035       return new MovedRegionsCleaner(rs, stoppable);
3036     }
3037 
3038     @Override
3039     protected void chore() {
3040       regionServer.cleanMovedRegions();
3041     }
3042 
3043     @Override
3044     public void stop(String why) {
3045       stoppable.stop(why);
3046     }
3047 
3048     @Override
3049     public boolean isStopped() {
3050       return stoppable.isStopped();
3051     }
3052   }
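
  // Wiring sketch (assumption: the actual scheduling happens elsewhere in this class during
  // start-up): the cleaner is built through its factory method and handed to a ChoreService:
  //   MovedRegionsCleaner movedRegionsCleaner = MovedRegionsCleaner.create(this);
  //   choreService.scheduleChore(movedRegionsCleaner);  // choreService: the RS's shared ChoreService (assumed)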
3053 
3054   private String getMyEphemeralNodePath() {
3055     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3056   }
3057 
3058   private boolean isHealthCheckerConfigured() {
3059     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3060     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3061   }
3062 
3063   /**
3064    * @return the underlying {@link CompactSplitThread} for this server
3065    */
3066   public CompactSplitThread getCompactSplitThread() {
3067     return this.compactSplitThread;
3068   }
3069 
3070   /**
3071    * A helper function to store the last flushed sequence id with the previous failed RS for a
3072    * recovering region. The id is used to skip WAL edits that have already been flushed. Since a
3073    * flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
3074    * @throws KeeperException
3075    * @throws IOException
3076    */
3077   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
3078       IOException {
3079     if (!r.isRecovering()) {
3080       // return immediately for non-recovering regions
3081       return;
3082     }
3083 
3084     HRegionInfo region = r.getRegionInfo();
3085     ZooKeeperWatcher zkw = getZooKeeper();
3086     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
3087     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
3088     long minSeqIdForLogReplay = -1;
3089     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3090       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3091         minSeqIdForLogReplay = storeSeqIdForReplay;
3092       }
3093     }
3094 
3095     try {
3096       long lastRecordedFlushedSequenceId = -1;
3097       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3098         region.getEncodedName());
3099       // recovering-region level
3100       byte[] data;
3101       try {
3102         data = ZKUtil.getData(zkw, nodePath);
3103       } catch (InterruptedException e) {
3104         throw new InterruptedIOException();
3105       }
3106       if (data != null) {
3107         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3108       }
3109       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3110         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3111       }
3112       if (previousRSName != null) {
3113         // one level deeper for the failed RS
3114         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3115         ZKUtil.setData(zkw, nodePath,
3116           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3117         LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
3118             + previousRSName);
3119       } else {
3120         LOG.warn("Can't find failed region server for recovering region " +
3121           region.getEncodedName());
3122       }
3123     } catch (NoNodeException ignore) {
3124       LOG.debug("Region " + region.getEncodedName() +
3125         " must have completed recovery because its recovery znode has been removed", ignore);
3126     }
3127   }
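
  // Znode layout implied by the joins above (descriptive note):
  //   recoveringRegionsZNode/<encodedRegionName>             -> region-level last flushed sequence id
  //   recoveringRegionsZNode/<encodedRegionName>/<failedRS>  -> per-store sequence ids for that failed RS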
3128 
3129   /**
3130    * Return the most recently failed RS name under /hbase/recovering-regions/encodedRegionName (null if none).
3131    * @param encodedRegionName Encoded name of the recovering region
3132    * @throws KeeperException
3133    */
3134   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3135     String result = null;
3136     long maxZxid = 0;
3137     ZooKeeperWatcher zkw = this.getZooKeeper();
3138     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3139     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3140     if (failedServers == null || failedServers.isEmpty()) {
3141       return result;
3142     }
3143     for (String failedServer : failedServers) {
3144       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3145       Stat stat = new Stat();
3146       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3147       if (maxZxid < stat.getCzxid()) {
3148         maxZxid = stat.getCzxid();
3149         result = failedServer;
3150       }
3151     }
3152     return result;
3153   }
3154 
3155   public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3156       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3157     try {
3158       ServerRpcController execController = new ServerRpcController();
3159       CoprocessorServiceCall call = serviceRequest.getCall();
3160       String serviceName = call.getServiceName();
3161       String methodName = call.getMethodName();
3162       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3163         throw new UnknownProtocolException(null,
3164             "No registered coprocessor service found for name " + serviceName);
3165       }
3166       Service service = coprocessorServiceHandlers.get(serviceName);
3167       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3168       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3169       if (methodDesc == null) {
3170         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3171             + " called on service " + serviceName);
3172       }
3173       Message request =
3174           service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest())
3175               .build();
3176       final Message.Builder responseBuilder =
3177           service.getResponsePrototype(methodDesc).newBuilderForType();
3178       service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3179         @Override
3180         public void run(Message message) {
3181           if (message != null) {
3182             responseBuilder.mergeFrom(message);
3183           }
3184         }
3185       });
3186       Message execResult = responseBuilder.build();
3187       if (execController.getFailedOn() != null) {
3188         throw execController.getFailedOn();
3189       }
3190       ClientProtos.CoprocessorServiceResponse.Builder builder =
3191           ClientProtos.CoprocessorServiceResponse.newBuilder();
3192       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3193         HConstants.EMPTY_BYTE_ARRAY));
3194       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3195           .setValue(execResult.toByteString()));
3196       return builder.build();
3197     } catch (IOException ie) {
3198       throw new ServiceException(ie);
3199     }
3200   }
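
  // Client-side sketch (illustrative; MyRSService, MyRSRequest and MyRSResponse are hypothetical
  // generated protobuf types, and Admin.coprocessorService(ServerName) is assumed as the entry
  // point of the 1.x client API):
  //   CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
  //   MyRSService.BlockingInterface stub = MyRSService.newBlockingStub(channel);
  //   MyRSResponse resp = stub.myMethod(null, MyRSRequest.getDefaultInstance());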
3201 
3202   /**
3203    * @return The cache config instance used by the regionserver.
3204    */
3205   public CacheConfig getCacheConfig() {
3206     return this.cacheConfig;
3207   }
3208 
3209   /**
3210    * @return the ConfigurationManager object, exposed for testing purposes.
3211    */
3212   protected ConfigurationManager getConfigurationManager() {
3213     return configurationManager;
3214   }
3215 
3216   /**
3217    * Reload the configuration from disk.
3218    */
3219   public void updateConfiguration() {
3220     LOG.info("Reloading the configuration from disk.");
3221     // Reload the configuration from disk.
3222     conf.reloadConfiguration();
3223     configurationManager.notifyAllObservers(conf);
3224   }
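
  // Observer sketch (assumption: components registered with the ConfigurationManager implement
  // org.apache.hadoop.hbase.conf.ConfigurationObserver and are called back with the reloaded conf):
  //   @Override
  //   public void onConfigurationChange(Configuration newConf) {
  //     // re-read whichever tunables this component caches
  //   }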
3225 
3226   @Override
3227   public HeapMemoryManager getHeapMemoryManager() {
3228     return hMemManager;
3229   }
3230 
3231   @Override
3232   public double getCompactionPressure() {
3233     double max = 0;
3234     for (HRegion region : onlineRegions.values()) {
3235       for (Store store : region.getStores().values()) {
3236         double normCount = store.getCompactionPressure();
3237         if (normCount > max) {
3238           max = normCount;
3239         }
3240       }
3241     }
3242     return max;
3243   }
3244 }