1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.TreeSet;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import javax.management.MalformedObjectNameException;
52  import javax.management.ObjectName;
53  import javax.servlet.http.HttpServlet;
54  
55  import org.apache.commons.lang.math.RandomUtils;
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileSystem;
60  import org.apache.hadoop.fs.Path;
61  import org.apache.hadoop.hbase.ChoreService;
62  import org.apache.hadoop.hbase.ClockOutOfSyncException;
63  import org.apache.hadoop.hbase.CoordinatedStateManager;
64  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HealthCheckChore;
70  import org.apache.hadoop.hbase.MetaTableAccessor;
71  import org.apache.hadoop.hbase.NotServingRegionException;
72  import org.apache.hadoop.hbase.RemoteExceptionHandler;
73  import org.apache.hadoop.hbase.ScheduledChore;
74  import org.apache.hadoop.hbase.ServerName;
75  import org.apache.hadoop.hbase.Stoppable;
76  import org.apache.hadoop.hbase.TableDescriptors;
77  import org.apache.hadoop.hbase.TableName;
78  import org.apache.hadoop.hbase.YouAreDeadException;
79  import org.apache.hadoop.hbase.ZNodeClearer;
80  import org.apache.hadoop.hbase.classification.InterfaceAudience;
81  import org.apache.hadoop.hbase.client.ClusterConnection;
82  import org.apache.hadoop.hbase.client.ConnectionUtils;
83  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
84  import org.apache.hadoop.hbase.conf.ConfigurationManager;
85  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
86  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
87  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
88  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
89  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
90  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
91  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
92  import org.apache.hadoop.hbase.executor.ExecutorService;
93  import org.apache.hadoop.hbase.executor.ExecutorType;
94  import org.apache.hadoop.hbase.fs.HFileSystem;
95  import org.apache.hadoop.hbase.http.InfoServer;
96  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
97  import org.apache.hadoop.hbase.ipc.RpcClient;
98  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
99  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
100 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
101 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
102 import org.apache.hadoop.hbase.ipc.ServerRpcController;
103 import org.apache.hadoop.hbase.master.HMaster;
104 import org.apache.hadoop.hbase.master.RegionState.State;
105 import org.apache.hadoop.hbase.master.TableLockManager;
106 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
107 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
108 import org.apache.hadoop.hbase.protobuf.RequestConverter;
109 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
110 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
114 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
116 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
117 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
122 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
133 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
134 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
135 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
136 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
137 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
138 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
139 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
140 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
141 import org.apache.hadoop.hbase.security.Superusers;
142 import org.apache.hadoop.hbase.security.UserProvider;
143 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
144 import org.apache.hadoop.hbase.util.Addressing;
145 import org.apache.hadoop.hbase.util.ByteStringer;
146 import org.apache.hadoop.hbase.util.Bytes;
147 import org.apache.hadoop.hbase.util.CompressionTest;
148 import org.apache.hadoop.hbase.util.ConfigUtil;
149 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
150 import org.apache.hadoop.hbase.util.FSTableDescriptors;
151 import org.apache.hadoop.hbase.util.FSUtils;
152 import org.apache.hadoop.hbase.util.HasThread;
153 import org.apache.hadoop.hbase.util.JSONBean;
154 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
155 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
156 import org.apache.hadoop.hbase.util.Sleeper;
157 import org.apache.hadoop.hbase.util.Threads;
158 import org.apache.hadoop.hbase.util.VersionInfo;
159 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
160 import org.apache.hadoop.hbase.wal.WAL;
161 import org.apache.hadoop.hbase.wal.WALFactory;
162 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
163 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
164 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
165 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
166 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
167 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
168 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
169 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
170 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
171 import org.apache.hadoop.ipc.RemoteException;
172 import org.apache.hadoop.metrics.util.MBeanUtil;
173 import org.apache.hadoop.util.ReflectionUtils;
174 import org.apache.hadoop.util.StringUtils;
175 import org.apache.zookeeper.KeeperException;
176 import org.apache.zookeeper.KeeperException.NoNodeException;
177 import org.apache.zookeeper.data.Stat;
178 
179 import com.google.common.annotations.VisibleForTesting;
180 import com.google.common.base.Preconditions;
181 import com.google.common.collect.Maps;
182 import com.google.protobuf.BlockingRpcChannel;
183 import com.google.protobuf.Descriptors;
184 import com.google.protobuf.Message;
185 import com.google.protobuf.RpcCallback;
186 import com.google.protobuf.RpcController;
187 import com.google.protobuf.Service;
188 import com.google.protobuf.ServiceException;
189 
190 /**
191  * HRegionServer makes a set of HRegions available to clients. It checks in with
192  * the HMaster. There are many HRegionServers in a single HBase deployment.
193  */
194 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
195 @SuppressWarnings("deprecation")
196 public class HRegionServer extends HasThread implements
197     RegionServerServices, LastSequenceId {
198 
199   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
200 
201   /*
202    * Strings to be used in forming the exception message for
203    * RegionsAlreadyInTransitionException.
204    */
205   protected static final String OPEN = "OPEN";
206   protected static final String CLOSE = "CLOSE";
207 
208   // Region name vs. current action in progress:
209   // true  - an open-region action is in progress
210   // false - a close-region action is in progress
211   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
212     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
213 
214   // Cache flushing
215   protected MemStoreFlusher cacheFlusher;
216 
217   protected HeapMemoryManager hMemManager;
218 
219   /**
220    * Cluster connection to be shared by services.
221    * Initialized at server startup and closed when server shuts down.
222    * Clients must never close it explicitly.
223    */
224   protected ClusterConnection clusterConnection;
225 
226   /*
227    * Long-living meta table locator, created when the server starts and stopped when the
228    * server shuts down. References to this locator should be used to perform the
229    * corresponding operations in EventHandlers. The primary reason for this design is to
230    * make it mockable for tests.
231    */
232   protected MetaTableLocator metaTableLocator;
233 
234   // Watch if a region is out of recovering state from ZooKeeper
235   @SuppressWarnings("unused")
236   private RecoveringRegionWatcher recoveringRegionWatcher;
237 
238   /**
239    * Go here to get table descriptors.
240    */
241   protected TableDescriptors tableDescriptors;
242 
243   // Replication services. If no replication, this handler will be null.
244   protected ReplicationSourceService replicationSourceHandler;
245   protected ReplicationSinkService replicationSinkHandler;
246 
247   // Compactions
248   public CompactSplitThread compactSplitThread;
249 
250   /**
251    * Map of regions currently being served by this region server. Key is the
252    * encoded region name.  All access should be synchronized.
253    */
254   protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
255 
256   /**
257    * Map of encoded region names to the DataNode locations they should be hosted on.
258    * We store the value as InetSocketAddress since this is used only in the HDFS
259    * API (create(), which takes favored nodes as hints for placing file blocks).
260    * We could have used ServerName here as the value class, but we'd need to
261    * convert it to InetSocketAddress at some point before the HDFS API call, and
262    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
263    * and here we really mean DataNode locations.
264    */
265   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
266       new ConcurrentHashMap<String, InetSocketAddress[]>();
267 
268   /**
269    * Set of regions currently in recovering state, which means they can accept writes (edits
270    * from a previously failed region server) but not reads. A recovering region is also an online region.
271    */
272   protected final Map<String, Region> recoveringRegions = Collections
273       .synchronizedMap(new HashMap<String, Region>());
274 
275   // Leases
276   protected Leases leases;
277 
278   // Instance of the hbase executor service.
279   protected ExecutorService service;
280 
281   // If false, the file system has become unavailable
282   protected volatile boolean fsOk;
283   protected HFileSystem fs;
284 
285   // Set when a report to the master comes back with a message asking us to
286   // shut down. Also set by a call to stop() when debugging or running unit tests
287   // of HRegionServer in isolation.
288   private volatile boolean stopped = false;
289 
290   // Go down hard. Used if file system becomes unavailable and also in
291   // debugging and unit tests.
292   private volatile boolean abortRequested;
293 
294   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
295 
296   // A state before we go into stopped state.  At this stage we're closing user
297   // space regions.
298   private boolean stopping = false;
299 
300   private volatile boolean killed = false;
301 
302   protected final Configuration conf;
303 
304   private Path rootDir;
305 
306   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
307 
308   final int numRetries;
309   protected final int threadWakeFrequency;
310   protected final int msgInterval;
311 
312   protected final int numRegionsToReport;
313 
314   // Stub to do region server status calls against the master.
315   private volatile RegionServerStatusService.BlockingInterface rssStub;
316   // RPC client. Used to make the stub above that does region server status checking.
317   RpcClient rpcClient;
318 
319   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
320   private RpcControllerFactory rpcControllerFactory;
321 
322   private UncaughtExceptionHandler uncaughtExceptionHandler;
323 
324   // Info server. Default access so it can be used by unit tests. REGIONSERVER
325   // is the name of the webapp and the attribute name used to stuff this instance
326   // into the web context.
327   protected InfoServer infoServer;
328   private JvmPauseMonitor pauseMonitor;
329 
330   /** region server process name */
331   public static final String REGIONSERVER = "regionserver";
332 
333   MetricsRegionServer metricsRegionServer;
334   private SpanReceiverHost spanReceiverHost;
335 
336   /**
337    * ChoreService used to schedule tasks that we want to run periodically
338    */
339   private final ChoreService choreService;
340 
341   /*
342    * Check for compaction requests.
343    */
344   ScheduledChore compactionChecker;
345 
346   /*
347    * Check for flushes
348    */
349   ScheduledChore periodicFlusher;
350 
351   protected volatile WALFactory walFactory;
352 
353   // WAL roller. Not private, to avoid an Eclipse warning when it is
354   // accessed by inner classes.
355   final LogRoller walRoller;
356   // Lazily initialized if this RegionServer hosts a meta table.
357   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
358 
359   // flag set after we're done setting up server threads
360   final AtomicBoolean online = new AtomicBoolean(false);
361 
362   // zookeeper connection and watcher
363   protected ZooKeeperWatcher zooKeeper;
364 
365   // master address tracker
366   private MasterAddressTracker masterAddressTracker;
367 
368   // Cluster Status Tracker
369   protected ClusterStatusTracker clusterStatusTracker;
370 
371   // Log Splitting Worker
372   private SplitLogWorker splitLogWorker;
373 
374   // A sleeper that sleeps for msgInterval.
375   protected final Sleeper sleeper;
376 
377   private final int operationTimeout;
378   private final int shortOperationTimeout;
379 
380   private final RegionServerAccounting regionServerAccounting;
381 
382   // Cache configuration and block cache reference
383   protected CacheConfig cacheConfig;
384 
385   /** The health check chore. */
386   private HealthCheckChore healthCheckChore;
387 
388   /** The nonce manager chore. */
389   private ScheduledChore nonceManagerChore;
390 
391   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
392 
393   /**
394    * The server name the Master sees us as.  It's made from the hostname the
395    * master passes us, the port, and the server startcode. Set after registration
396    * against the Master.
397    */
398   protected ServerName serverName;
399 
400   /*
401    * hostname specified by hostname config
402    */
403   private String useThisHostnameInstead;
404 
405   // Key for the config parameter specifying the server hostname.
406   // Specifying the server hostname is optional; the hostname should be resolvable from
407   // both the master and the region servers.
408   final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
409 
410   final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
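
  // A configuration sketch (hostnames illustrative), e.g. in hbase-site.xml:
  //   hbase.regionserver.hostname = rs1.example.com     (name this region server registers under)
  //   hbase.master.hostname = master.example.com        (likewise for a colocated master)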
411 
412   /**
413    * This server's startcode.
414    */
415   protected final long startcode;
416 
417   /**
418    * Unique identifier for the cluster we are a part of.
419    */
420   private String clusterId;
421 
422   /**
423    * MX Bean for RegionServerInfo
424    */
425   private ObjectName mxBean = null;
426 
427   /**
428    * Chore to periodically clean the moved-regions list.
429    */
430   private MovedRegionsCleaner movedRegionsCleaner;
431 
432   // chore for refreshing store files for secondary regions
433   private StorefileRefresherChore storefileRefresher;
434 
435   private RegionServerCoprocessorHost rsHost;
436 
437   private RegionServerProcedureManagerHost rspmHost;
438   
439   private RegionServerQuotaManager rsQuotaManager;
440 
441   // Table level lock manager for locking for region operations
442   protected TableLockManager tableLockManager;
443 
444   /**
445    * Nonce manager. Nonces are used to make operations like increment and append idempotent
446    * in the case where client doesn't receive the response from a successful operation and
447    * retries. We track the successful ops for some time via a nonce sent by client and handle
448    * duplicate operations (currently, by failing them; in future we might use MVCC to return
449    * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from
450    * HBASE-3787) are:
451    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
452    *   of past records. If we don't read the records, we don't read and recover the nonces.
453    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
454    * - There's no WAL recovery during normal region move, so nonces will not be transfered.
455    * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
456    * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
457    * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
458    * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
459    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
460    * latest nonce in it expired. It can also be recovered during move.
461    */
462   final ServerNonceManager nonceManager;
463 
464   private UserProvider userProvider;
465 
466   protected final RSRpcServices rpcServices;
467 
468   protected BaseCoordinatedStateManager csm;
469 
470   private final boolean useZKForAssignment;
471 
472   /**
473    * The configuration manager is used to register/deregister configuration observers and to
474    * notify them when the regionserver learns of a change in the on-disk configs.
475    */
476   protected final ConfigurationManager configurationManager;
477 
478   /**
479    * Starts an HRegionServer at the default location.
480    * @param conf the configuration to use
481    * @throws IOException
482    * @throws InterruptedException
483    */
484   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
485     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
486   }
487 
488   /**
489    * Starts an HRegionServer at the default location.
490    * @param conf the configuration to use
491    * @param csm implementation of CoordinatedStateManager to be used
492    * @throws IOException
493    * @throws InterruptedException
494    */
495   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
496       throws IOException, InterruptedException {
497     this.fsOk = true;
498     this.conf = conf;
499     checkCodecs(this.conf);
500     this.userProvider = UserProvider.instantiate(conf);
501     FSUtils.setupShortCircuitRead(this.conf);
502     // Disable usage of meta replicas in the regionserver
503     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
504 
505     // Config'ed params
506     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
507         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
508     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
509     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
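    // Note: msgInterval drives both how often tryRegionServerReport() fires in the main run()
    // loop and how long the Sleeper below waits between loop iterations.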
510 
511     this.sleeper = new Sleeper(this.msgInterval, this);
512 
513     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
514     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
515 
516     this.numRegionsToReport = conf.getInt(
517       "hbase.regionserver.numregionstoreport", 10);
518 
519     this.operationTimeout = conf.getInt(
520       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
521       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
522 
523     this.shortOperationTimeout = conf.getInt(
524       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
525       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
526 
527     this.abortRequested = false;
528     this.stopped = false;
529 
530     rpcServices = createRpcServices();
531     this.startcode = System.currentTimeMillis();
532     if (this instanceof HMaster) {
533       useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
534     } else {
535       useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
536     }
537     String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
538       rpcServices.isa.getHostName();
539     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
540 
541     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
542     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
543 
544     // login the zookeeper client principal (if using security)
545     ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
546       HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
547     // login the server principal (if using secure Hadoop)
548     login(userProvider, hostName);
549     // init superusers and add the server principal (if using security)
550     // or process owner as default super user.
551     Superusers.initialize(conf);
552 
553     regionServerAccounting = new RegionServerAccounting();
554     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
555       @Override
556       public void uncaughtException(Thread t, Throwable e) {
557         abort("Uncaught exception in service thread " + t.getName(), e);
558       }
559     };
560 
561     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
562 
563     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir, else the
564     // underlying hadoop hdfs accessors will go against the wrong filesystem
565     // (unless all is set to defaults).
566     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
567     // Get the fs instance used by this RS.  Do we use checksum verification in hbase? If hbase
568     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
569     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
570     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
571     this.rootDir = FSUtils.getRootDir(this.conf);
572     this.tableDescriptors = new FSTableDescriptors(
573       this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
574 
575     service = new ExecutorService(getServerName().toShortString());
576     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
577 
578     // Some unit tests don't need a cluster, so no zookeeper at all
579     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
580       // Open connection to zookeeper and set primary watcher
581       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
582         rpcServices.isa.getPort(), this, canCreateBaseZNode());
583 
584       this.csm = (BaseCoordinatedStateManager) csm;
585       this.csm.initialize(this);
586       this.csm.start();
587 
588       tableLockManager = TableLockManager.createTableLockManager(
589         conf, zooKeeper, serverName);
590 
591       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
592       masterAddressTracker.start();
593 
594       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
595       clusterStatusTracker.start();
596     }
597     this.configurationManager = new ConfigurationManager();
598 
599     rpcServices.start();
600     putUpWebUI();
601     this.walRoller = new LogRoller(this, this);
602     this.choreService = new ChoreService(getServerName().toString());
603   }
604 
605   /*
606    * Returns true if the configured hostname should be used.
607    */
608   protected boolean shouldUseThisHostnameInstead() {
609     return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
610   }
611 
612   protected void login(UserProvider user, String host) throws IOException {
613     user.login("hbase.regionserver.keytab.file",
614       "hbase.regionserver.kerberos.principal", host);
615   }
616 
617   protected void waitForMasterActive() {
618   }
619 
620   protected String getProcessName() {
621     return REGIONSERVER;
622   }
623 
624   protected boolean canCreateBaseZNode() {
625     return false;
626   }
627 
628   protected boolean canUpdateTableDescriptor() {
629     return false;
630   }
631 
632   protected RSRpcServices createRpcServices() throws IOException {
633     return new RSRpcServices(this);
634   }
635 
636   protected void configureInfoServer() {
637     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
638     infoServer.setAttribute(REGIONSERVER, this);
639   }
640 
641   protected Class<? extends HttpServlet> getDumpServlet() {
642     return RSDumpServlet.class;
643   }
644 
645   protected void doMetrics() {
646   }
647 
648   @Override
649   public boolean registerService(Service instance) {
650     /*
651      * No stacking of instances is allowed for a single service name
652      */
653     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
654     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
655       LOG.error("Coprocessor service " + serviceDesc.getFullName()
656           + " already registered, rejecting request from " + instance);
657       return false;
658     }
659 
660     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
661     if (LOG.isDebugEnabled()) {
662       LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName());
663     }
664     return true;
665   }
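
  // A usage sketch (MyCustomService is hypothetical, not part of HBase): a coprocessor would
  // typically pass in a protobuf-generated Service implementation, e.g.:
  //   Service svc = new MyCustomService();
  //   boolean registered = regionServer.registerService(svc); // false if the name is already taken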
666 
667   /**
668    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
669    * the local server. Safe to use for both local and remote servers.
670    * Creating this instance in a method allows it to be intercepted and mocked in tests.
671    * @throws IOException
672    */
673   @VisibleForTesting
674   protected ClusterConnection createClusterConnection() throws IOException {
675     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
676     // local server if the request is to the local server bypassing RPC. Can be used for both local
677     // and remote invocations.
678     return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
679       serverName, rpcServices, rpcServices);
680   }
681 
682   /**
683    * Run a test on the configured codecs to make sure the supporting libs are in place.
684    * @param c
685    * @throws IOException
686    */
687   private static void checkCodecs(final Configuration c) throws IOException {
688     // check to see if the codec list is available:
689     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
690     if (codecs == null) return;
691     for (String codec : codecs) {
692       if (!CompressionTest.testCompression(codec)) {
693         throw new IOException("Compression codec " + codec +
694           " not supported, aborting RS construction");
695       }
696     }
697   }
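
  // A configuration sketch (codec values illustrative) that makes checkCodecs() verify codecs
  // at startup, e.g. in hbase-site.xml:
  //   <property>
  //     <name>hbase.regionserver.codecs</name>
  //     <value>snappy,lzo</value>
  //   </property>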
698 
699   public String getClusterId() {
700     return this.clusterId;
701   }
702 
703   /**
704    * Set up our cluster connection if not already initialized.
705    * @throws IOException
706    */
707   protected synchronized void setupClusterConnection() throws IOException {
708     if (clusterConnection == null) {
709       clusterConnection = createClusterConnection();
710       metaTableLocator = new MetaTableLocator();
711     }
712   }
713 
714   /**
715    * All initialization needed before we register with the Master.
716    *
717    * @throws IOException
718    * @throws InterruptedException
719    */
720   private void preRegistrationInitialization(){
721     try {
722       setupClusterConnection();
723 
724       // Health checker thread.
725       if (isHealthCheckerConfigured()) {
726         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
727           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
728         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
729       }
730       this.pauseMonitor = new JvmPauseMonitor(conf);
731       pauseMonitor.start();
732 
733       initializeZooKeeper();
734       if (!isStopped() && !isAborted()) {
735         initializeThreads();
736       }
737     } catch (Throwable t) {
738       // Call stop on error, or the process will stick around forever since the
739       // server puts up non-daemon threads.
740       this.rpcServices.stop();
741       abort("Initialization of RS failed.  Hence aborting RS.", t);
742     }
743   }
744 
745   /**
746    * Bring up the connection to the zk ensemble, then wait until a master is available
747    * for this cluster, and after that wait until the cluster 'up' flag has been set.
748    * This is the order in which the master does things.
749    * Finally, open the long-living server short-circuit connection.
750    * @throws IOException
751    * @throws InterruptedException
752    */
753   private void initializeZooKeeper() throws IOException, InterruptedException {
754     // Create the master address tracker, register with zk, and start it.  Then
755     // block until a master is available.  No point in starting up if no master
756     // running.
757     blockAndCheckIfStopped(this.masterAddressTracker);
758 
759     // Wait on cluster being up.  Master will set this flag up in zookeeper
760     // when ready.
761     blockAndCheckIfStopped(this.clusterStatusTracker);
762 
763     // Retrieve the clusterId.
764     // Since the cluster status is now up,
765     // the ID should have already been set by the HMaster.
766     try {
767       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
768       if (clusterId == null) {
769         this.abort("Cluster ID has not been set");
770       }
771       LOG.info("ClusterId : " + clusterId);
772     } catch (KeeperException e) {
773       this.abort("Failed to retrieve Cluster ID", e);
774     }
775 
776     // In the case of a colocated master, wait here till it's active,
777     // so backup masters won't start as regionservers.
778     // This is to avoid showing backup masters as regionservers
779     // in the master web UI, or assigning any regions to them.
780     waitForMasterActive();
781     if (isStopped() || isAborted()) {
782       return; // No need for further initialization
783     }
784 
785     // watch for snapshots and other procedures
786     try {
787       rspmHost = new RegionServerProcedureManagerHost();
788       rspmHost.loadProcedures(conf);
789       rspmHost.initialize(this);
790     } catch (KeeperException e) {
791       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
792     }
793     // register watcher for recovering regions
794     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
795   }
796 
797   /**
798    * Utility method to wait indefinitely for a znode to become available while
799    * checking whether the region server is shut down.
800    * @param tracker znode tracker to use
801    * @throws IOException any IO exception, plus if the RS is stopped
802    * @throws InterruptedException
803    */
804   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
805       throws IOException, InterruptedException {
806     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
807       if (this.stopped) {
808         throw new IOException("Received the shutdown message while waiting.");
809       }
810     }
811   }
812 
813   /**
814    * @return false if cluster shutdown is in progress
815    */
816   private boolean isClusterUp() {
817     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
818   }
819 
820   private void initializeThreads() throws IOException {
821     // Cache flushing thread.
822     this.cacheFlusher = new MemStoreFlusher(conf, this);
823 
824     // Compaction thread
825     this.compactSplitThread = new CompactSplitThread(this);
826 
827     // Background thread to check for compactions; needed if a region has not gotten updates
828     // in a while. It will take care of not checking too frequently, on a store-by-store basis.
829     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
830     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
831     this.leases = new Leases(this.threadWakeFrequency);
832 
833     // Create the thread to clean the moved regions list
834     movedRegionsCleaner = MovedRegionsCleaner.create(this);
835 
836     if (this.nonceManager != null) {
837       // Create the scheduled chore that cleans up nonces.
838       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
839     }
840 
841     // Setup the Quota Manager
842     rsQuotaManager = new RegionServerQuotaManager(this);
843     
844     // Setup RPC client for master communication
845     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
846         rpcServices.isa.getAddress(), 0));
847 
848     boolean onlyMetaRefresh = false;
849     int storefileRefreshPeriod = conf.getInt(
850         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
851         StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
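    // A refresh period of 0 disables the general chore; in that case fall back to the
    // meta-specific period below so hbase:meta replicas can still refresh their store files.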
852     if (storefileRefreshPeriod == 0) {
853       storefileRefreshPeriod = conf.getInt(
854           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
855           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
856       onlyMetaRefresh = true;
857     }
858     if (storefileRefreshPeriod > 0) {
859       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
860           onlyMetaRefresh, this, this);
861     }
862     registerConfigurationObservers();
863   }
864 
865   private void registerConfigurationObservers() {
866     // Registering the compactSplitThread object with the ConfigurationManager.
867     configurationManager.registerObserver(this.compactSplitThread);
868   }
869 
870   /**
871    * The HRegionServer sticks in this loop until closed.
872    */
873   @Override
874   public void run() {
875     try {
876       // Do pre-registration initializations; zookeeper, lease threads, etc.
877       preRegistrationInitialization();
878     } catch (Throwable e) {
879       abort("Fatal exception during initialization", e);
880     }
881 
882     try {
883       if (!isStopped() && !isAborted()) {
884         ShutdownHook.install(conf, fs, this, Thread.currentThread());
885         // Set our ephemeral znode up in zookeeper now we have a name.
886         createMyEphemeralNode();
887         // Initialize the RegionServerCoprocessorHost now that our ephemeral
888         // node was created, in case any coprocessors want to use ZooKeeper
889         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
890       }
891 
892       // Try to register with the Master; tell it we are here.  Break if the
893       // server is stopped, the cluster-up flag is down, or hdfs went wacky.
894       while (keepLooping()) {
895         RegionServerStartupResponse w = reportForDuty();
896         if (w == null) {
897           LOG.warn("reportForDuty failed; sleeping and then retrying.");
898           this.sleeper.sleep();
899         } else {
900           handleReportForDutyResponse(w);
901           break;
902         }
903       }
904 
905       if (!isStopped() && isHealthy()) {
906         // start the snapshot handler and other procedure handlers,
907         // since the server is ready to run
908         rspmHost.start();
909       }
910       
911       // Start the Quota Manager
912       if (this.rsQuotaManager != null) {
913         rsQuotaManager.start(getRpcServer().getScheduler());
914       }
915 
916       // We registered with the Master.  Go into run mode.
917       long lastMsg = System.currentTimeMillis();
918       long oldRequestCount = -1;
919       // The main run loop.
920       while (!isStopped() && isHealthy()) {
921         if (!isClusterUp()) {
922           if (isOnlineRegionsEmpty()) {
923             stop("Exiting; cluster shutdown set and not carrying any regions");
924           } else if (!this.stopping) {
925             this.stopping = true;
926             LOG.info("Closing user regions");
927             closeUserRegions(this.abortRequested);
928           } else if (this.stopping) {
929             boolean allUserRegionsOffline = areAllUserRegionsOffline();
930             if (allUserRegionsOffline) {
931               // Set stopped if no more write requests to meta tables
932               // since last time we went around the loop.  Any open
933               // meta regions will be closed on our way out.
934               if (oldRequestCount == getWriteRequestCount()) {
935                 stop("Stopped; only catalog regions remaining online");
936                 break;
937               }
938               oldRequestCount = getWriteRequestCount();
939             } else {
940               // Make sure all regions have been closed -- some regions may
941               // not have gotten the close because we were splitting at the time of
942               // the call to closeUserRegions.
943               closeUserRegions(this.abortRequested);
944             }
945             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
946           }
947         }
948         long now = System.currentTimeMillis();
949         if ((now - lastMsg) >= msgInterval) {
950           tryRegionServerReport(lastMsg, now);
951           lastMsg = System.currentTimeMillis();
952           doMetrics();
953         }
954         if (!isStopped() && !isAborted()) {
955           this.sleeper.sleep();
956         }
957       } // while
958     } catch (Throwable t) {
959       if (!rpcServices.checkOOME(t)) {
960         String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
961         abort(prefix + t.getMessage(), t);
962       }
963     }
964     // Run shutdown.
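    // The teardown below proceeds roughly in order: stop chores and worker threads, close user
    // regions, then meta regions, shut down the WAL, and finally tear down RPC and the zk connection.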
965     if (mxBean != null) {
966       MBeanUtil.unregisterMBean(mxBean);
967       mxBean = null;
968     }
969     if (this.leases != null) this.leases.closeAfterLeasesExpire();
970     if (this.splitLogWorker != null) {
971       splitLogWorker.stop();
972     }
973     if (this.infoServer != null) {
974       LOG.info("Stopping infoServer");
975       try {
976         this.infoServer.stop();
977       } catch (Exception e) {
978         LOG.error("Failed to stop infoServer", e);
979       }
980     }
981     // Send cache a shutdown.
982     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
983       cacheConfig.getBlockCache().shutdown();
984     }
985 
986     if (movedRegionsCleaner != null) {
987       movedRegionsCleaner.stop("Region Server stopping");
988     }
989 
990     // Send interrupts to wake up threads if sleeping so they notice shutdown.
991     // TODO: Should we check they are alive? If OOME, they could have exited already
992     if (this.hMemManager != null) this.hMemManager.stop();
993     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
994     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
995     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
996     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
997     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
998     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
999     sendShutdownInterrupt();
1000 
1001     // Stop the quota manager
1002     if (rsQuotaManager != null) {
1003       rsQuotaManager.stop();
1004     }
1005     
1006     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1007     if (rspmHost != null) {
1008       rspmHost.stop(this.abortRequested || this.killed);
1009     }
1010 
1011     if (this.killed) {
1012       // Just skip out w/o closing regions.  Used when testing.
1013     } else if (abortRequested) {
1014       if (this.fsOk) {
1015         closeUserRegions(abortRequested); // Don't leave any open file handles
1016       }
1017       LOG.info("aborting server " + this.serverName);
1018     } else {
1019       closeUserRegions(abortRequested);
1020       LOG.info("stopping server " + this.serverName);
1021     }
1022 
1023     // so callers waiting for meta without timeout can stop
1024     if (this.metaTableLocator != null) this.metaTableLocator.stop();
1025     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1026       try {
1027         this.clusterConnection.close();
1028       } catch (IOException e) {
1029         // Although the {@link Closeable} interface throws an {@link
1030         // IOException}, in reality, the implementation would never do that.
1031         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1032       }
1033     }
1034 
1035     // Closing the compactSplit thread before closing meta regions
1036     if (!this.killed && containsMetaTableRegions()) {
1037       if (!abortRequested || this.fsOk) {
1038         if (this.compactSplitThread != null) {
1039           this.compactSplitThread.join();
1040           this.compactSplitThread = null;
1041         }
1042         closeMetaTableRegions(abortRequested);
1043       }
1044     }
1045 
1046     if (!this.killed && this.fsOk) {
1047       waitOnAllRegionsToClose(abortRequested);
1048       LOG.info("stopping server " + this.serverName +
1049         "; all regions closed.");
1050     }
1051 
1052     // The fsOk flag may be changed when closing regions throws an exception.
1053     if (this.fsOk) {
1054       shutdownWAL(!abortRequested);
1055     }
1056 
1057     // Make sure the proxy is down.
1058     if (this.rssStub != null) {
1059       this.rssStub = null;
1060     }
1061     if (this.rpcClient != null) {
1062       this.rpcClient.close();
1063     }
1064     if (this.leases != null) {
1065       this.leases.close();
1066     }
1067     if (this.pauseMonitor != null) {
1068       this.pauseMonitor.stop();
1069     }
1070 
1071     if (!killed) {
1072       stopServiceThreads();
1073     }
1074 
1075     if (this.rpcServices != null) {
1076       this.rpcServices.stop();
1077     }
1078 
1079     try {
1080       deleteMyEphemeralNode();
1081     } catch (KeeperException.NoNodeException nn) {
1082     } catch (KeeperException e) {
1083       LOG.warn("Failed deleting my ephemeral node", e);
1084     }
1085     // We may have failed to delete the znode at the previous step, but
1086     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1087     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1088 
1089     if (this.zooKeeper != null) {
1090       this.zooKeeper.close();
1091     }
1092     LOG.info("stopping server " + this.serverName +
1093       "; zookeeper connection closed.");
1094 
1095     LOG.info(Thread.currentThread().getName() + " exiting");
1096   }
1097 
1098   private boolean containsMetaTableRegions() {
1099     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1100   }
1101 
1102   private boolean areAllUserRegionsOffline() {
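    // Fast path: with more than two regions online, at least one must be a user region,
    // so they cannot all be offline.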
1103     if (getNumberOfOnlineRegions() > 2) return false;
1104     boolean allUserRegionsOffline = true;
1105     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1106       if (!e.getValue().getRegionInfo().isMetaTable()) {
1107         allUserRegionsOffline = false;
1108         break;
1109       }
1110     }
1111     return allUserRegionsOffline;
1112   }
1113 
1114   /**
1115    * @return Current write count for all online regions.
1116    */
1117   private long getWriteRequestCount() {
1118     long writeCount = 0;
1119     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1120       writeCount += e.getValue().getWriteRequestsCount();
1121     }
1122     return writeCount;
1123   }
1124 
1125   @VisibleForTesting
1126   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1127   throws IOException {
1128     RegionServerStatusService.BlockingInterface rss = rssStub;
1129     if (rss == null) {
1130       // the current server could be stopping.
1131       return;
1132     }
1133     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1134     try {
1135       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1136       ServerName sn = ServerName.parseVersionedServerName(
1137         this.serverName.getVersionedBytes());
1138       request.setServer(ProtobufUtil.toServerName(sn));
1139       request.setLoad(sl);
1140       rss.regionServerReport(null, request.build());
1141     } catch (ServiceException se) {
1142       IOException ioe = ProtobufUtil.getRemoteException(se);
1143       if (ioe instanceof YouAreDeadException) {
1144         // This will be caught and handled as a fatal error in run()
1145         throw ioe;
1146       }
1147       if (rssStub == rss) {
1148         rssStub = null;
1149       }
1150       // Couldn't connect to the master, get location from zk and reconnect
1151       // Method blocks until new master is found or we are stopped
1152       createRegionServerStatusStub();
1153     }
1154   }
1155 
1156   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1157       throws IOException {
1158     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1159     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1160     // the wrapper to compute those numbers in one place.
1161     // In the long term most of these should be moved off of ServerLoad and the heartbeat.
1162     // Instead they should be stored in an HBase table so that external visibility into HBase is
1163     // improved; additionally, the load balancer will be able to take advantage of a more complete
1164     // history.
1165     MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1166     Collection<Region> regions = getOnlineRegionsLocalContext();
1167     MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1168 
1169     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1170       ClusterStatusProtos.ServerLoad.newBuilder();
1171     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1172     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1173     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1174     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1175     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1176     Builder coprocessorBuilder = Coprocessor.newBuilder();
1177     for (String coprocessor : coprocessors) {
1178       serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1179     }
1180     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1181     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1182     for (Region region : regions) {
1183       if (region.getCoprocessorHost() != null) {
1184         Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1185         Iterator<String> iterator = regionCoprocessors.iterator();
1186         while (iterator.hasNext()) {
1187           serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1188         }
1189       }
1190       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1191       for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1192           .getCoprocessors()) {
1193         serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1194       }
1195     }
1196     serverLoad.setReportStartTime(reportStartTime);
1197     serverLoad.setReportEndTime(reportEndTime);
1198     if (this.infoServer != null) {
1199       serverLoad.setInfoServerPort(this.infoServer.getPort());
1200     } else {
1201       serverLoad.setInfoServerPort(-1);
1202     }
1203 
1204     // For the replicationLoad purpose we only need to get it from one service;
1205     // either source or sink will give the same info.
1206     ReplicationSourceService rsources = getReplicationSourceService();
1207 
1208     if (rsources != null) {
1209       // always refresh first to get the latest value
1210       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1211       if (rLoad != null) {
1212         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1213         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1214           serverLoad.addReplLoadSource(rLS);
1215         }
1216       }
1217     }
1218 
1219     return serverLoad.build();
1220   }
1221 
1222   String getOnlineRegionsAsPrintableString() {
1223     StringBuilder sb = new StringBuilder();
1224     for (Region r: this.onlineRegions.values()) {
1225       if (sb.length() > 0) sb.append(", ");
1226       sb.append(r.getRegionInfo().getEncodedName());
1227     }
1228     return sb.toString();
1229   }
1230 
1231   /**
1232    * Wait on regions to close.
1233    */
1234   private void waitOnAllRegionsToClose(final boolean abort) {
1235     // Wait till all regions are closed before going out.
1236     int lastCount = -1;
1237     long previousLogTime = 0;
1238     Set<String> closedRegions = new HashSet<String>();
1239     boolean interrupted = false;
1240     try {
1241       while (!isOnlineRegionsEmpty()) {
1242         int count = getNumberOfOnlineRegions();
1243         // Only print a message if the count of regions has changed.
1244         if (count != lastCount) {
1245           // Log every second at most
1246           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1247             previousLogTime = System.currentTimeMillis();
1248             lastCount = count;
1249             LOG.info("Waiting on " + count + " regions to close");
1250             // Only print out the regions still closing if there are few of them,
1251             // else we will swamp the log.
1252             if (count < 10 && LOG.isDebugEnabled()) {
1253               LOG.debug(this.onlineRegions);
1254             }
1255           }
1256         }
1257         // Ensure all user regions have been sent a close. Use this to
1258         // protect against the case where an open comes in after we start the
1259         // iterator of onlineRegions to close all user regions.
1260         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1261           HRegionInfo hri = e.getValue().getRegionInfo();
1262           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1263               && !closedRegions.contains(hri.getEncodedName())) {
1264             closedRegions.add(hri.getEncodedName());
1265             // Don't update zk with this close transition; pass false.
1266             closeRegionIgnoreErrors(hri, abort);
1267           }
1268         }
1269         // No regions in RIT, we can stop waiting now.
1270         if (this.regionsInTransitionInRS.isEmpty()) {
1271           if (!isOnlineRegionsEmpty()) {
1272             LOG.info("Exiting even though online regions are not empty," +
1273                 " because some regions failed to close");
1274           }
1275           break;
1276         }
1277         if (sleep(200)) {
1278           interrupted = true;
1279         }
1280       }
1281     } finally {
1282       if (interrupted) {
1283         Thread.currentThread().interrupt();
1284       }
1285     }
1286   }
1287 
1288   private boolean sleep(long millis) {
1289     boolean interrupted = false;
1290     try {
1291       Thread.sleep(millis);
1292     } catch (InterruptedException e) {
1293       LOG.warn("Interrupted while sleeping");
1294       interrupted = true;
1295     }
1296     return interrupted;
1297   }
1298 
1299   private void shutdownWAL(final boolean close) {
1300     if (this.walFactory != null) {
1301       try {
1302         if (close) {
1303           walFactory.close();
1304         } else {
1305           walFactory.shutdown();
1306         }
1307       } catch (Throwable e) {
1308         e = RemoteExceptionHandler.checkThrowable(e);
1309         LOG.error("Shutdown / close of WAL failed: " + e);
1310         LOG.debug("Shutdown / close exception details:", e);
1311       }
1312     }
1313   }
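       // Hedged note (illustrative, not in the original source): close=true fully
       // closes the WAL files while close=false only shuts the subsystem down,
       // leaving files in place for possible replay. A caller on the shutdown path
       // would typically decide based on whether we are aborting, e.g. (sketch):
       //
       //   shutdownWAL(!abortRequested);   // on abort, skip the orderly close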
1314 
1315   /**
1316    * Run init. Sets up WAL and starts up all server threads.
1317    *
1318    * @param c Extra configuration.
1319    */
1320   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1321   throws IOException {
1322     try {
1323       for (NameStringPair e : c.getMapEntriesList()) {
1324         String key = e.getName();
1325         // The hostname the master sees us as.
1326         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1327           String hostnameFromMasterPOV = e.getValue();
1328           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1329             rpcServices.isa.getPort(), this.startcode);
1330           if (shouldUseThisHostnameInstead() &&
1331               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1332             String msg = "Master passed us a different hostname to use; was=" +
1333                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1334             LOG.error(msg);
1335             throw new IOException(msg);
1336           }
1337           if (!shouldUseThisHostnameInstead() &&
1338               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1339             String msg = "Master passed us a different hostname to use; was=" +
1340                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1341             LOG.error(msg);
1342           }
1343           continue;
1344         }
1345         String value = e.getValue();
1346         if (LOG.isDebugEnabled()) {
1347           LOG.info("Config from master: " + key + "=" + value);
1348         }
1349         this.conf.set(key, value);
1350       }
1351 
1352       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1353       // config param for task trackers, but we can piggyback off of it.
1354       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1355         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1356           this.serverName.toString());
1357       }
1358 
1359       // Save it in a file so that on restart we can tell whether we crashed.
1360       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1361 
1362       this.cacheConfig = new CacheConfig(conf);
1363       this.walFactory = setupWALAndReplication();
1364       // Init in here rather than in constructor after thread name has been set
1365       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1366 
1367       startServiceThreads();
1368       startHeapMemoryManager();
1369       LOG.info("Serving as " + this.serverName +
1370         ", RpcServer on " + rpcServices.isa +
1371         ", sessionid=0x" +
1372         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1373 
1374       // Wake up anyone waiting for this server to come online
1375       synchronized (online) {
1376         online.set(true);
1377         online.notifyAll();
1378       }
1379     } catch (Throwable e) {
1380       stop("Failed initialization");
1381       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1382           "Region server startup failed");
1383     } finally {
1384       sleeper.skipSleepCycle();
1385     }
1386   }
1387 
1388   private void startHeapMemoryManager() {
1389     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1390     if (this.hMemManager != null) {
1391       this.hMemManager.start(getChoreService());
1392     }
1393   }
1394 
1395   private void createMyEphemeralNode() throws KeeperException, IOException {
1396     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1397     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1398     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1399     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1400       getMyEphemeralNodePath(), data);
1401   }
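       // Hedged sketch (not in the original source; assumes the PB-magic helpers
       // on ProtobufUtil used elsewhere in this codebase): a reader of the
       // ephemeral znode written above would strip the magic prefix before
       // parsing the RegionServerInfo payload, roughly:
       //
       //   byte[] data = ZKUtil.getData(zooKeeper, getMyEphemeralNodePath());
       //   if (data != null && ProtobufUtil.isPBMagicPrefix(data)) {
       //     int off = ProtobufUtil.lengthOfPBMagic();
       //     RegionServerInfo info =
       //       RegionServerInfo.PARSER.parseFrom(data, off, data.length - off);
       //   }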
1402 
1403   private void deleteMyEphemeralNode() throws KeeperException {
1404     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1405   }
1406 
1407   @Override
1408   public RegionServerAccounting getRegionServerAccounting() {
1409     return regionServerAccounting;
1410   }
1411 
1412   @Override
1413   public TableLockManager getTableLockManager() {
1414     return tableLockManager;
1415   }
1416 
1417   /**
1418    * Build the RegionLoad for the passed region.
1419    * @param r Region to get RegionLoad for.
1420    * @param regionLoadBldr the RegionLoad.Builder, can be null
1421    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1422    * @return RegionLoad instance.
1423    * @throws IOException
1424    */
1425   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1426       RegionSpecifier.Builder regionSpecifier) throws IOException {
1427     byte[] name = r.getRegionInfo().getRegionName();
1428     int stores = 0;
1429     int storefiles = 0;
1430     int storeUncompressedSizeMB = 0;
1431     int storefileSizeMB = 0;
1432     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1433     int storefileIndexSizeMB = 0;
1434     int rootIndexSizeKB = 0;
1435     int totalStaticIndexSizeKB = 0;
1436     int totalStaticBloomSizeKB = 0;
1437     long totalCompactingKVs = 0;
1438     long currentCompactedKVs = 0;
1439     List<Store> storeList = r.getStores();
1440     stores += storeList.size();
1441     for (Store store : storeList) {
1442       storefiles += store.getStorefilesCount();
1443       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1444       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1445       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1446       CompactionProgress progress = store.getCompactionProgress();
1447       if (progress != null) {
1448         totalCompactingKVs += progress.totalCompactingKVs;
1449         currentCompactedKVs += progress.currentCompactedKVs;
1450       }
1451       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1452       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1453       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1454     }
1455 
1456     float dataLocality =
1457         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1458     if (regionLoadBldr == null) {
1459       regionLoadBldr = RegionLoad.newBuilder();
1460     }
1461     if (regionSpecifier == null) {
1462       regionSpecifier = RegionSpecifier.newBuilder();
1463     }
1464     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1465     regionSpecifier.setValue(ByteStringer.wrap(name));
1466     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1467       .setStores(stores)
1468       .setStorefiles(storefiles)
1469       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1470       .setStorefileSizeMB(storefileSizeMB)
1471       .setMemstoreSizeMB(memstoreSizeMB)
1472       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1473       .setRootIndexSizeKB(rootIndexSizeKB)
1474       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1475       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1476       .setReadRequestsCount(r.getReadRequestsCount())
1477       .setWriteRequestsCount(r.getWriteRequestsCount())
1478       .setTotalCompactingKVs(totalCompactingKVs)
1479       .setCurrentCompactedKVs(currentCompactedKVs)
1480       .setDataLocality(dataLocality)
1481       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1482     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1483 
1484     return regionLoadBldr.build();
1485   }
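       // Worked example (illustrative, not in the original source): the
       // /1024/1024 divisions above truncate, so sub-megabyte sizes report as 0:
       //
       //   long bytes = 1048575L;                 // one byte under 1 MB
       //   int mb = (int) (bytes / 1024 / 1024);  // == 0; no rounding up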
1486 
1487   /**
1488    * @param encodedRegionName encoded name of the region to report on
1489    * @return An instance of RegionLoad, or null if the region is not online.
1490    */
1491   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1492     Region r = onlineRegions.get(encodedRegionName);
1493     return r != null ? createRegionLoad(r, null, null) : null;
1494   }
1495 
1496   /*
1497    * Inner class that runs on a long period, checking whether regions need compaction.
1498    */
1499   private static class CompactionChecker extends ScheduledChore {
1500     private final HRegionServer instance;
1501     private final int majorCompactPriority;
1502     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1503     private long iteration = 0;
1504 
1505     CompactionChecker(final HRegionServer h, final int sleepTime,
1506         final Stoppable stopper) {
1507       super("CompactionChecker", stopper, sleepTime);
1508       this.instance = h;
1509       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1510 
1511       /* MajorCompactPriority is configurable.
1512        * If not set, the compaction will use default priority.
1513        */
1514       this.majorCompactPriority = this.instance.conf.
1515         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1516         DEFAULT_PRIORITY);
1517     }
1518 
1519     @Override
1520     protected void chore() {
1521       for (Region r : this.instance.onlineRegions.values()) {
1522         if (r == null)
1523           continue;
1524         for (Store s : r.getStores()) {
1525           try {
1526             long multiplier = s.getCompactionCheckMultiplier();
1527             assert multiplier > 0;
1528             if (iteration % multiplier != 0) continue;
1529             if (s.needsCompaction()) {
1530               // Queue a compaction. Will recognize if major is needed.
1531               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1532                   + " requests compaction");
1533             } else if (s.isMajorCompaction()) {
1534               if (majorCompactPriority == DEFAULT_PRIORITY
1535                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1536                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1537                     + " requests major compaction; use default priority", null);
1538               } else {
1539                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1540                     + " requests major compaction; use configured priority",
1541                   this.majorCompactPriority, null, null);
1542               }
1543             }
1544           } catch (IOException e) {
1545             LOG.warn("Failed major compaction check on " + r, e);
1546           }
1547         }
1548       }
1549       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1550     }
1551   }
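       // Worked example (illustrative): a store whose
       // getCompactionCheckMultiplier() returns 4 is only examined on every 4th
       // run of the chore above, because of the modulo skip:
       //
       //   // iteration: 0 1 2 3 4 5 6 7 8 ...
       //   // checked:   y n n n y n n n y ...
       //   if (iteration % multiplier != 0) continue;  // skips 3 of every 4 runs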
1552 
1553   static class PeriodicMemstoreFlusher extends ScheduledChore {
1554     final HRegionServer server;
1555     final static int RANGE_OF_DELAY = 20000; //millisec
1556     final static int MIN_DELAY_TIME = 3000; //millisec
1557     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1558       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1559       this.server = server;
1560     }
1561 
1562     @Override
1563     protected void chore() {
1564       for (Region r : this.server.onlineRegions.values()) {
1565         if (r == null)
1566           continue;
1567         if (((HRegion)r).shouldFlush()) {
1568           FlushRequester requester = server.getFlushRequester();
1569           if (requester != null) {
1570             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1571             LOG.info(getName() + " requesting flush for region " +
1572               r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay + " ms");
1573             //Throttle the flushes by putting a delay. If we don't throttle, and there
1574             //is a balanced write-load on the regions in a table, we might end up
1575             //overwhelming the filesystem with too many flushes at once.
1576             requester.requestDelayedFlush(r, randomDelay, false);
1577           }
1578         }
1579       }
1580     }
1581   }
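       // Worked example (illustrative): RandomUtils.nextInt(RANGE_OF_DELAY)
       // returns a value in [0, 20000), so the delay computed above always lands
       // in [3000, 23000) milliseconds:
       //
       //   long randomDelay = RandomUtils.nextInt(20000) + 3000;  // 3s .. <23s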
1582 
1583   /**
1584    * Report the status of the server. A server is online once all of its startup
1585    * has completed (setting up filesystem, starting service threads, etc.). This
1586    * method is designed mostly to be useful in tests.
1587    *
1588    * @return true if online, false if not.
1589    */
1590   public boolean isOnline() {
1591     return online.get();
1592   }
1593 
1594   /**
1595    * Set up the WAL and replication if enabled.
1596    * Replication setup is done in here because it needs to be hooked up to the WAL.
1597    * @return A WAL instance.
1598    * @throws IOException
1599    */
1600   private WALFactory setupWALAndReplication() throws IOException {
1601     // TODO Replication make assumptions here based on the default filesystem impl
1602     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1603     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1604 
1605     Path logdir = new Path(rootDir, logName);
1606     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1607     if (this.fs.exists(logdir)) {
1608       throw new RegionServerRunningException("Region server has already " +
1609         "created directory at " + this.serverName.toString());
1610     }
1611 
1612     // Instantiate replication manager if replication enabled.  Pass it the
1613     // log directories.
1614     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1615 
1616     // listeners the wal factory will add to wals it creates.
1617     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1618     listeners.add(new MetricsWAL());
1619     if (this.replicationSourceHandler != null &&
1620         this.replicationSourceHandler.getWALActionsListener() != null) {
1621       // Replication handler is an implementation of WALActionsListener.
1622       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1623     }
1624 
1625     return new WALFactory(conf, listeners, serverName.toString());
1626   }
1627 
1628   /**
1629    * We initialize the roller for the wal that handles meta lazily
1630    * since we don't know if this regionserver will handle it. All calls to
1631    * this method return a reference to that same roller. As newly referenced
1632    * meta regions are brought online, they will be offered to the roller for maintenance.
1633    * As a part of that registration process, the roller will add itself as a
1634    * listener on the wal.
1635    */
1636   protected LogRoller ensureMetaWALRoller() {
1637     // Use a temporary roller so that once metawalRoller is non-null it always
1638     // refers to a live roller.
1639     LogRoller roller = metawalRoller.get();
1640     if (null == roller) {
1641       LogRoller tmpLogRoller = new LogRoller(this, this);
1642       String n = Thread.currentThread().getName();
1643       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1644           n + "-MetaLogRoller", uncaughtExceptionHandler);
1645       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1646         roller = tmpLogRoller;
1647       } else {
1648         // Another thread won starting the roller
1649         Threads.shutdown(tmpLogRoller.getThread());
1650         roller = metawalRoller.get();
1651       }
1652     }
1653     return roller;
1654   }
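       // Illustrative sketch (not in the original source): the compareAndSet
       // dance above is the usual lazy-init idiom for an AtomicReference; the
       // losing thread discards its candidate and adopts the winner's:
       //
       //   LogRoller candidate = new LogRoller(this, this);
       //   if (!metawalRoller.compareAndSet(null, candidate)) {
       //     Threads.shutdown(candidate.getThread());  // another thread won
       //   }
       //   LogRoller winner = metawalRoller.get();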
1655 
1656   public MetricsRegionServer getRegionServerMetrics() {
1657     return this.metricsRegionServer;
1658   }
1659 
1660   /**
1661    * @return Master address tracker instance.
1662    */
1663   public MasterAddressTracker getMasterAddressTracker() {
1664     return this.masterAddressTracker;
1665   }
1666 
1667   /*
1668    * Start maintenance Threads, Server, Worker and lease checker threads.
1669    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1670    * get an unhandled exception. We cannot set the handler on all threads.
1671    * Server's internal Listener thread is off limits. For Server, if it hits an
1672    * OOME, it waits a while then retries. Meantime, a flush or a compaction that
1673    * tries to run should trigger the same critical condition and the shutdown
1674    * will run. On its way out, this server will shut down Server. Leases is
1675    * somewhere in between: it has an internal thread that, while it inherits from
1676    * Chore, keeps its own stop mechanism and so needs to be stopped by this
1677    * hosting server. Worker logs the exception and exits.
1678    */
1679   private void startServiceThreads() throws IOException {
1680     // Start executor services
1681     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1682       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1683     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1684       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1685     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1686       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1687     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1688       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1689     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1690       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1691         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1692     }
1693     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1694        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1695 
1696     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1697       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1698         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1699           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1700     }
1701 
1702     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1703         uncaughtExceptionHandler);
1704     this.cacheFlusher.start(uncaughtExceptionHandler);
1705 
1706     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1707     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1708     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1709     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1710     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1711     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1712 
1713     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1714     // an unhandled exception, it will just exit.
1715     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1716       uncaughtExceptionHandler);
1717 
1718     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1719         this.replicationSourceHandler != null) {
1720       this.replicationSourceHandler.startReplicationService();
1721     } else {
1722       if (this.replicationSourceHandler != null) {
1723         this.replicationSourceHandler.startReplicationService();
1724       }
1725       if (this.replicationSinkHandler != null) {
1726         this.replicationSinkHandler.startReplicationService();
1727       }
1728     }
1729 
1730     // Create the log splitting worker and start it
1731     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1732     // quite a while inside HConnection layer. The worker won't be available for other
1733     // tasks even after current task is preempted after a split task times out.
1734     Configuration sinkConf = HBaseConfiguration.create(conf);
1735     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1736       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1737     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1738       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1739     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1740     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1741     splitLogWorker.start();
1742   }
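       // Hedged example (illustrative): the executor pool sizes read above are
       // plain configuration integers, so a deployment that opens or closes many
       // regions at once could raise them before startup, e.g.:
       //
       //   Configuration tuned = HBaseConfiguration.create();
       //   tuned.setInt("hbase.regionserver.executor.openregion.threads", 6);
       //   tuned.setInt("hbase.regionserver.executor.closeregion.threads", 6);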
1743 
1744   /**
1745    * Puts up the webui.
1746    * @return Returns final port -- maybe different from what we started with.
1747    * @throws IOException
1748    */
1749   private int putUpWebUI() throws IOException {
1750     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1751       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1752     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1753 
1754     if(this instanceof HMaster) {
1755       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1756           HConstants.DEFAULT_MASTER_INFOPORT);
1757       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1758     }
1759     // -1 is for disabling info server
1760     if (port < 0) return port;
1761 
1762     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1763       String msg =
1764           "Failed to start http info server. Address " + addr
1765               + " does not belong to this host. Correct configuration parameter: "
1766               + "hbase.regionserver.info.bindAddress";
1767       LOG.error(msg);
1768       throw new IOException(msg);
1769     }
1770     // check if auto port bind enabled
1771     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1772         false);
1773     while (true) {
1774       try {
1775         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1776         infoServer.addServlet("dump", "/dump", getDumpServlet());
1777         configureInfoServer();
1778         this.infoServer.start();
1779         break;
1780       } catch (BindException e) {
1781         if (!auto) {
1782           // auto bind disabled throw BindException
1783           LOG.error("Failed binding http info server to port: " + port);
1784           throw e;
1785         }
1786         // auto bind enabled, try to use another port
1787         LOG.info("Failed binding http info server to port: " + port);
1788         port++;
1789       }
1790     }
1791     port = this.infoServer.getPort();
1792     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1793     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1794       HConstants.DEFAULT_MASTER_INFOPORT);
1795     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1796     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1797     return port;
1798   }
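       // Illustrative sketch: with auto port binding enabled the loop above walks
       // ports upward until a bind succeeds; with it disabled (the default) the
       // first BindException is fatal. The port below is just an example value:
       //
       //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
       //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 16030);  // start here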
1799 
1800   /*
1801    * Verify that server is healthy
1802    */
1803   private boolean isHealthy() {
1804     if (!fsOk) {
1805       // File system problem
1806       return false;
1807     }
1808     // Verify that all threads are alive
1809     if (!(leases.isAlive()
1810         && cacheFlusher.isAlive() && walRoller.isAlive()
1811         && this.compactionChecker.isScheduled()
1812         && this.periodicFlusher.isScheduled())) {
1813       stop("One or more threads are no longer alive -- stop");
1814       return false;
1815     }
1816     final LogRoller metawalRoller = this.metawalRoller.get();
1817     if (metawalRoller != null && !metawalRoller.isAlive()) {
1818       stop("Meta WAL roller thread is no longer alive -- stop");
1819       return false;
1820     }
1821     return true;
1822   }
1823 
1824   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1825 
1826   @Override
1827   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1828     WAL wal;
1829     LogRoller roller = walRoller;
1830     // The hbase:meta region (default replica) has its own, separate WAL.
1831     if (regionInfo != null && regionInfo.isMetaTable() &&
1832         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1833       roller = ensureMetaWALRoller();
1834       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1835     } else if (regionInfo == null) {
1836       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1837     } else {
1838       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1839     }
1840     roller.addWAL(wal);
1841     return wal;
1842   }
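       // Illustrative note (not in the original source): callers never pick the
       // meta WAL themselves; passing the region's HRegionInfo is enough:
       //
       //   WAL metaWal = getWAL(HRegionInfo.FIRST_META_REGIONINFO); // meta WAL
       //   WAL userWal = getWAL(someUserRegionInfo);  // hypothetical user region
       //   WAL anyWal  = getWAL(null);                // the unspecified-region WAL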
1843 
1844   @Override
1845   public ClusterConnection getConnection() {
1846     return this.clusterConnection;
1847   }
1848 
1849   @Override
1850   public MetaTableLocator getMetaTableLocator() {
1851     return this.metaTableLocator;
1852   }
1853 
1854   @Override
1855   public void stop(final String msg) {
1856     if (!this.stopped) {
1857       try {
1858         if (this.rsHost != null) {
1859           this.rsHost.preStop(msg);
1860         }
1861         this.stopped = true;
1862         LOG.info("STOPPED: " + msg);
1863         // Wakes run() if it is sleeping
1864         sleeper.skipSleepCycle();
1865       } catch (IOException exp) {
1866         LOG.warn("The region server did not stop", exp);
1867       }
1868     }
1869   }
1870 
1871   public void waitForServerOnline(){
1872     while (!isStopped() && !isOnline()) {
1873       synchronized (online) {
1874         try {
1875           online.wait(msgInterval);
1876         } catch (InterruptedException ie) {
1877           Thread.currentThread().interrupt();
1878           break;
1879         }
1880       }
1881     }
1882   }
1883 
1884   @Override
1885   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1886     postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1887   }
1888 
1889   @Override
1890   public void postOpenDeployTasks(final PostOpenDeployContext context)
1891       throws KeeperException, IOException {
1892     Region r = context.getRegion();
1893     long masterSystemTime = context.getMasterSystemTime();
1894     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1895     rpcServices.checkOpen();
1896     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1897     // Do checks to see if we need to compact (references or too many files)
1898     for (Store s : r.getStores()) {
1899       if (s.hasReferences() || s.needsCompaction()) {
1900         this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1901       }
1902     }
1903     long openSeqNum = r.getOpenSeqNum();
1904     if (openSeqNum == HConstants.NO_SEQNUM) {
1905       // If we opened a region, we should have read some sequence number from it.
1906       LOG.error("No sequence number found when opening " +
1907         r.getRegionInfo().getRegionNameAsString());
1908       openSeqNum = 0;
1909     }
1910 
1911     // Update flushed sequence id of a recovering region in ZK
1912     updateRecoveringRegionLastFlushedSequenceId(r);
1913 
1914     // Update ZK, or META
1915     if (r.getRegionInfo().isMetaRegion()) {
1916       MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1917          State.OPEN);
1918     } else if (useZKForAssignment) {
1919       MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1920         this.serverName, openSeqNum, masterSystemTime);
1921     }
1922     if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1923         TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1924       throw new IOException("Failed to report opened region to master: "
1925         + r.getRegionInfo().getRegionNameAsString());
1926     }
1927 
1928     triggerFlushInPrimaryRegion((HRegion)r);
1929 
1930     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1931   }
1932 
1933   @Override
1934   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1935     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1936   }
1937 
1938   @Override
1939   public boolean reportRegionStateTransition(
1940       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1941     return reportRegionStateTransition(
1942       new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1943   }
1944 
1945   @Override
1946   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1947     TransitionCode code = context.getCode();
1948     long openSeqNum = context.getOpenSeqNum();
1949     long masterSystemTime = context.getMasterSystemTime();
1950     HRegionInfo[] hris = context.getHris();
1951 
1952     ReportRegionStateTransitionRequest.Builder builder =
1953       ReportRegionStateTransitionRequest.newBuilder();
1954     builder.setServer(ProtobufUtil.toServerName(serverName));
1955     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1956     transition.setTransitionCode(code);
1957     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1958       transition.setOpenSeqNum(openSeqNum);
1959     }
1960     for (HRegionInfo hri: hris) {
1961       transition.addRegionInfo(HRegionInfo.convert(hri));
1962     }
1963     ReportRegionStateTransitionRequest request = builder.build();
1964     while (keepLooping()) {
1965       RegionServerStatusService.BlockingInterface rss = rssStub;
1966       try {
1967         if (rss == null) {
1968           createRegionServerStatusStub();
1969           continue;
1970         }
1971         ReportRegionStateTransitionResponse response =
1972           rss.reportRegionStateTransition(null, request);
1973         if (response.hasErrorMessage()) {
1974           LOG.info("Failed to transition " + hris[0]
1975             + " to " + code + ": " + response.getErrorMessage());
1976           return false;
1977         }
1978         return true;
1979       } catch (ServiceException se) {
1980         IOException ioe = ProtobufUtil.getRemoteException(se);
1981         LOG.info("Failed to report region transition, will retry", ioe);
1982         if (rssStub == rss) {
1983           rssStub = null;
1984         }
1985       }
1986     }
1987     return false;
1988   }
1989 
1990   /**
1991    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
1992    * block this thread. See RegionReplicaFlushHandler for details.
1993    */
1994   void triggerFlushInPrimaryRegion(final HRegion region) {
1995     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
1996       return;
1997     }
1998     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
1999         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2000           region.conf)) {
2001       region.setReadsEnabled(true);
2002       return;
2003     }
2004 
2005     region.setReadsEnabled(false); // disable reads before marking the region as opened.
2006     // RegionReplicaFlushHandler might reset this.
2007 
2008     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2009     this.service.submit(
2010       new RegionReplicaFlushHandler(this, clusterConnection,
2011         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2012   }
2013 
2014   @Override
2015   public RpcServerInterface getRpcServer() {
2016     return rpcServices.rpcServer;
2017   }
2018 
2019   @VisibleForTesting
2020   public RSRpcServices getRSRpcServices() {
2021     return rpcServices;
2022   }
2023 
2024   /**
2025    * Cause the server to exit without closing the regions it is serving, the log
2026    * it is using, and without notifying the master. Used for unit testing and on
2027    * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
2028    *
2029    * @param reason
2030    *          the reason we are aborting
2031    * @param cause
2032    *          the exception that caused the abort, or null
2033    */
2034   @Override
2035   public void abort(String reason, Throwable cause) {
2036     String msg = "ABORTING region server " + this + ": " + reason;
2037     if (cause != null) {
2038       LOG.fatal(msg, cause);
2039     } else {
2040       LOG.fatal(msg);
2041     }
2042     this.abortRequested = true;
2043     // HBASE-4014: show list of coprocessors that were loaded to help debug
2044     // regionserver crashes.Note that we're implicitly using
2045     // java.util.HashSet's toString() method to print the coprocessor names.
2046     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2047         CoprocessorHost.getLoadedCoprocessors());
2048     // Try and dump metrics if abort -- might give clue as to how fatal came about....
2049     try {
2050       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2051     } catch (MalformedObjectNameException | IOException e) {
2052       LOG.warn("Failed dumping metrics", e);
2053     }
2054 
2055     // Do our best to report our abort to the master, but this may not work
2056     try {
2057       if (cause != null) {
2058         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2059       }
2060       // Report to the master but only if we have already registered with the master.
2061       if (rssStub != null && this.serverName != null) {
2062         ReportRSFatalErrorRequest.Builder builder =
2063           ReportRSFatalErrorRequest.newBuilder();
2064         ServerName sn =
2065           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2066         builder.setServer(ProtobufUtil.toServerName(sn));
2067         builder.setErrorMessage(msg);
2068         rssStub.reportRSFatalError(null, builder.build());
2069       }
2070     } catch (Throwable t) {
2071       LOG.warn("Unable to report fatal error to master", t);
2072     }
2073     stop(reason);
2074   }
2075 
2076   /**
2077    * @see HRegionServer#abort(String, Throwable)
2078    */
2079   public void abort(String reason) {
2080     abort(reason, null);
2081   }
2082 
2083   @Override
2084   public boolean isAborted() {
2085     return this.abortRequested;
2086   }
2087 
2088   /*
2089    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2090    * logs, but it does close the socket in case we want to bring up a server on
2091    * the old hostname+port immediately.
2092    */
2093   protected void kill() {
2094     this.killed = true;
2095     abort("Simulated kill");
2096   }
2097 
2098   /**
2099    * Called on stop/abort before closing the cluster connection and meta locator.
2100    */
2101   protected void sendShutdownInterrupt() {
2102   }
2103 
2104   /**
2105    * Wait on all threads to finish. Presumption is that all closes and stops
2106    * have already been called.
2107    */
2108   protected void stopServiceThreads() {
2109     // clean up the scheduled chores
2110     if (this.choreService != null) choreService.shutdown();
2111     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2112     if (this.compactionChecker != null) compactionChecker.cancel(true);
2113     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2114     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2115     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2116     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2117 
2118     if (this.cacheFlusher != null) {
2119       this.cacheFlusher.join();
2120     }
2121 
2122     if (this.spanReceiverHost != null) {
2123       this.spanReceiverHost.closeReceivers();
2124     }
2125     if (this.walRoller != null) {
2126       Threads.shutdown(this.walRoller.getThread());
2127     }
2128     final LogRoller metawalRoller = this.metawalRoller.get();
2129     if (metawalRoller != null) {
2130       Threads.shutdown(metawalRoller.getThread());
2131     }
2132     if (this.compactSplitThread != null) {
2133       this.compactSplitThread.join();
2134     }
2135     if (this.service != null) this.service.shutdown();
2136     if (this.replicationSourceHandler != null &&
2137         this.replicationSourceHandler == this.replicationSinkHandler) {
2138       this.replicationSourceHandler.stopReplicationService();
2139     } else {
2140       if (this.replicationSourceHandler != null) {
2141         this.replicationSourceHandler.stopReplicationService();
2142       }
2143       if (this.replicationSinkHandler != null) {
2144         this.replicationSinkHandler.stopReplicationService();
2145       }
2146     }
2147   }
2148 
2149   /**
2150    * @return the object that implements the replication
2151    * source service.
2152    */
2153   ReplicationSourceService getReplicationSourceService() {
2154     return replicationSourceHandler;
2155   }
2156 
2157   /**
2158    * @return the object that implements the replication
2159    * sink service.
2160    */
2161   ReplicationSinkService getReplicationSinkService() {
2162     return replicationSinkHandler;
2163   }
2164 
2165   /**
2166    * Get the current master from ZooKeeper and open the RPC connection to it.
2167    * To get a fresh connection, the current rssStub must be null.
2168    * Method will block until a master is available. You can break from this
2169    * block by requesting the server stop.
2170    *
2171    * @return master + port, or null if server has been stopped
2172    */
2173   @VisibleForTesting
2174   protected synchronized ServerName createRegionServerStatusStub() {
2175     if (rssStub != null) {
2176       return masterAddressTracker.getMasterAddress();
2177     }
2178     ServerName sn = null;
2179     long previousLogTime = 0;
2180     boolean refresh = false; // for the first time, use cached data
2181     RegionServerStatusService.BlockingInterface intf = null;
2182     boolean interrupted = false;
2183     try {
2184       while (keepLooping()) {
2185         sn = this.masterAddressTracker.getMasterAddress(refresh);
2186         if (sn == null) {
2187           if (!keepLooping()) {
2188             // give up with no connection.
2189             LOG.debug("No master found and cluster is stopped; bailing out");
2190             return null;
2191           }
2192           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2193             LOG.debug("No master found; retry");
2194             previousLogTime = System.currentTimeMillis();
2195           }
2196           refresh = true; // let's try to pull it from ZK directly
2197           if (sleep(200)) {
2198             interrupted = true;
2199           }
2200           continue;
2201         }
2202 
2203         // If we are on the active master, use the shortcut
2204         if (this instanceof HMaster && sn.equals(getServerName())) {
2205           intf = ((HMaster)this).getMasterRpcServices();
2206           break;
2207         }
2208         try {
2209           BlockingRpcChannel channel =
2210             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2211               shortOperationTimeout);
2212           intf = RegionServerStatusService.newBlockingStub(channel);
2213           break;
2214         } catch (IOException e) {
2215           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2216             e = e instanceof RemoteException ?
2217               ((RemoteException)e).unwrapRemoteException() : e;
2218             if (e instanceof ServerNotRunningYetException) {
2219               LOG.info("Master isn't available yet, retrying");
2220             } else {
2221               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2222             }
2223             previousLogTime = System.currentTimeMillis();
2224           }
2225           if (sleep(200)) {
2226             interrupted = true;
2227           }
2228         }
2229       }
2230     } finally {
2231       if (interrupted) {
2232         Thread.currentThread().interrupt();
2233       }
2234     }
2235     rssStub = intf;
2236     return sn;
2237   }
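       // Illustrative note: to force a fresh master connection, callers null the
       // cached stub first, as reportRegionStateTransition() above does when a
       // ServiceException suggests the stub has gone stale (sketch):
       //
       //   rssStub = null;                  // drop the stale stub
       //   createRegionServerStatusStub();  // blocks until a master is found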
2238 
2239   /**
2240    * @return True while we should keep looping, i.e. the cluster is up and
2241    * this server has not been stopped.
2242    */
2243   private boolean keepLooping() {
2244     return !this.stopped && isClusterUp();
2245   }
2246 
2247   /*
2248    * Let the master know we're here. Run initialization using parameters passed
2249    * to us by the master.
2250    * @return A Map of key/value configurations we got from the Master, else
2251    * null if we failed to register.
2252    * @throws IOException
2253    */
2254   private RegionServerStartupResponse reportForDuty() throws IOException {
2255     ServerName masterServerName = createRegionServerStatusStub();
2256     if (masterServerName == null) return null;
2257     RegionServerStartupResponse result = null;
2258     try {
2259       rpcServices.requestCount.set(0);
2260       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2261         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2262       long now = EnvironmentEdgeManager.currentTime();
2263       int port = rpcServices.isa.getPort();
2264       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2265       if (shouldUseThisHostnameInstead()) {
2266         request.setUseThisHostnameInstead(useThisHostnameInstead);
2267       }
2268       request.setPort(port);
2269       request.setServerStartCode(this.startcode);
2270       request.setServerCurrentTime(now);
2271       result = this.rssStub.regionServerStartup(null, request.build());
2272     } catch (ServiceException se) {
2273       IOException ioe = ProtobufUtil.getRemoteException(se);
2274       if (ioe instanceof ClockOutOfSyncException) {
2275         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2276         // Re-throw IOE will cause RS to abort
2277         throw ioe;
2278       } else if (ioe instanceof ServerNotRunningYetException) {
2279         LOG.debug("Master is not running yet");
2280       } else {
2281         LOG.warn("error telling master we are up", se);
2282       }
2283       rssStub = null;
2284     }
2285     return result;
2286   }
2287 
2288   @Override
2289   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2290     try {
2291       GetLastFlushedSequenceIdRequest req =
2292           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2293       RegionServerStatusService.BlockingInterface rss = rssStub;
2294       if (rss == null) { // Try to connect one more time
2295         createRegionServerStatusStub();
2296         rss = rssStub;
2297         if (rss == null) {
2298           // Still no luck, we tried
2299           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2300           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2301               .build();
2302         }
2303       }
2304       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2305       return RegionStoreSequenceIds.newBuilder()
2306           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2307           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2308     } catch (ServiceException e) {
2309       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2310       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2311           .build();
2312     }
2313   }
2314 
2315   /**
2316    * Closes all regions.  Called on our way out.
2317    * Assumes that its not possible for new regions to be added to onlineRegions
2318    * while this method runs.
2319    */
2320   protected void closeAllRegions(final boolean abort) {
2321     closeUserRegions(abort);
2322     closeMetaTableRegions(abort);
2323   }
2324 
2325   /**
2326    * Close meta region if we carry it
2327    * @param abort Whether we're running an abort.
2328    */
2329   void closeMetaTableRegions(final boolean abort) {
2330     Region meta = null;
2331     this.lock.writeLock().lock();
2332     try {
2333       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2334         HRegionInfo hri = e.getValue().getRegionInfo();
2335         if (hri.isMetaRegion()) {
2336           meta = e.getValue();
2337         }
2338         if (meta != null) break;
2339       }
2340     } finally {
2341       this.lock.writeLock().unlock();
2342     }
2343     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2344   }
2345 
2346   /**
2347    * Schedule closes on all user regions.
2348    * Should be safe calling multiple times because it won't close regions
2349    * that are already closed or that are closing.
2350    * @param abort Whether we're running an abort.
2351    */
2352   void closeUserRegions(final boolean abort) {
2353     this.lock.writeLock().lock();
2354     try {
2355       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2356         Region r = e.getValue();
2357         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2358           // Don't update zk with this close transition; pass false.
2359           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2360         }
2361       }
2362     } finally {
2363       this.lock.writeLock().unlock();
2364     }
2365   }
2366 
2367   /** @return the info server */
2368   public InfoServer getInfoServer() {
2369     return infoServer;
2370   }
2371 
2372   /**
2373    * @return true if a stop has been requested.
2374    */
2375   @Override
2376   public boolean isStopped() {
2377     return this.stopped;
2378   }
2379 
2380   @Override
2381   public boolean isStopping() {
2382     return this.stopping;
2383   }
2384 
2385   @Override
2386   public Map<String, Region> getRecoveringRegions() {
2387     return this.recoveringRegions;
2388   }
2389 
2390   /**
2391    *
2392    * @return the configuration
2393    */
2394   @Override
2395   public Configuration getConfiguration() {
2396     return conf;
2397   }
2398 
2399   /** @return the write lock for the server */
2400   ReentrantReadWriteLock.WriteLock getWriteLock() {
2401     return lock.writeLock();
2402   }
2403 
2404   public int getNumberOfOnlineRegions() {
2405     return this.onlineRegions.size();
2406   }
2407 
2408   boolean isOnlineRegionsEmpty() {
2409     return this.onlineRegions.isEmpty();
2410   }
2411 
2412   /**
2413    * For tests, web ui and metrics.
2414    * This method will only work if HRegionServer is in the same JVM as client;
2415    * HRegion cannot be serialized to cross an rpc.
2416    */
2417   public Collection<Region> getOnlineRegionsLocalContext() {
2418     Collection<Region> regions = this.onlineRegions.values();
2419     return Collections.unmodifiableCollection(regions);
2420   }
2421 
2422   @Override
2423   public void addToOnlineRegions(Region region) {
2424     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2425     configurationManager.registerObserver(region);
2426   }
2427 
2428   /**
2429    * @return A new Map of online regions sorted by region size with the first entry being the
2430    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2431    * may NOT return all regions.
2432    */
2433   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2434     // we'll sort the regions in reverse
2435     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2436         new Comparator<Long>() {
2437           @Override
2438           public int compare(Long a, Long b) {
2439             return -1 * a.compareTo(b);
2440           }
2441         });
2442     // Copy over all regions. Regions are sorted by size with biggest first.
2443     for (Region region : this.onlineRegions.values()) {
2444       sortedRegions.put(region.getMemstoreSize(), region);
2445     }
2446     return sortedRegions;
2447   }
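       // Worked example (illustrative): because the map above is keyed on
       // memstore size, equally-sized regions collide and only the last survives,
       // which is why the method may not return all regions:
       //
       //   sortedRegions.put(64L, regionA);  // regionA/regionB are hypothetical
       //   sortedRegions.put(64L, regionB);  // replaces regionA
       //   sortedRegions.size();             // == 1 though two regions are online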
2448 
2449   /**
2450    * @return time stamp in millis of when this region server was started
2451    */
2452   public long getStartcode() {
2453     return this.startcode;
2454   }
2455 
2456   /** @return reference to FlushRequester */
2457   @Override
2458   public FlushRequester getFlushRequester() {
2459     return this.cacheFlusher;
2460   }
2461 
2462   /**
2463    * Get the top N most loaded regions this server is serving so we can tell the
2464    * master which regions it can reallocate if we're overloaded. TODO: actually
2465    * calculate which regions are most loaded. (Right now, we're just grabbing
2466    * the first N regions being served regardless of load.)
2467    */
2468   protected HRegionInfo[] getMostLoadedRegions() {
2469     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2470     for (Region r : onlineRegions.values()) {
2471       if (!r.isAvailable()) {
2472         continue;
2473       }
2474       if (regions.size() < numRegionsToReport) {
2475         regions.add(r.getRegionInfo());
2476       } else {
2477         break;
2478       }
2479     }
2480     return regions.toArray(new HRegionInfo[regions.size()]);
2481   }
2482 
2483   @Override
2484   public Leases getLeases() {
2485     return leases;
2486   }
2487 
2488   /**
2489    * @return the rootDir.
2490    */
2491   protected Path getRootDir() {
2492     return rootDir;
2493   }
2494 
2495   /**
2496    * @return the fs.
2497    */
2498   @Override
2499   public FileSystem getFileSystem() {
2500     return fs;
2501   }
2502 
2503   @Override
2504   public String toString() {
2505     return getServerName().toString();
2506   }
2507 
2508   /**
2509    * Interval at which threads should run
2510    *
2511    * @return the interval
2512    */
2513   public int getThreadWakeFrequency() {
2514     return threadWakeFrequency;
2515   }
2516 
2517   @Override
2518   public ZooKeeperWatcher getZooKeeper() {
2519     return zooKeeper;
2520   }
2521 
2522   @Override
2523   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2524     return csm;
2525   }
2526 
2527   @Override
2528   public ServerName getServerName() {
2529     return serverName;
2530   }
2531 
2532   @Override
2533   public CompactionRequestor getCompactionRequester() {
2534     return this.compactSplitThread;
2535   }
2536 
2537   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2538     return this.rsHost;
2539   }
2540 
2541   @Override
2542   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2543     return this.regionsInTransitionInRS;
2544   }
2545 
2546   @Override
2547   public ExecutorService getExecutorService() {
2548     return service;
2549   }
2550 
2551   @Override
2552   public ChoreService getChoreService() {
2553     return choreService;
2554   }
2555
2556   @Override
2557   public RegionServerQuotaManager getRegionServerQuotaManager() {
2558     return rsQuotaManager;
2559   }
2560 
2561   //
2562   // Main program and support routines
2563   //
2564 
2565   /**
2566    * Load the replication service objects, if any
2567    */
2568   static private void createNewReplicationInstance(Configuration conf,
2569     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2570 
2571     // If replication is not enabled, then return immediately.
2572     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2573         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2574       return;
2575     }
2576 
2577     // read in the name of the source replication class from the config file.
2578     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2579                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2580 
2581     // read in the name of the sink replication class from the config file.
2582     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2583                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2584 
2585     // If both the sink and the source class names are the same, then instantiate
2586     // only one object.
2587     if (sourceClassname.equals(sinkClassname)) {
2588       server.replicationSourceHandler = (ReplicationSourceService)
2589                                          newReplicationInstance(sourceClassname,
2590                                          conf, server, fs, logDir, oldLogDir);
2591       server.replicationSinkHandler = (ReplicationSinkService)
2592                                          server.replicationSourceHandler;
2593     } else {
2594       server.replicationSourceHandler = (ReplicationSourceService)
2595                                          newReplicationInstance(sourceClassname,
2596                                          conf, server, fs, logDir, oldLogDir);
2597       server.replicationSinkHandler = (ReplicationSinkService)
2598                                          newReplicationInstance(sinkClassname,
2599                                          conf, server, fs, logDir, oldLogDir);
2600     }
2601   }
2602 
2603   static private ReplicationService newReplicationInstance(String classname,
2604     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2605     Path oldLogDir) throws IOException{
2606 
2607     Class<?> clazz = null;
2608     try {
2609       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2610       clazz = Class.forName(classname, true, classLoader);
2611     } catch (java.lang.ClassNotFoundException nfe) {
2612       throw new IOException("Could not find class for " + classname);
2613     }
2614 
2615     // create an instance of the replication object.
2616     ReplicationService service = (ReplicationService)
2617                               ReflectionUtils.newInstance(clazz, conf);
2618     service.initialize(server, fs, logDir, oldLogDir);
2619     return service;
2620   }
2621 
2622   /**
2623    * Utility for constructing an instance of the passed HRegionServer class.
2624    *
2625    * @param regionServerClass the HRegionServer implementation to instantiate
2626    * @param conf2 configuration to pass to the constructor
2627    * @return HRegionServer instance.
2628    */
2629   public static HRegionServer constructRegionServer(
2630       Class<? extends HRegionServer> regionServerClass,
2631       final Configuration conf2, CoordinatedStateManager cp) {
2632     try {
2633       Constructor<? extends HRegionServer> c = regionServerClass
2634           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2635       return c.newInstance(conf2, cp);
2636     } catch (Exception e) {
2637       throw new RuntimeException("Failed construction of " + "Regionserver: "
2638           + regionServerClass.toString(), e);
2639     }
2640   }
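       // Hedged usage sketch (illustrative): this mirrors what main() below does
       // via HRegionServerCommandLine; "csm" stands in for a CoordinatedStateManager
       // constructed elsewhere:
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   Class<? extends HRegionServer> cls = (Class<? extends HRegionServer>)
       //       conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
       //   HRegionServer rs = constructRegionServer(cls, conf, csm);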
2641 
2642   /**
2643    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2644    */
2645   public static void main(String[] args) throws Exception {
2646     VersionInfo.logVersion();
2647     Configuration conf = HBaseConfiguration.create();
2648     @SuppressWarnings("unchecked")
2649     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2650         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2651 
2652     new HRegionServerCommandLine(regionServerClass).doMain(args);
2653   }
2654 
2655   /**
2656    * Gets the online regions of the specified table.
2657    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2658    * Only returns <em>online</em> regions.  If a region on this table has been
2659    * closed during a disable, etc., it will not be included in the returned list.
2660    * So, the returned list may not necessarily be ALL regions in this table; it's
2661    * all the ONLINE regions in the table.
2662    * @param tableName table whose online regions to return
2663    * @return Online regions from <code>tableName</code>
2664    */
2665   @Override
2666   public List<Region> getOnlineRegions(TableName tableName) {
2667     List<Region> tableRegions = new ArrayList<Region>();
2668     synchronized (this.onlineRegions) {
2669       for (Region region: this.onlineRegions.values()) {
2670         HRegionInfo regionInfo = region.getRegionInfo();
2671         if (regionInfo.getTable().equals(tableName)) {
2672           tableRegions.add(region);
2673         }
2674       }
2675     }
2676     return tableRegions;
2677   }
2678   
2679   /**
2680    * Gets the online tables in this RS.
2681    * This method looks at the in-memory onlineRegions.
2682    * @return all the online tables in this RS
2683    */
2684   @Override
2685   public Set<TableName> getOnlineTables() {
2686     Set<TableName> tables = new HashSet<TableName>();
2687     synchronized (this.onlineRegions) {
2688       for (Region region: this.onlineRegions.values()) {
2689         tables.add(region.getTableDesc().getTableName());
2690       }
2691     }
2692     return tables;
2693   }
2694 
2695   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2696   public String[] getRegionServerCoprocessors() {
2697     TreeSet<String> coprocessors = new TreeSet<String>();
2698     try {
2699       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2700     } catch (IOException exception) {
2701       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2702           "skipping.");
2703       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2704     }
2705     Collection<Region> regions = getOnlineRegionsLocalContext();
2706     for (Region region: regions) {
2707       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2708       try {
2709         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2710       } catch (IOException exception) {
2711         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2712             "; skipping.");
2713         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2714       }
2715     }
2716     return coprocessors.toArray(new String[coprocessors.size()]);
2717   }
2718 
2719   /**
2720    * Try to close the region; log a warning on failure, but continue.
2721    * @param region Region to close
        * @param abort True if we are aborting
2722    */
2723   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2724     try {
2725       CloseRegionCoordination.CloseRegionDetails details =
2726         csm.getCloseRegionCoordination().getDetaultDetails();
2727       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2728         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2729             " - ignoring and continuing");
2730       }
2731     } catch (IOException e) {
2732       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2733           " - ignoring and continuing", e);
2734     }
2735   }
2736 
2737   /**
2738    * Asynchronously close a region. Can be called from the master or internally by the
2739    * regionserver when stopping. If called from the master, the region will update the znode status.
2740    *
2741    * <p>
2742    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2743    * coprocessors are not called in this case. A RegionAlreadyInTransitionException is thrown.
2744    * </p>
2745    *
2746    * <p>
2747    * If a close was in progress, this new request will be ignored, and an exception thrown.
2748    * </p>
2749    *
2750    * @param encodedName Region to close
2751    * @param abort True if we are aborting
2752    * @param crd details about the close-region coordination task
2753    * @return True if closed a region.
2754    * @throws NotServingRegionException if the region is not online
2755    * @throws RegionAlreadyInTransitionException if the region is already closing
2756    */
2757   protected boolean closeRegion(String encodedName, final boolean abort,
2758       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2759       throws NotServingRegionException, RegionAlreadyInTransitionException {
2760     // Check for permissions to close.
2761     Region actualRegion = this.getFromOnlineRegions(encodedName);
2762     // Can be null if we're calling close on a region that's not online
2763     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2764       try {
2765         actualRegion.getCoprocessorHost().preClose(false);
2766       } catch (IOException exp) {
2767         LOG.warn("Unable to close region: the coprocessor threw an error", exp);
2768         return false;
2769       }
2770     }
2771 
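         // regionsInTransitionInRS maps an encoded region name to a Boolean: TRUE while an
         // OPEN is in progress, FALSE while a CLOSE is in progress. The putIfAbsent below
         // therefore both claims the region for closing and reveals whether an open (TRUE)
         // or another close (FALSE) is already running.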
2772     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2773         Boolean.FALSE);
2774 
2775     if (Boolean.TRUE.equals(previous)) {
2776       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2777           "trying to OPEN. Cancelling OPENING.");
2778       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2779         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2780         // We're going to try to do a standard close then.
2781         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2782             " Doing a standard close now");
2783         return closeRegion(encodedName, abort, crd, sn);
2784       }
2785       // Let's get the region from the online region list again: if the open got far
2786       // enough for the region to come online, we still need to close it.
           actualRegion = this.getFromOnlineRegions(encodedName);
2787       if (actualRegion == null) {
2788         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2789         // The master deletes the znode when it receives this exception.
2790         throw new RegionAlreadyInTransitionException("The region " + encodedName +
2791           " was opening but not yet served. Opening is cancelled.");
2792       }
2793     } else if (Boolean.FALSE.equals(previous)) {
2794       LOG.info("Received CLOSE for the region: " + encodedName +
2795         ", which we are already trying to CLOSE, but not completed yet");
2796       // The master will retry till the region is closed. We need to do this since
2797       // the region could fail to close somehow. If we mark the region closed in master
2798       // while it is not, there could be data loss.
2799       // If the region stuck in closing for a while, and master runs out of retries,
2800       // master will move the region to failed_to_close. Later on, if the region
2801       // is indeed closed, master can properly re-assign it.
2802       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2803         " was already closing. New CLOSE request is ignored.");
2804     }
2805 
2806     if (actualRegion == null) {
2807       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2808       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2809       // The master deletes the znode when it receives this exception.
2810       throw new NotServingRegionException("The region " + encodedName +
2811           " is not online, and is not opening.");
2812     }
2813 
2814     CloseRegionHandler crh;
2815     final HRegionInfo hri = actualRegion.getRegionInfo();
2816     if (hri.isMetaRegion()) {
2817       crh = new CloseMetaHandler(this, this, hri, abort,
2818         csm.getCloseRegionCoordination(), crd);
2819     } else {
2820       crh = new CloseRegionHandler(this, this, hri, abort,
2821         csm.getCloseRegionCoordination(), crd, sn);
2822     }
2823     this.service.submit(crh);
2824     return true;
2825   }
2826 
2827   /**
2828    * @param regionName binary name of the region to look up
2829    * @return {@link Region} for the passed binary <code>regionName</code> or null if the
2830    *         named region is not a member of the online regions.
2831    */
2832   public Region getOnlineRegion(final byte[] regionName) {
2833     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2834     return this.onlineRegions.get(encodedRegionName);
2835   }
2836 
2837   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2838     return this.regionFavoredNodesMap.get(encodedRegionName);
2839   }
2840 
2841   @Override
2842   public Region getFromOnlineRegions(final String encodedRegionName) {
2843     return this.onlineRegions.get(encodedRegionName);
2844   }
2845 
2847   @Override
2848   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2849     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2850 
2851     if (destination != null) {
2852       long closeSeqNum = r.getMaxFlushedSeqId();
2853       if (closeSeqNum == HConstants.NO_SEQNUM) {
2854         // No edits in WAL for this region; get the sequence number when the region was opened.
2855         closeSeqNum = r.getOpenSeqNum();
2856         if (closeSeqNum == HConstants.NO_SEQNUM) {
2857           closeSeqNum = 0;
2858         }
2859       }
2860       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2861     }
2862     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2863     return toReturn != null;
2864   }
2865 
2866   /**
2867    * Protected utility method for safely obtaining a {@link Region} handle.
2868    *
2869    * @param regionName
2870    *          Name of online {@link Region} to return
2871    * @return {@link Region} for <code>regionName</code>
2872    * @throws NotServingRegionException if the named region is not online
2873    */
2874   protected Region getRegion(final byte[] regionName)
2875       throws NotServingRegionException {
2876     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2877     return getRegionByEncodedName(regionName, encodedRegionName);
2878   }
2879 
2880   public Region getRegionByEncodedName(String encodedRegionName)
2881       throws NotServingRegionException {
2882     return getRegionByEncodedName(null, encodedRegionName);
2883   }
2884 
2885   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2886     throws NotServingRegionException {
2887     Region region = this.onlineRegions.get(encodedRegionName);
2888     if (region == null) {
2889       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2890       if (moveInfo != null) {
2891         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2892       }
2893       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2894       String regionNameStr = regionName == null ?
2895         encodedRegionName : Bytes.toStringBinary(regionName);
2896       if (isOpening != null && isOpening.booleanValue()) {
2897         throw new RegionOpeningException("Region " + regionNameStr +
2898           " is opening on " + this.serverName);
2899       }
2900       throw new NotServingRegionException("Region " + regionNameStr +
2901         " is not online on " + this.serverName);
2902     }
2903     return region;
2904   }
2905 
2906   /*
2907    * Cleanup after a Throwable caught while invoking a method: log it (quietly for
2908    * NotServingRegionException), check for OOME, and verify the filesystem is available.
2909    *
2910    * @param t Throwable
2911    *
2912    * @param msg Message to log in error. Can be null.
2913    *
2914    * @return The passed <code>t</code>, unchanged.
2915    */
2916   private Throwable cleanup(final Throwable t, final String msg) {
2917     // Don't log as error if NSRE; NSRE is 'normal' operation.
2918     if (t instanceof NotServingRegionException) {
2919       LOG.debug("NotServingRegionException; " + t.getMessage());
2920       return t;
2921     }
2922     if (msg == null) {
2923       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2924     } else {
2925       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2926     }
2927     if (!rpcServices.checkOOME(t)) {
2928       checkFileSystem();
2929     }
2930     return t;
2931   }
2932 
2933   /*
2934    * @param t the Throwable to convert
2935    *
2936    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2937    *
2938    * @return <code>t</code> itself if it already is an IOE, else a new IOE wrapping it.
2939    */
2940   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2941     if (t instanceof IOException) {
2942       return (IOException) t;
         }
         return msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t);
2943   }
2944 
2945   /**
2946    * Checks to see if the file system is still accessible. If not, sets
2947    * abortRequested and stopRequested
2948    *
2949    * @return false if file system is not available
2950    */
2951   public boolean checkFileSystem() {
2952     if (this.fsOk && this.fs != null) {
2953       try {
2954         FSUtils.checkFileSystemAvailable(this.fs);
2955       } catch (IOException e) {
2956         abort("File System not available", e);
2957         this.fsOk = false;
2958       }
2959     }
2960     return this.fsOk;
2961   }
2962 
2963   @Override
2964   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2965       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2966     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2967     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2968     // it is a map of region name to InetSocketAddress[]
2969     for (int i = 0; i < favoredNodes.size(); i++) {
2970       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2971           favoredNodes.get(i).getPort());
2972     }
2973     regionFavoredNodesMap.put(encodedRegionName, addr);
2974   }
2975 
2976   /**
2977    * Return the favored nodes for a region given its encoded name. Look at the
2978    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[].
2979    * @param encodedRegionName the region's encoded name
2980    * @return array of favored locations
2981    */
2982   @Override
2983   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2984     return regionFavoredNodesMap.get(encodedRegionName);
2985   }
2986 
2987   @Override
2988   public ServerNonceManager getNonceManager() {
2989     return this.nonceManager;
2990   }
2991 
2992   private static class MovedRegionInfo {
2993     private final ServerName serverName;
2994     private final long seqNum;
2995     private final long ts;
2996 
2997     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2998       this.serverName = serverName;
2999       this.seqNum = closeSeqNum;
3000       ts = EnvironmentEdgeManager.currentTime();
3001     }
3002 
3003     public ServerName getServerName() {
3004       return serverName;
3005     }
3006 
3007     public long getSeqNum() {
3008       return seqNum;
3009     }
3010 
3011     public long getMoveTime() {
3012       return ts;
3013     }
3014   }
3015 
3016   // This map contains all the regions that we closed for a move.
3017   // We record the time of the move because we don't want to keep stale information forever.
3018   protected Map<String, MovedRegionInfo> movedRegions =
3019       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3020 
3021   // We need a timeout. Without one we risk handing out wrong information: that would double
3022   // the number of network calls instead of reducing them.
3023   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3024 
3025   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3026     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3027       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3028       return;
3029     }
3030     LOG.info("Adding moved region record: "
3031       + encodedName + " to " + destination + " as of " + closeSeqNum);
3032     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3033   }
3034 
3035   void removeFromMovedRegions(String encodedName) {
3036     movedRegions.remove(encodedName);
3037   }
3038 
3039   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3040     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3041 
3042     long now = EnvironmentEdgeManager.currentTime();
3043     if (dest != null) {
3044       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3045         return dest;
3046       } else {
3047         movedRegions.remove(encodedRegionName);
3048       }
3049     }
3050 
3051     return null;
3052   }
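       // A hit in this cache is what surfaces as the RegionMovedException thrown from
       // getRegionByEncodedName() above, letting clients update their location cache
       // directly; once an entry times out, clients fall back to a lookup in hbase:meta.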
3053 
3054   /**
3055    * Remove the expired entries from the moved regions list.
3056    */
3057   protected void cleanMovedRegions() {
3058     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3059     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3060 
3061     while (it.hasNext()){
3062       Map.Entry<String, MovedRegionInfo> e = it.next();
3063       if (e.getValue().getMoveTime() < cutOff) {
3064         it.remove();
3065       }
3066     }
3067   }
3068 
3069   /*
3070    * Use this to allow tests to override and schedule more frequently.
3071    */
3073   protected int movedRegionCleanerPeriod() {
3074     return TIMEOUT_REGION_MOVED;
3075   }
3076 
3077   /**
3078    * Chore that periodically cleans the moved-region cache.
3079    */
3081   protected static final class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3082     private HRegionServer regionServer;
3083     Stoppable stoppable;
3084 
3085     private MovedRegionsCleaner(
3086       HRegionServer regionServer, Stoppable stoppable) {
3087       super("MovedRegionsCleaner for " + regionServer, stoppable,
3088           regionServer.movedRegionCleanerPeriod());
3089       this.regionServer = regionServer;
3090       this.stoppable = stoppable;
3091     }
3092 
3093     static MovedRegionsCleaner create(HRegionServer rs) {
3094       Stoppable stoppable = new Stoppable() {
3095         private volatile boolean isStopped = false;
3096         @Override public void stop(String why) { isStopped = true;}
3097         @Override public boolean isStopped() {return isStopped;}
3098       };
3099 
3100       return new MovedRegionsCleaner(rs, stoppable);
3101     }
3102 
3103     @Override
3104     protected void chore() {
3105       regionServer.cleanMovedRegions();
3106     }
3107 
3108     @Override
3109     public void stop(String why) {
3110       stoppable.stop(why);
3111     }
3112 
3113     @Override
3114     public boolean isStopped() {
3115       return stoppable.isStopped();
3116     }
3117   }
3118 
3119   private String getMyEphemeralNodePath() {
3120     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3121   }
3122 
3123   private boolean isHealthCheckerConfigured() {
3124     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3125     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3126   }
3127 
3128   /**
3129    * @return the underlying {@link CompactSplitThread} for this server
3130    */
3131   public CompactSplitThread getCompactSplitThread() {
3132     return this.compactSplitThread;
3133   }
3134 
3135   /**
3136    * A helper function to store the last flushed sequence Id with the previous failed RS for a
3137    * recovering region. The Id is used to skip wal edits that have already been flushed. Since a
3138    * flushed sequence id is only valid per RS, we associate the Id with the corresponding failed RS.
3139    * @throws KeeperException
3140    * @throws IOException
3141    */
3142   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3143       IOException {
3144     if (!r.isRecovering()) {
3145       // return immediately for non-recovering regions
3146       return;
3147     }
3148 
3149     HRegionInfo regionInfo = r.getRegionInfo();
3150     ZooKeeperWatcher zkw = getZooKeeper();
3151     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3152     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3153     long minSeqIdForLogReplay = -1;
3154     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3155       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3156         minSeqIdForLogReplay = storeSeqIdForReplay;
3157       }
3158     }
3159 
3160     try {
3161       long lastRecordedFlushedSequenceId = -1;
3162       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3163         regionInfo.getEncodedName());
3164       // recovering-region level
3165       byte[] data;
3166       try {
3167         data = ZKUtil.getData(zkw, nodePath);
3168       } catch (InterruptedException e) {
3169         throw new InterruptedIOException();
3170       }
3171       if (data != null) {
3172         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3173       }
3174       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3175         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3176       }
3177       if (previousRSName != null) {
3178         // one level deeper for the failed RS
3179         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3180         ZKUtil.setData(zkw, nodePath,
3181           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3182         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3183           " for " + previousRSName);
3184       } else {
3185         LOG.warn("Can't find failed region server for recovering region " +
3186             regionInfo.getEncodedName());
3187       }
3188     } catch (NoNodeException ignore) {
3189       LOG.debug("Region " + regionInfo.getEncodedName() +
3190         " must have completed recovery because its recovery znode has been removed", ignore);
3191     }
3192   }
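       /*
        * For reference, the recovering-region znode layout read and written by the two
        * methods around this comment (rooted at recoveringRegionsZNode, typically
        * /hbase/recovering-regions):
        *
        *   /hbase/recovering-regions/<encodedRegionName>             last flushed seq id
        *   /hbase/recovering-regions/<encodedRegionName>/<failedRS>  per-store seq ids
        */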
3193 
3194   /**
3195    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3196    * @param encodedRegionName encoded name of the recovering region
        * @return name of the most recently failed RS, or null if none is recorded
3197    * @throws KeeperException if the ZooKeeper lookup fails
3198    */
3199   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3200     String result = null;
3201     long maxZxid = 0;
3202     ZooKeeperWatcher zkw = this.getZooKeeper();
3203     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3204     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3205     if (failedServers == null || failedServers.isEmpty()) {
3206       return result;
3207     }
3208     for (String failedServer : failedServers) {
3209       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3210       Stat stat = new Stat();
3211       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3212       if (maxZxid < stat.getCzxid()) {
3213         maxZxid = stat.getCzxid();
3214         result = failedServer;
3215       }
3216     }
3217     return result;
3218   }
3219 
3220   public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3221       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3222     try {
3223       ServerRpcController execController = new ServerRpcController();
3224       CoprocessorServiceCall call = serviceRequest.getCall();
3225       String serviceName = call.getServiceName();
3226       String methodName = call.getMethodName();
3227       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3228         throw new UnknownProtocolException(null,
3229             "No registered coprocessor service found for name " + serviceName);
3230       }
3231       Service service = coprocessorServiceHandlers.get(serviceName);
3232       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3233       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3234       if (methodDesc == null) {
3235         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3236             + " called on service " + serviceName);
3237       }
3238       Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3239       ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3240       Message request = builderForType.build();
3241       final Message.Builder responseBuilder =
3242           service.getResponsePrototype(methodDesc).newBuilderForType();
3243       service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3244         @Override
3245         public void run(Message message) {
3246           if (message != null) {
3247             responseBuilder.mergeFrom(message);
3248           }
3249         }
3250       });
3251       Message execResult = responseBuilder.build();
3252       if (execController.getFailedOn() != null) {
3253         throw execController.getFailedOn();
3254       }
3255       ClientProtos.CoprocessorServiceResponse.Builder builder =
3256           ClientProtos.CoprocessorServiceResponse.newBuilder();
3257       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3258         HConstants.EMPTY_BYTE_ARRAY));
3259       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3260           .setValue(execResult.toByteString()));
3261       return builder.build();
3262     } catch (IOException ie) {
3263       throw new ServiceException(ie);
3264     }
3265   }
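       /*
        * Client side, an endpoint dispatched through execRegionServerService() is typically
        * reached via a CoprocessorRpcChannel scoped to this server; a hedged sketch, where
        * MyService is a hypothetical protobuf-generated coprocessor service:
        *
        *   try (Connection conn = ConnectionFactory.createConnection(conf);
        *        Admin admin = conn.getAdmin()) {
        *     CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
        *     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
        *     MyResponse response = stub.myMethod(null, MyRequest.getDefaultInstance());
        *   }
        */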
3266 
3267   /**
3268    * @return The cache config instance used by the regionserver.
3269    */
3270   public CacheConfig getCacheConfig() {
3271     return this.cacheConfig;
3272   }
3273 
3274   /**
3275    * @return the ConfigurationManager object, exposed for testing purposes.
3276    */
3277   protected ConfigurationManager getConfigurationManager() {
3278     return configurationManager;
3279   }
3280 
3281   /**
3282    * @return the table descriptors implementation.
3283    */
3284   public TableDescriptors getTableDescriptors() {
3285     return this.tableDescriptors;
3286   }
3287 
3288   /**
3289    * Reload the configuration from disk.
3290    */
3291   public void updateConfiguration() {
3292     LOG.info("Reloading the configuration from disk.");
3293     // Reload the configuration from disk.
3294     conf.reloadConfiguration();
3295     configurationManager.notifyAllObservers(conf);
3296   }
3297 
3298   @Override
3299   public HeapMemoryManager getHeapMemoryManager() {
3300     return hMemManager;
3301   }
3302 
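       /**
        * @return the highest compaction pressure of any store on this server; values at or
        *   above 1.0 generally mean at least one store has reached its blocking file count,
        *   which pressure-aware compaction throughput control uses to speed compactions up.
        */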
3303   @Override
3304   public double getCompactionPressure() {
3305     double max = 0;
3306     for (Region region : onlineRegions.values()) {
3307       for (Store store : region.getStores()) {
3308         double normCount = store.getCompactionPressure();
3309         if (normCount > max) {
3310           max = normCount;
3311         }
3312       }
3313     }
3314     return max;
3315   }
3316 }