/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId {

  public static final Log LOG = LogFactory.getLog(HRegionServer.class);

  /**
   * For testing only!  Set to true to skip notifying region assignment to the master.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /*
   * Strings to be used in forming the exception message for
   * RegionsAlreadyInTransitionException.
   */
  protected static final String OPEN = "OPEN";
  protected static final String CLOSE = "CLOSE";

  // Region name vs. the action currently in progress:
  // true  - an open-region action is in progress
  // false - a close-region action is in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
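
  // A minimal usage sketch of this map (an assumption about how callers such
  // as the RPC layer guard against a concurrent open and close of the same
  // region; the real call sites live outside this listing):
  //
  //   byte[] encodedName = hri.getEncodedNameAsBytes();
  //   Boolean previous = regionsInTransitionInRS.putIfAbsent(encodedName, Boolean.TRUE);
  //   if (previous != null) {
  //     // Another open/close is already in flight for this region; reject.
  //   }
  //   try {
  //     // ... perform the open ...
  //   } finally {
  //     regionsInTransitionInRS.remove(encodedName);
  //   }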

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when the server shuts down.
   * Clients must never close it explicitly.
   */
  protected ClusterConnection clusterConnection;

  /*
   * Long-lived meta table locator, created when the server starts and stopped
   * when the server shuts down. References to this locator are used to perform
   * the corresponding operations in EventHandlers. The primary reason for this
   * design is to make the locator mockable for tests.
   */
  protected MetaTableLocator metaTableLocator;

  // Watch if a region is out of recovering state from ZooKeeper
  @SuppressWarnings("unused")
  private RecoveringRegionWatcher recoveringRegionWatcher;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplitThread compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name.  All access should be synchronized.
   */
  protected final Map<String, HRegion> onlineRegions =
    new ConcurrentHashMap<String, HRegion>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<String, InetSocketAddress[]>();

  /**
   * Set of regions currently in recovering state, which means they can accept
   * writes (edits from a previously failed region server) but not reads. A
   * recovering region is also an online region.
   */
  protected final Map<String, HRegion> recoveringRegions = Collections
      .synchronizedMap(new HashMap<String, HRegion>());

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService service;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

  // A state before we go into stopped state.  At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  private volatile boolean killed = false;

  protected final Configuration conf;

  private Path rootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so it can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used for stuffing this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  private SpanReceiverHost spanReceiverHost;

  /*
   * Check for compaction requests.
   */
  Chore compactionChecker;

  /*
   * Check for flushes.
   */
  Chore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  final LogRoller walRoller;
  // Lazily initialized if this RegionServer hosts a meta table.
  final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected ZooKeeperWatcher zooKeeper;

  // master address tracker
  private MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private Chore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as.  It's made from the hostname the
   * master passes us, the port, and the server startcode. Set after
   * registration against the Master.
   */
  protected ServerName serverName;

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  /**
   * MX Bean for RegionServerInfo
   */
  private ObjectName mxBean = null;

  /**
   * Chore to periodically clean the moved region list.
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerQuotaManager rsQuotaManager;

  // Table level lock manager for locking for region operations
  protected TableLockManager tableLockManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we
   * only flush it before a memstore flush, then for a given nonce we will either see it in the
   * WAL (if it was never flushed to disk, it will be part of recovery), or we'll see it as part
   * of the nonce log (or both occasionally, which doesn't matter). The nonce log file can be
   * deleted after the latest nonce in it expires. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;
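
  // A minimal sketch of the idempotence protocol described above (hedged: the
  // start/end method names follow ServerNonceManager as used elsewhere in this
  // codebase, but treat the exact signatures as an assumption):
  //
  //   if (nonceManager.startOperation(nonceGroup, nonce, this)) {
  //     boolean success = false;
  //     try {
  //       // ... apply the increment/append ...
  //       success = true;
  //     } finally {
  //       nonceManager.endOperation(nonceGroup, nonce, success);
  //     }
  //   } else {
  //     // Duplicate retry of an operation that already succeeded;
  //     // fail it rather than applying it twice.
  //   }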

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected BaseCoordinatedStateManager csm;

  /**
   * The configuration manager is used to register/deregister and notify the
   * configuration observers when the regionserver is notified that there was a
   * change in the on-disk configs.
   */
  protected final ConfigurationManager configurationManager;

  /**
   * Starts an HRegionServer at the default location.
   * @param conf
   * @throws IOException
   * @throws InterruptedException
   */
  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
  }

  /**
   * Starts an HRegionServer at the default location.
   * @param conf
   * @param csm implementation of CoordinatedStateManager to be used
   * @throws IOException
   */
  public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException {
    this.fsOk = true;
    this.conf = conf;
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    FSUtils.setupShortCircuitRead(this.conf);

    // Config'ed params
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

    this.operationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

    this.abortRequested = false;
    this.stopped = false;

    rpcServices = createRpcServices();
    this.startcode = System.currentTimeMillis();
    String hostName = rpcServices.isa.getHostName();
    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

    // login the zookeeper client principal (if using security)
    ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
      "hbase.zookeeper.client.kerberos.principal", hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);

    regionServerAccounting = new RegionServerAccounting();
    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in service thread " + t.getName(), e);
      }
    };

    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir, else the
    // underlying hadoop hdfs accessors will be going against the wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    // Get the fs instance used by this RS.  Do we use checksum verification in hbase?
    // If hbase checksum verification is enabled, then automatically switch off
    // hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.conf,
      this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

    service = new ExecutorService(getServerName().toShortString());
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

      this.csm = (BaseCoordinatedStateManager) csm;
      this.csm.initialize(this);
      this.csm.start();

      tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
      masterAddressTracker.start();

      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    }
    this.configurationManager = new ConfigurationManager();

    rpcServices.start();
    putUpWebUI();
    this.walRoller = new LogRoller(this, this);
  }
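
  // A minimal usage sketch (assuming a valid hbase-site.xml on the classpath;
  // in a real deployment HRegionServerCommandLine performs this dance).
  // HRegionServer extends HasThread, so start() runs the run() loop below in
  // its own thread:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.regionserver.msginterval", 3 * 1000); // master report period
  //   HRegionServer rs = new HRegionServer(conf);
  //   rs.start();  // registers with the Master, then enters the main run loop
  //   rs.join();   // block until the server stops or aborts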

  protected void login(UserProvider user, String host) throws IOException {
    user.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", host);
  }

  protected void waitForMasterActive() {
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return false;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  protected void doMetrics() {
  }

  @Override
  public boolean registerService(Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
      LOG.error("Coprocessor service " + serviceDesc.getFullName()
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service="
          + serviceDesc.getFullName());
    }
    return true;
  }
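
  // A minimal usage sketch (hedged: MyAdminProtos is a hypothetical
  // protobuf-generated service, not part of this codebase). A Service
  // registered this way is later looked up by its full descriptor name:
  //
  //   Service endpoint = new MyAdminProtos.MyAdminService() {
  //     @Override
  //     public void ping(RpcController controller, MyAdminProtos.PingRequest req,
  //         RpcCallback<MyAdminProtos.PingResponse> done) {
  //       done.run(MyAdminProtos.PingResponse.getDefaultInstance());
  //     }
  //   };
  //   regionServer.registerService(endpoint);  // returns false if the name is taken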

  /**
   * Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to
   * the local server.  Safe to use going to the local or a remote server.
   * Creating this instance in a method allows it to be intercepted and mocked in tests.
   * @throws IOException
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that, when appropriate, can short-circuit and go directly to
    // the local server if the request is to the local server, bypassing RPC. Can be used for
    // both local and remote invocations.
    return ConnectionUtils.createShortCircuitHConnection(
      ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
  }
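
  // A minimal sketch of the test hook the javadoc above describes (hedged:
  // MockClusterConnection stands in for whatever mock a test supplies):
  //
  //   HRegionServer rs = new HRegionServer(conf) {
  //     @Override
  //     protected ClusterConnection createClusterConnection() {
  //       return new MockClusterConnection();  // no real RPC in tests
  //     }
  //   };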

  /**
   * Run a test on the configured codecs to make sure the supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings("hbase.regionserver.codecs", (String[]) null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }
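
  // A minimal sketch of how this guard is driven from configuration (the key
  // is the one read above; the codec names are illustrative):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setStrings("hbase.regionserver.codecs", "snappy", "lz4");
  //   // The HRegionServer constructor calls checkCodecs(conf) and throws an
  //   // IOException before any threads start if either codec cannot load.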

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }

  /**
   * All initialization needed before we register with the Master.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void preRegistrationInitialization() {
    try {
      setupClusterConnection();

      // Health checker thread.
      if (isHealthCheckerConfigured()) {
        int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
        healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
      }
      this.pauseMonitor = new JvmPauseMonitor(conf);
      pauseMonitor.start();

      initializeZooKeeper();
      if (!isStopped() && !isAborted()) {
        initializeThreads();
      }
    } catch (Throwable t) {
      // Call stop on error, or the process will stick around forever since the
      // server puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is
   * available for this cluster, and after that wait until the cluster 'up'
   * flag has been set. This is the order in which the master does things.
   * Finally, open the long-lived server short-circuit connection.
   * @throws IOException
   * @throws InterruptedException
   */
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Create the master address tracker, register with zk, and start it.  Then
    // block until a master is available.  No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up.  Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // Retrieve clusterId
    // Since cluster status is now up
    // ID should have already been set by HMaster
    try {
      clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
      if (clusterId == null) {
        this.abort("Cluster ID has not been set");
      }
      LOG.info("ClusterId : " + clusterId);
    } catch (KeeperException e) {
      this.abort("Failed to retrieve Cluster ID", e);
    }

    // In case of a colocated master, wait here till it's active.
    // So backup masters won't start as regionservers.
    // This is to avoid showing backup masters as regionservers
    // in the master web UI, or assigning any region to them.
    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
    }
    // register watcher for recovering regions
    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
  }

  /**
   * Utility method to wait indefinitely on a znode's availability while
   * checking if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return False if cluster shutdown in progress
   */
  private boolean isClusterUp() {
    return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
  }

  private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // Background thread to check for compactions; needed if a region has not gotten updates
    // in a while. It will take care of not checking too frequently, on a store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);

    if (this.nonceManager != null) {
      // Create the chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);

    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0));

    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
  }
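
  // A minimal sketch of an observer this manager can notify (hedged: the
  // ConfigurationObserver callback shown here lives in the same
  // org.apache.hadoop.hbase.conf package as the imported ConfigurationManager,
  // but treat the exact signature as an assumption):
  //
  //   class CompactionTuner implements ConfigurationObserver {
  //     @Override
  //     public void onConfigurationChange(Configuration newConf) {
  //       // re-read compaction-related keys from newConf and resize the pools
  //     }
  //   }
  //   configurationManager.registerObserver(new CompactionTuner());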

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Set our ephemeral znode up in zookeeper now we have a name.
        createMyEphemeralNode();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here.  Break if
      // server is stopped or the clusterup flag is down or hdfs went wacky.
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        rspmHost.start();

        // Start the Quota Manager
        rsQuotaManager.start(getRpcServer().getScheduler());
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if there have been no more write requests to meta
              // tables since the last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // not have gotten it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
          doMetrics();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }
    // Run shutdown.
    if (mxBean != null) {
      MBeanUtil.unregisterMBean(mxBean);
      mxBean = null;
    }
    if (this.leases != null) this.leases.closeAfterLeasesExpire();
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    if (this.compactionChecker != null) {
      this.compactionChecker.interrupt();
    }
    if (this.healthCheckChore != null) {
      this.healthCheckChore.interrupt();
    }
    if (this.nonceManagerChore != null) {
      this.nonceManagerChore.interrupt();
    }
    if (this.storefileRefresher != null) {
      this.storefileRefresher.interrupt();
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    // so callers waiting for meta without timeout can stop
    if (this.metaTableLocator != null) this.metaTableLocator.stop();
    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
      }
    }

    // Close the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName +
        "; all regions closed.");
    }

    // The fsOk flag may be changed when closing regions throws an exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    LOG.info("stopping server " + this.serverName +
      "; zookeeper connection closed.");

    LOG.info(Thread.currentThread().getName() + " exiting");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaTable()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
  throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      ServerName sn = ServerName.parseVersionedServerName(
        this.serverName.getVersionedBytes());
      request.setServer(ProtobufUtil.toServerName(sn));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub();
    }
  }

  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper =
      this.metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    MemoryUsage memory =
      ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();

    ClusterStatusProtos.ServerLoad.Builder serverLoad =
      ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (memory.getUsed() / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(
        Coprocessor.newBuilder().setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor :
          getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
        serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    return serverLoad.build();
  }

  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (HRegion r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<String>();
    boolean interrupted = false;
    try {
      while (!isOnlineRegionsEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug(this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          HRegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!isOnlineRegionsEmpty()) {
            LOG.info("Exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private boolean sleep(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /*
   * Run init. Sets up the wal and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {
    try {
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.isa.getPort(), this.startcode);
          if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            LOG.info("Master passed us a different hostname to use; was=" +
              rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
          }
          continue;
        }
        String value = e.getValue();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }

      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
          this.serverName.toString());
      }

      // Save it in a file; this will allow us to see if we crashed.
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      this.cacheConfig = new CacheConfig(conf);
      this.walFactory = setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));

      startServiceThreads();
      startHeapMemoryManager();
      LOG.info("Serving as " + this.serverName +
        ", RpcServer on " + rpcServices.isa +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to come online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

  private void startHeapMemoryManager() {
    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
    if (this.hMemManager != null) {
      this.hMemManager.start();
    }
  }

  private void createMyEphemeralNode() throws KeeperException, IOException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
      getMyEphemeralNodePath(), data);
  }
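
  // A minimal sketch of reading that payload back (hedged: assumes the
  // PB-magic helpers on ProtobufUtil used above have the usual
  // isPBMagicPrefix/lengthOfPBMagic counterparts):
  //
  //   byte[] data = ZKUtil.getData(zooKeeper, getMyEphemeralNodePath());
  //   if (data != null && ProtobufUtil.isPBMagicPrefix(data)) {
  //     int off = ProtobufUtil.lengthOfPBMagic();
  //     RegionServerInfo info =
  //       RegionServerInfo.newBuilder().mergeFrom(data, off, data.length - off).build();
  //     int infoPort = info.getInfoPort();  // -1 when no info server is running
  //   }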
1310 
1311   private void deleteMyEphemeralNode() throws KeeperException {
1312     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1313   }
1314 
1315   @Override
1316   public RegionServerAccounting getRegionServerAccounting() {
1317     return regionServerAccounting;
1318   }
1319 
1320   @Override
1321   public TableLockManager getTableLockManager() {
1322     return tableLockManager;
1323   }
1324 
1325   /*
1326    * @param r Region to get RegionLoad for.
1327    * @param regionLoadBldr the RegionLoad.Builder, can be null
1328    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1329    * @return RegionLoad instance.
1330    */
1333   private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1334       RegionSpecifier.Builder regionSpecifier) {
1335     byte[] name = r.getRegionName();
1336     int stores = 0;
1337     int storefiles = 0;
1338     int storeUncompressedSizeMB = 0;
1339     int storefileSizeMB = 0;
1340     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1341     int storefileIndexSizeMB = 0;
1342     int rootIndexSizeKB = 0;
1343     int totalStaticIndexSizeKB = 0;
1344     int totalStaticBloomSizeKB = 0;
1345     long totalCompactingKVs = 0;
1346     long currentCompactedKVs = 0;
1347     synchronized (r.stores) {
1348       stores += r.stores.size();
1349       for (Store store : r.stores.values()) {
1350         storefiles += store.getStorefilesCount();
1351         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1352             / 1024 / 1024);
1353         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1354         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1355         CompactionProgress progress = store.getCompactionProgress();
1356         if (progress != null) {
1357           totalCompactingKVs += progress.totalCompactingKVs;
1358           currentCompactedKVs += progress.currentCompactedKVs;
1359         }
1360 
1361         rootIndexSizeKB +=
1362             (int) (store.getStorefilesIndexSize() / 1024);
1363 
1364         totalStaticIndexSizeKB +=
1365           (int) (store.getTotalStaticIndexSize() / 1024);
1366 
1367         totalStaticBloomSizeKB +=
1368           (int) (store.getTotalStaticBloomSize() / 1024);
1369       }
1370     }
1371     float dataLocality =
1372         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1373     if (regionLoadBldr == null) {
1374       regionLoadBldr = RegionLoad.newBuilder();
1375     }
1376     if (regionSpecifier == null) {
1377       regionSpecifier = RegionSpecifier.newBuilder();
1378     }
1379     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1380     regionSpecifier.setValue(ByteStringer.wrap(name));
1381     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1382       .setStores(stores)
1383       .setStorefiles(storefiles)
1384       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1385       .setStorefileSizeMB(storefileSizeMB)
1386       .setMemstoreSizeMB(memstoreSizeMB)
1387       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1388       .setRootIndexSizeKB(rootIndexSizeKB)
1389       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1390       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1391       .setReadRequestsCount(r.readRequestsCount.get())
1392       .setWriteRequestsCount(r.writeRequestsCount.get())
1393       .setTotalCompactingKVs(totalCompactingKVs)
1394       .setCurrentCompactedKVs(currentCompactedKVs)
1395       .setCompleteSequenceId(r.maxFlushedSeqId)
1396       .setDataLocality(dataLocality);
1397 
1398     return regionLoadBldr.build();
1399   }
1400 
1401   /**
1402    * @param encodedRegionName encoded name of the online region to describe
1403    * @return An instance of RegionLoad, or null if the region is no longer online.
1404    */
1405   public RegionLoad createRegionLoad(final String encodedRegionName) {
1406     HRegion r = this.onlineRegions.get(encodedRegionName);
1408     return r != null ? createRegionLoad(r, null, null) : null;
1409   }
1410 
1411   /*
1412    * Inner class that runs on a long interval, checking whether regions need compaction.
1413    */
1414   private static class CompactionChecker extends Chore {
1415     private final HRegionServer instance;
1416     private final int majorCompactPriority;
1417     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1418     private long iteration = 0;
1419 
1420     CompactionChecker(final HRegionServer h, final int sleepTime,
1421         final Stoppable stopper) {
1422       super("CompactionChecker", sleepTime, h);
1423       this.instance = h;
1424       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1425 
1426       /* MajorCompactPriority is configurable.
1427        * If not set, the compaction will use default priority.
1428        */
1429       this.majorCompactPriority = this.instance.conf.
1430         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1431         DEFAULT_PRIORITY);
1432     }
1433 
1434     @Override
1435     protected void chore() {
1436       for (HRegion r : this.instance.onlineRegions.values()) {
1437         if (r == null)
1438           continue;
1439         for (Store s : r.getStores().values()) {
1440           try {
1441             long multiplier = s.getCompactionCheckMultiplier();
1442             assert multiplier > 0;
1443             if (iteration % multiplier != 0) continue;
1444             if (s.needsCompaction()) {
1445               // Queue a compaction. Will recognize if major is needed.
1446               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1447                   + " requests compaction");
1448             } else if (s.isMajorCompaction()) {
1449               if (majorCompactPriority == DEFAULT_PRIORITY
1450                   || majorCompactPriority > r.getCompactPriority()) {
1451                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1452                     + " requests major compaction; use default priority", null);
1453               } else {
1454                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1455                     + " requests major compaction; use configured priority",
1456                   this.majorCompactPriority, null);
1457               }
1458             }
1459           } catch (IOException e) {
1460             LOG.warn("Failed major compaction check on " + r, e);
1461           }
1462         }
1463       }
1464       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1465     }
1466   }
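
  // Illustrative sketch only: the per-store throttle used inside
  // CompactionChecker.chore() above. A store whose compaction-check
  // multiplier is N is examined on every Nth chore iteration only. The
  // helper below is hypothetical and not called by the server.
  static boolean isCompactionCheckIteration(final long iteration, final long multiplier) {
    assert multiplier > 0;
    return iteration % multiplier == 0;
  }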
1467 
1468   class PeriodicMemstoreFlusher extends Chore {
1469     final HRegionServer server;
1470     final static int RANGE_OF_DELAY = 20000; //millisec
1471     final static int MIN_DELAY_TIME = 3000; //millisec
1472     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1473       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1474       this.server = server;
1475     }
1476 
1477     @Override
1478     protected void chore() {
1479       for (HRegion r : this.server.onlineRegions.values()) {
1480         if (r == null)
1481           continue;
1482         if (r.shouldFlush()) {
1483           FlushRequester requester = server.getFlushRequester();
1484           if (requester != null) {
1485             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1486             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1487                 " after a delay of " + randomDelay);
1488             // Throttle the flushes by putting a delay. If we don't throttle, and there
1489             // is a balanced write-load on the regions in a table, we might end up
1490             // overwhelming the filesystem with too many flushes at once.
1491             requester.requestDelayedFlush(r, randomDelay, false);
1492           }
1493         }
1494       }
1495     }
1496   }
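
  // Illustrative sketch only: the jittered delay computed by
  // PeriodicMemstoreFlusher.chore() above. The value always lands in
  // [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY), i.e. 3s up to 23s,
  // which spreads a burst of flush requests out over time. Hypothetical
  // helper, not called by the server.
  static long exampleFlushDelayMillis() {
    return RandomUtils.nextInt(PeriodicMemstoreFlusher.RANGE_OF_DELAY)
        + PeriodicMemstoreFlusher.MIN_DELAY_TIME;
  }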
1497 
1498   /**
1499    * Report the status of the server. A server is online once all of its startup
1500    * has completed (setting up filesystem, starting service threads, etc.). This
1501    * method is designed mostly to be useful in tests.
1502    *
1503    * @return true if online, false if not.
1504    */
1505   public boolean isOnline() {
1506     return online.get();
1507   }
1508 
1509   /**
1510    * Set up the WAL and replication if enabled.
1511    * Replication setup is done in here because it needs to be hooked up to the WAL.
1512    * @return A WAL instance.
1513    * @throws IOException
1514    */
1515   private WALFactory setupWALAndReplication() throws IOException {
1516     // TODO Replication make assumptions here based on the default filesystem impl
1517     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1518     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1519 
1520     Path logdir = new Path(rootDir, logName);
1521     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1522     if (this.fs.exists(logdir)) {
1523       throw new RegionServerRunningException("Region server has already " +
1524         "created directory at " + this.serverName.toString());
1525     }
1526 
1527     // Instantiate replication manager if replication enabled.  Pass it the
1528     // log directories.
1529     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1530 
1531     // listeners the wal factory will add to wals it creates.
1532     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1533     listeners.add(new MetricsWAL());
1534     if (this.replicationSourceHandler != null &&
1535         this.replicationSourceHandler.getWALActionsListener() != null) {
1536       // Replication handler is an implementation of WALActionsListener.
1537       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1538     }
1539 
1540     return new WALFactory(conf, listeners, serverName.toString());
1541   }
1542 
1543   /**
1544    * We initialize the roller for the wal that handles meta lazily
1545    * since we don't know if this regionserver will handle it. All calls to
1546    * this method return a reference to that same roller. As newly referenced
1547    * meta regions are brought online, they will be offered to the roller for maintenance.
1548    * As a part of that registration process, the roller will add itself as a
1549    * listener on the wal.
1550    */
1551   protected LogRoller ensureMetaWALRoller() {
1552     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1553     // null
1554     LogRoller roller = metawalRoller.get();
1555     if (null == roller) {
1556       LogRoller tmpLogRoller = new LogRoller(this, this);
1557       String n = Thread.currentThread().getName();
1558       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1559           n + "-MetaLogRoller", uncaughtExceptionHandler);
1560       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1561         roller = tmpLogRoller;
1562       } else {
1563         // Another thread won starting the roller
1564         Threads.shutdown(tmpLogRoller.getThread());
1565         roller = metawalRoller.get();
1566       }
1567     }
1568     return roller;
1569   }
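
  // Illustrative sketch only: the race-safe lazy-init idiom that
  // ensureMetaWALRoller() above relies on, reduced to its essence. The winner
  // of the compareAndSet publishes its instance; a loser must dispose of its
  // candidate (the real code shuts the losing roller's thread down) and use
  // the published one. All names here are hypothetical.
  static <T> T lazyPublish(final AtomicReference<T> ref, final T candidate) {
    if (ref.compareAndSet(null, candidate)) {
      return candidate;   // we won the race; our instance is now shared
    }
    return ref.get();     // another thread won; caller discards 'candidate'
  }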
1570 
1571   public MetricsRegionServer getRegionServerMetrics() {
1572     return this.metricsRegionServer;
1573   }
1574 
1575   /**
1576    * @return Master address tracker instance.
1577    */
1578   public MasterAddressTracker getMasterAddressTracker() {
1579     return this.masterAddressTracker;
1580   }
1581 
1582   /*
1583    * Start maintenance Threads, Server, Worker and lease checker threads.
1584    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1585    * get an unhandled exception. We cannot set the handler on all threads.
1586    * Server's internal Listener thread is off limits. For Server, if an OOME, it
1587    * waits a while then retries. Meantime, a flush or a compaction that tries to
1588    * run should trigger the same critical condition and the shutdown will run. On
1589    * its way out, this server will shut down Server. Leases are somewhere in
1590    * between: although it inherits from Chore, it keeps its own internal stop
1591    * mechanism, so it needs to be stopped by this hosting server. Worker logs
1592    * the exception and exits.
1593    */
1594   private void startServiceThreads() throws IOException {
1595     // Start executor services
1596     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1597       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1598     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1599       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1600     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1601       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1602     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1603       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1604     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1605       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1606         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1607     }
1608     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1609        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1610 
1611     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1612         uncaughtExceptionHandler);
1613     this.cacheFlusher.start(uncaughtExceptionHandler);
1614     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
1615       ".compactionChecker", uncaughtExceptionHandler);
1616     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
1617         ".periodicFlusher", uncaughtExceptionHandler);
1618     if (this.healthCheckChore != null) {
1619       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
1620             uncaughtExceptionHandler);
1621     }
1622     if (this.nonceManagerChore != null) {
1623       Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
1624             uncaughtExceptionHandler);
1625     }
1626     if (this.storefileRefresher != null) {
1627       Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher",
1628             uncaughtExceptionHandler);
1629     }
1630 
1631     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1632     // an unhandled exception, it will just exit.
1633     this.leases.setName(getName() + ".leaseChecker");
1634     this.leases.start();
1635 
1636     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1637         this.replicationSourceHandler != null) {
1638       this.replicationSourceHandler.startReplicationService();
1639     } else {
1640       if (this.replicationSourceHandler != null) {
1641         this.replicationSourceHandler.startReplicationService();
1642       }
1643       if (this.replicationSinkHandler != null) {
1644         this.replicationSinkHandler.startReplicationService();
1645       }
1646     }
1647 
1648     // Create the log splitting worker and start it
1649     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1650     // quite a while inside HConnection layer. The worker won't be available for other
1651     // tasks even after current task is preempted after a split task times out.
1652     Configuration sinkConf = HBaseConfiguration.create(conf);
1653     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1654       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1655     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1656       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1657     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1658     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1659     splitLogWorker.start();
1660   }
1661 
1662   /**
1663    * Puts up the webui.
1664    * @return The final port -- maybe different from what we started with.
1665    * @throws IOException
1666    */
1667   private int putUpWebUI() throws IOException {
1668     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1669       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1670     // -1 is for disabling info server
1671     if (port < 0) return port;
1672     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1673     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1674       String msg =
1675           "Failed to start http info server. Address " + addr
1676               + " does not belong to this host. Correct configuration parameter: "
1677               + "hbase.regionserver.info.bindAddress";
1678       LOG.error(msg);
1679       throw new IOException(msg);
1680     }
1681     // check if auto port bind enabled
1682     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1683         false);
1684     while (true) {
1685       try {
1686         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1687         infoServer.addServlet("dump", "/dump", getDumpServlet());
1688         configureInfoServer();
1689         this.infoServer.start();
1690         break;
1691       } catch (BindException e) {
1692         if (!auto) {
1693           // auto bind disabled throw BindException
1694           LOG.error("Failed binding http info server to port: " + port);
1695           throw e;
1696         }
1697         // auto bind enabled, try to use another port
1698         LOG.info("Failed binding http info server to port: " + port);
1699         port++;
1700       }
1701     }
1702     port = this.infoServer.getPort();
1703     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1704     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1705       HConstants.DEFAULT_MASTER_INFOPORT);
1706     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1707     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1708     return port;
1709   }
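
  // Illustrative sketch only: the auto-bind retry loop of putUpWebUI() in
  // isolation. With hbase.regionserver.info.port.auto set, a BindException
  // bumps the port and retries; otherwise it is rethrown. This hypothetical
  // helper probes with a plain socket and is not what the server runs.
  static int probeForFreePort(final int startPort, final boolean auto) throws IOException {
    int port = startPort;
    while (true) {
      try {
        java.net.ServerSocket ss = new java.net.ServerSocket(port);
        ss.close();          // port is bindable; hand it back to the caller
        return port;
      } catch (BindException e) {
        if (!auto) throw e;  // auto bind disabled; surface the failure
        port++;              // auto bind enabled; try the next port
      }
    }
  }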
1710 
1711   /*
1712    * Verify that server is healthy
1713    */
1714   private boolean isHealthy() {
1715     if (!fsOk) {
1716       // File system problem
1717       return false;
1718     }
1719     // Verify that all threads are alive
1720     if (!(leases.isAlive()
1721         && cacheFlusher.isAlive() && walRoller.isAlive()
1722         && this.compactionChecker.isAlive()
1723         && this.periodicFlusher.isAlive())) {
1724       stop("One or more threads are no longer alive -- stop");
1725       return false;
1726     }
1727     final LogRoller metawalRoller = this.metawalRoller.get();
1728     if (metawalRoller != null && !metawalRoller.isAlive()) {
1729       stop("Meta WAL roller thread is no longer alive -- stop");
1730       return false;
1731     }
1732     return true;
1733   }
1734 
1735   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1736 
1737   @Override
1738   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1739     WAL wal;
1740     LogRoller roller = walRoller;
1741     // hbase:meta (and historically -ROOT-) regions have a separate WAL.
1742     if (regionInfo != null && regionInfo.isMetaTable()) {
1743       roller = ensureMetaWALRoller();
1744       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1745     } else if (regionInfo == null) {
1746       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1747     } else {
1748       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1749     }
1750     roller.addWAL(wal);
1751     return wal;
1752   }
1753 
1754   @Override
1755   public ClusterConnection getConnection() {
1756     return this.clusterConnection;
1757   }
1758 
1759   @Override
1760   public MetaTableLocator getMetaTableLocator() {
1761     return this.metaTableLocator;
1762   }
1763 
1764   @Override
1765   public void stop(final String msg) {
1766     if (!this.stopped) {
1767       try {
1768         if (this.rsHost != null) {
1769           this.rsHost.preStop(msg);
1770         }
1771         this.stopped = true;
1772         LOG.info("STOPPED: " + msg);
1773         // Wakes run() if it is sleeping
1774         sleeper.skipSleepCycle();
1775       } catch (IOException exp) {
1776         LOG.warn("The region server did not stop", exp);
1777       }
1778     }
1779   }
1780 
1781   public void waitForServerOnline() {
1782     while (!isStopped() && !isOnline()) {
1783       synchronized (online) {
1784         try {
1785           online.wait(msgInterval);
1786         } catch (InterruptedException ie) {
1787           Thread.currentThread().interrupt();
1788           break;
1789         }
1790       }
1791     }
1792   }
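
  // Illustrative usage sketch: a test harness would typically pair the two
  // methods above ('rs' being a hypothetical, already-started HRegionServer):
  //
  //   rs.waitForServerOnline();
  //   assert rs.isOnline() || rs.isStopped();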
1793 
1794   @Override
1795   public void postOpenDeployTasks(final HRegion r)
1796   throws KeeperException, IOException {
1797     rpcServices.checkOpen();
1798     LOG.info("Post open deploy tasks for " + r.getRegionNameAsString());
1799     // Do checks to see if we need to compact (references or too many files)
1800     for (Store s : r.getStores().values()) {
1801       if (s.hasReferences() || s.needsCompaction()) {
1802        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1803       }
1804     }
1805     long openSeqNum = r.getOpenSeqNum();
1806     if (openSeqNum == HConstants.NO_SEQNUM) {
1807       // If we opened a region, we should have read some sequence number from it.
1808       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1809       openSeqNum = 0;
1810     }
1811 
1812     // Update flushed sequence id of a recovering region in ZK
1813     updateRecoveringRegionLastFlushedSequenceId(r);
1814 
1815     // Notify master
1816     if (!reportRegionStateTransition(
1817         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1818       throw new IOException("Failed to report opened region to master: "
1819         + r.getRegionNameAsString());
1820     }
1821 
1822     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
1823   }
1824 
1825   @Override
1826   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1827     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1828   }
1829 
1830   @Override
1831   public boolean reportRegionStateTransition(
1832       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1833     if (TEST_SKIP_REPORTING_TRANSITION) {
1834       // This is for testing only in case there is no master
1835       // to handle the region transition report at all.
1836       if (code == TransitionCode.OPENED) {
1837         Preconditions.checkArgument(hris != null && hris.length == 1);
1838         if (hris[0].isMetaRegion()) {
1839           try {
1840             MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, State.OPEN);
1841           } catch (KeeperException e) {
1842             LOG.info("Failed to update meta location", e);
1843             return false;
1844           }
1845         } else {
1846           try {
1847             MetaTableAccessor.updateRegionLocation(clusterConnection,
1848               hris[0], serverName, openSeqNum);
1849           } catch (IOException e) {
1850             LOG.info("Failed to update meta", e);
1851             return false;
1852           }
1853         }
1854       }
1855       return true;
1856     }
1857 
1858     ReportRegionStateTransitionRequest.Builder builder =
1859       ReportRegionStateTransitionRequest.newBuilder();
1860     builder.setServer(ProtobufUtil.toServerName(serverName));
1861     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1862     transition.setTransitionCode(code);
1863     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1864       transition.setOpenSeqNum(openSeqNum);
1865     }
1866     for (HRegionInfo hri: hris) {
1867       transition.addRegionInfo(HRegionInfo.convert(hri));
1868     }
1869     ReportRegionStateTransitionRequest request = builder.build();
1870     while (keepLooping()) {
1871       RegionServerStatusService.BlockingInterface rss = rssStub;
1872       try {
1873         if (rss == null) {
1874           createRegionServerStatusStub();
1875           continue;
1876         }
1877         ReportRegionStateTransitionResponse response =
1878           rss.reportRegionStateTransition(null, request);
1879         if (response.hasErrorMessage()) {
1880           LOG.info("Failed to transition " + hris[0]
1881             + " to " + code + ": " + response.getErrorMessage());
1882           return false;
1883         }
1884         return true;
1885       } catch (ServiceException se) {
1886         IOException ioe = ProtobufUtil.getRemoteException(se);
1887         LOG.info("Failed to report region transition, will retry", ioe);
1888         if (rssStub == rss) {
1889           rssStub = null;
1890         }
1891       }
1892     }
1893     return false;
1894   }
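
  // Illustrative sketch only: the stub-reset idiom in the retry loop above.
  // On an RPC failure the shared stub is cleared only if no other thread has
  // already replaced it, so a freshly re-created stub is never wiped out.
  // Hypothetical generic form of that compare-then-clear step:
  static <T> void clearStubIfUnchanged(final AtomicReference<T> shared, final T seen) {
    // No-op when 'shared' no longer holds the stub we failed with.
    shared.compareAndSet(seen, null);
  }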
1895 
1896   @Override
1897   public RpcServerInterface getRpcServer() {
1898     return rpcServices.rpcServer;
1899   }
1900 
1901   @VisibleForTesting
1902   public RSRpcServices getRSRpcServices() {
1903     return rpcServices;
1904   }
1905 
1906   /**
1907    * Cause the server to exit without closing the regions it is serving, without
1908    * closing the log it is using, and without notifying the master. Used in unit
1909    * testing and on catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
1910    *
1911    * @param reason
1912    *          the reason we are aborting
1913    * @param cause
1914    *          the exception that caused the abort, or null
1915    */
1916   @Override
1917   public void abort(String reason, Throwable cause) {
1918     String msg = "ABORTING region server " + this + ": " + reason;
1919     if (cause != null) {
1920       LOG.fatal(msg, cause);
1921     } else {
1922       LOG.fatal(msg);
1923     }
1924     this.abortRequested = true;
1925     // HBASE-4014: show list of coprocessors that were loaded to help debug
1926     // regionserver crashes. Note that we're implicitly using
1927     // java.util.HashSet's toString() method to print the coprocessor names.
1928     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1929         CoprocessorHost.getLoadedCoprocessors());
1930     // Try and dump metrics on abort -- might give a clue as to how the fatal error came about...
1931     try {
1932       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
1933     } catch (MalformedObjectNameException | IOException e) {
1934       LOG.warn("Failed dumping metrics", e);
1935     }
1936 
1937     // Do our best to report our abort to the master, but this may not work
1938     try {
1939       if (cause != null) {
1940         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1941       }
1942       // Report to the master but only if we have already registered with the master.
1943       if (rssStub != null && this.serverName != null) {
1944         ReportRSFatalErrorRequest.Builder builder =
1945           ReportRSFatalErrorRequest.newBuilder();
1946         ServerName sn =
1947           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
1948         builder.setServer(ProtobufUtil.toServerName(sn));
1949         builder.setErrorMessage(msg);
1950         rssStub.reportRSFatalError(null, builder.build());
1951       }
1952     } catch (Throwable t) {
1953       LOG.warn("Unable to report fatal error to master", t);
1954     }
1955     stop(reason);
1956   }
1957 
1958   /**
1959    * @see HRegionServer#abort(String, Throwable)
1960    */
1961   public void abort(String reason) {
1962     abort(reason, null);
1963   }
1964 
1965   @Override
1966   public boolean isAborted() {
1967     return this.abortRequested;
1968   }
1969 
1970   /*
1971    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
1972    * logs, but it does close the socket in case we want to bring up a server on the
1973    * old hostname+port immediately.
1974    */
1975   protected void kill() {
1976     this.killed = true;
1977     abort("Simulated kill");
1978   }
1979 
1980   /**
1981    * Wait on all threads to finish. Presumption is that all closes and stops
1982    * have already been called.
1983    */
1984   protected void stopServiceThreads() {
1985     if (this.nonceManagerChore != null) {
1986       Threads.shutdown(this.nonceManagerChore.getThread());
1987     }
1988     if (this.compactionChecker != null) {
1989       Threads.shutdown(this.compactionChecker.getThread());
1990     }
1991     if (this.periodicFlusher != null) {
1992       Threads.shutdown(this.periodicFlusher.getThread());
1993     }
1994     if (this.cacheFlusher != null) {
1995       this.cacheFlusher.join();
1996     }
1997     if (this.healthCheckChore != null) {
1998       Threads.shutdown(this.healthCheckChore.getThread());
1999     }
2000     if (this.spanReceiverHost != null) {
2001       this.spanReceiverHost.closeReceivers();
2002     }
2003     if (this.walRoller != null) {
2004       Threads.shutdown(this.walRoller.getThread());
2005     }
2006     final LogRoller metawalRoller = this.metawalRoller.get();
2007     if (metawalRoller != null) {
2008       Threads.shutdown(metawalRoller.getThread());
2009     }
2010     if (this.compactSplitThread != null) {
2011       this.compactSplitThread.join();
2012     }
2013     if (this.service != null) this.service.shutdown();
2014     if (this.replicationSourceHandler != null &&
2015         this.replicationSourceHandler == this.replicationSinkHandler) {
2016       this.replicationSourceHandler.stopReplicationService();
2017     } else {
2018       if (this.replicationSourceHandler != null) {
2019         this.replicationSourceHandler.stopReplicationService();
2020       }
2021       if (this.replicationSinkHandler != null) {
2022         this.replicationSinkHandler.stopReplicationService();
2023       }
2024     }
2025     if (this.storefileRefresher != null) {
2026       Threads.shutdown(this.storefileRefresher.getThread());
2027     }
2028   }
2029 
2030   /**
2031    * @return Return the object that implements the replication
2032    * source service.
2033    */
2034   ReplicationSourceService getReplicationSourceService() {
2035     return replicationSourceHandler;
2036   }
2037 
2038   /**
2039    * @return Return the object that implements the replication
2040    * sink service.
2041    */
2042   ReplicationSinkService getReplicationSinkService() {
2043     return replicationSinkHandler;
2044   }
2045 
2046   /**
2047    * Get the current master from ZooKeeper and open the RPC connection to it.
2048    *
2049    * Method will block until a master is available. You can break from this
2050    * block by requesting the server stop.
2051    *
2052    * @return master + port, or null if server has been stopped
2053    */
2054   private synchronized ServerName createRegionServerStatusStub() {
2055     if (rssStub != null) {
2056       return masterAddressTracker.getMasterAddress();
2057     }
2058     ServerName sn = null;
2059     long previousLogTime = 0;
2060     boolean refresh = false; // for the first time, use cached data
2061     RegionServerStatusService.BlockingInterface intf = null;
2062     boolean interrupted = false;
2063     try {
2064       while (keepLooping()) {
2065         sn = this.masterAddressTracker.getMasterAddress(refresh);
2066         if (sn == null) {
2067           if (!keepLooping()) {
2068             // give up with no connection.
2069             LOG.debug("No master found and cluster is stopped; bailing out");
2070             return null;
2071           }
2072           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2073             LOG.debug("No master found; retry");
2074             previousLogTime = System.currentTimeMillis();
2075           }
2076           refresh = true; // let's try to pull it from ZK directly
2077           if (sleep(200)) {
2078             interrupted = true;
2079           }
2080           continue;
2081         }
2082 
2083         // If we are on the active master, use the shortcut
2084         if (this instanceof HMaster && sn.equals(getServerName())) {
2085           intf = ((HMaster)this).getMasterRpcServices();
2086           break;
2087         }
2088         try {
2089           BlockingRpcChannel channel =
2090             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
2091           intf = RegionServerStatusService.newBlockingStub(channel);
2092           break;
2093         } catch (IOException e) {
2094           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2095             e = e instanceof RemoteException ?
2096               ((RemoteException)e).unwrapRemoteException() : e;
2097             if (e instanceof ServerNotRunningYetException) {
2098               LOG.info("Master isn't available yet, retrying");
2099             } else {
2100               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2101             }
2102             previousLogTime = System.currentTimeMillis();
2103           }
2104           if (sleep(200)) {
2105             interrupted = true;
2106           }
2107         }
2108       }
2109     } finally {
2110       if (interrupted) {
2111         Thread.currentThread().interrupt();
2112       }
2113     }
2114     rssStub = intf;
2115     return sn;
2116   }
2117 
2118   /**
2119    * @return True if we should keep looping; false if the cluster is going down,
2120    * this server has been stopped, or hdfs has gone bad.
2121    */
2122   private boolean keepLooping() {
2123     return !this.stopped && isClusterUp();
2124   }
2125 
2126   /*
2127    * Let the master know we're here. Run initialization using parameters passed
2128    * to us by the master.
2129    * @return A Map of key/value configurations we got from the Master else
2130    * null if we failed to register.
2131    * @throws IOException
2132    */
2133   private RegionServerStartupResponse reportForDuty() throws IOException {
2134     ServerName masterServerName = createRegionServerStatusStub();
2135     if (masterServerName == null) return null;
2136     RegionServerStartupResponse result = null;
2137     try {
2138       rpcServices.requestCount.set(0);
2139       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2140         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2141       long now = EnvironmentEdgeManager.currentTime();
2142       int port = rpcServices.isa.getPort();
2143       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2144       request.setPort(port);
2145       request.setServerStartCode(this.startcode);
2146       request.setServerCurrentTime(now);
2147       result = this.rssStub.regionServerStartup(null, request.build());
2148     } catch (ServiceException se) {
2149       IOException ioe = ProtobufUtil.getRemoteException(se);
2150       if (ioe instanceof ClockOutOfSyncException) {
2151         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2152         // Re-throw IOE will cause RS to abort
2153         throw ioe;
2154       } else if (ioe instanceof ServerNotRunningYetException) {
2155         LOG.debug("Master is not running yet");
2156       } else {
2157         LOG.warn("error telling master we are up", se);
2158       }
2159     }
2160     return result;
2161   }
2162 
2163   @Override
2164   public long getLastSequenceId(byte[] region) {
2165     long lastFlushedSequenceId = -1L;
2166     try {
2167       GetLastFlushedSequenceIdRequest req = RequestConverter
2168           .buildGetLastFlushedSequenceIdRequest(region);
2169       RegionServerStatusService.BlockingInterface rss = rssStub;
2170       if (rss == null) { // Try to connect one more time
2171         createRegionServerStatusStub();
2172         rss = rssStub;
2173         if (rss == null) {
2174           // Still no luck, we tried
2175           LOG.warn("Unable to connect to the master to check "
2176             + "the last flushed sequence id");
2177           return -1L;
2178         }
2179       }
2180       lastFlushedSequenceId = rss.getLastFlushedSequenceId(null, req)
2181           .getLastFlushedSequenceId();
2182     } catch (ServiceException e) {
2183       lastFlushedSequenceId = -1L;
2184       LOG.warn("Unable to connect to the master to check "
2185         + "the last flushed sequence id", e);
2186     }
2187     return lastFlushedSequenceId;
2188   }
2189 
2190   /**
2191    * Closes all regions.  Called on our way out.
2192    * Assumes that it's not possible for new regions to be added to onlineRegions
2193    * while this method runs.
2194    */
2195   protected void closeAllRegions(final boolean abort) {
2196     closeUserRegions(abort);
2197     closeMetaTableRegions(abort);
2198   }
2199 
2200   /**
2201    * Close meta region if we carry it
2202    * @param abort Whether we're running an abort.
2203    */
2204   void closeMetaTableRegions(final boolean abort) {
2205     HRegion meta = null;
2206     this.lock.writeLock().lock();
2207     try {
2208       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2209         HRegionInfo hri = e.getValue().getRegionInfo();
2210         if (hri.isMetaRegion()) {
2211           meta = e.getValue();
2212         }
2213         if (meta != null) break;
2214       }
2215     } finally {
2216       this.lock.writeLock().unlock();
2217     }
2218     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2219   }
2220 
2221   /**
2222    * Schedule closes on all user regions.
2223    * Should be safe to call multiple times because it won't close regions
2224    * that are already closed or that are closing.
2225    * @param abort Whether we're running an abort.
2226    */
2227   void closeUserRegions(final boolean abort) {
2228     this.lock.writeLock().lock();
2229     try {
2230       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2231         HRegion r = e.getValue();
2232         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2233           // Don't update zk with this close transition; pass false.
2234           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2235         }
2236       }
2237     } finally {
2238       this.lock.writeLock().unlock();
2239     }
2240   }
2241 
2242   /** @return the info server */
2243   public InfoServer getInfoServer() {
2244     return infoServer;
2245   }
2246 
2247   /**
2248    * @return true if a stop has been requested.
2249    */
2250   @Override
2251   public boolean isStopped() {
2252     return this.stopped;
2253   }
2254 
2255   @Override
2256   public boolean isStopping() {
2257     return this.stopping;
2258   }
2259 
2260   @Override
2261   public Map<String, HRegion> getRecoveringRegions() {
2262     return this.recoveringRegions;
2263   }
2264 
2265   /**
2267    * @return the configuration
2268    */
2269   @Override
2270   public Configuration getConfiguration() {
2271     return conf;
2272   }
2273 
2274   /** @return the write lock for the server */
2275   ReentrantReadWriteLock.WriteLock getWriteLock() {
2276     return lock.writeLock();
2277   }
2278 
2279   public int getNumberOfOnlineRegions() {
2280     return this.onlineRegions.size();
2281   }
2282 
2283   boolean isOnlineRegionsEmpty() {
2284     return this.onlineRegions.isEmpty();
2285   }
2286 
2287   /**
2288    * For tests, web ui and metrics.
2289    * This method will only work if HRegionServer is in the same JVM as client;
2290    * HRegion cannot be serialized to cross an rpc.
2291    */
2292   public Collection<HRegion> getOnlineRegionsLocalContext() {
2293     Collection<HRegion> regions = this.onlineRegions.values();
2294     return Collections.unmodifiableCollection(regions);
2295   }
2296 
2297   @Override
2298   public void addToOnlineRegions(HRegion region) {
2299     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2300     configurationManager.registerObserver(region);
2301   }
2302 
2303   /**
2304    * @return A new Map of online regions sorted by region size with the first entry being the
2305    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2306    * may NOT return all regions.
2307    */
2308   SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2309     // we'll sort the regions in reverse
2310     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2311         new Comparator<Long>() {
2312           @Override
2313           public int compare(Long a, Long b) {
2314             return -1 * a.compareTo(b);
2315           }
2316         });
2317     // Copy over all regions. Regions are sorted by size with biggest first.
2318     for (HRegion region : this.onlineRegions.values()) {
2319       sortedRegions.put(region.memstoreSize.get(), region);
2320     }
2321     return sortedRegions;
2322   }
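
  // Illustrative sketch only: because the map above is keyed by memstore
  // size, two regions of identical size collide and only the last one found
  // is kept. A collision-free variant (hypothetical, not used by the server)
  // would sort the regions themselves:
  List<HRegion> getOnlineRegionsSortedBySizeDesc() {
    List<HRegion> regions = new ArrayList<HRegion>(this.onlineRegions.values());
    Collections.sort(regions, new Comparator<HRegion>() {
      @Override
      public int compare(HRegion a, HRegion b) {
        // Biggest memstore first; compare b against a for descending order.
        long diff = b.memstoreSize.get() - a.memstoreSize.get();
        return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
      }
    });
    return regions;
  }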
2323 
2324   /**
2325    * @return time stamp in millis of when this region server was started
2326    */
2327   public long getStartcode() {
2328     return this.startcode;
2329   }
2330 
2331   /** @return reference to FlushRequester */
2332   @Override
2333   public FlushRequester getFlushRequester() {
2334     return this.cacheFlusher;
2335   }
2336 
2337   /**
2338    * Get the top N most loaded regions this server is serving so we can tell the
2339    * master which regions it can reallocate if we're overloaded. TODO: actually
2340    * calculate which regions are most loaded. (Right now, we're just grabbing
2341    * the first N regions being served regardless of load.)
2342    */
2343   protected HRegionInfo[] getMostLoadedRegions() {
2344     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2345     for (HRegion r : onlineRegions.values()) {
2346       if (!r.isAvailable()) {
2347         continue;
2348       }
2349       if (regions.size() < numRegionsToReport) {
2350         regions.add(r.getRegionInfo());
2351       } else {
2352         break;
2353       }
2354     }
2355     return regions.toArray(new HRegionInfo[regions.size()]);
2356   }
2357 
2358   @Override
2359   public Leases getLeases() {
2360     return leases;
2361   }
2362 
2363   /**
2364    * @return Return the rootDir.
2365    */
2366   protected Path getRootDir() {
2367     return rootDir;
2368   }
2369 
2370   /**
2371    * @return Return the fs.
2372    */
2373   @Override
2374   public FileSystem getFileSystem() {
2375     return fs;
2376   }
2377 
2378   @Override
2379   public String toString() {
2380     return getServerName().toString();
2381   }
2382 
2383   /**
2384    * Interval at which threads should run
2385    *
2386    * @return the interval
2387    */
2388   public int getThreadWakeFrequency() {
2389     return threadWakeFrequency;
2390   }
2391 
2392   @Override
2393   public ZooKeeperWatcher getZooKeeper() {
2394     return zooKeeper;
2395   }
2396 
2397   @Override
2398   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2399     return csm;
2400   }
2401 
2402   @Override
2403   public ServerName getServerName() {
2404     return serverName;
2405   }
2406 
2407   @Override
2408   public CompactionRequestor getCompactionRequester() {
2409     return this.compactSplitThread;
2410   }
2411 
2412   public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2413     return this.rsHost;
2414   }
2415 
2416   @Override
2417   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2418     return this.regionsInTransitionInRS;
2419   }
2420 
2421   @Override
2422   public ExecutorService getExecutorService() {
2423     return service;
2424   }
2425 
2426   @Override
2427   public RegionServerQuotaManager getRegionServerQuotaManager() {
2428     return rsQuotaManager;
2429   }
2430 
2431   //
2432   // Main program and support routines
2433   //
2434 
2435   /**
2436    * Load the replication service objects, if any
2437    */
2438   private static void createNewReplicationInstance(Configuration conf,
2439     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2440 
2441     // If replication is not enabled, then return immediately.
2442     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2443         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2444       return;
2445     }
2446 
2447     // read in the name of the source replication class from the config file.
2448     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2449                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2450 
2451     // read in the name of the sink replication class from the config file.
2452     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2453                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2454 
2455     // If both the sink and the source class names are the same, then instantiate
2456     // only one object.
2457     if (sourceClassname.equals(sinkClassname)) {
2458       server.replicationSourceHandler = (ReplicationSourceService)
2459                                          newReplicationInstance(sourceClassname,
2460                                          conf, server, fs, logDir, oldLogDir);
2461       server.replicationSinkHandler = (ReplicationSinkService)
2462                                          server.replicationSourceHandler;
2463     } else {
2464       server.replicationSourceHandler = (ReplicationSourceService)
2465                                          newReplicationInstance(sourceClassname,
2466                                          conf, server, fs, logDir, oldLogDir);
2467       server.replicationSinkHandler = (ReplicationSinkService)
2468                                          newReplicationInstance(sinkClassname,
2469                                          conf, server, fs, logDir, oldLogDir);
2470     }
2471   }
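
  // Illustrative configuration sketch: wiring a custom replication service
  // through the keys read above. The class name is hypothetical; by default
  // both keys fall back to HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT,
  // in which case a single object serves as both source and sink.
  //
  //   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
  //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
  //     "org.example.MyReplicationService");
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
  //     "org.example.MyReplicationService");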
2472 
2473   private static ReplicationService newReplicationInstance(String classname,
2474     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2475     Path oldLogDir) throws IOException{
2476 
2477     Class<?> clazz = null;
2478     try {
2479       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2480       clazz = Class.forName(classname, true, classLoader);
2481     } catch (ClassNotFoundException nfe) {
2482       throw new IOException("Could not find class for " + classname, nfe);
2483     }
2484 
2485     // create an instance of the replication object.
2486     ReplicationService service = (ReplicationService)
2487                               ReflectionUtils.newInstance(clazz, conf);
2488     service.initialize(server, fs, logDir, oldLogDir);
2489     return service;
2490   }
2491 
2492   /**
2493    * Utility for constructing an instance of the passed HRegionServer class.
2494    *
2495    * @param regionServerClass implementation class to instantiate
2496    * @param conf2 configuration to pass to the constructor
2497    * @param cp coordinated state manager to pass to the constructor
2498    * @return HRegionServer instance.
2498    */
2499   public static HRegionServer constructRegionServer(
2500       Class<? extends HRegionServer> regionServerClass,
2501       final Configuration conf2, CoordinatedStateManager cp) {
2502     try {
2503       Constructor<? extends HRegionServer> c = regionServerClass
2504           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2505       return c.newInstance(conf2, cp);
2506     } catch (Exception e) {
2507       throw new RuntimeException("Failed construction of RegionServer: "
2508           + regionServerClass.toString(), e);
2509     }
2510   }
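
  // Illustrative usage sketch, mirroring what main() below does through
  // HRegionServerCommandLine ('conf' and 'csm' are hypothetical locals):
  //
  //   HRegionServer rs = HRegionServer.constructRegionServer(
  //     HRegionServer.class, conf, csm);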
2511 
2512   /**
2513    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2514    */
2515   public static void main(String[] args) throws Exception {
2516     VersionInfo.logVersion();
2517     Configuration conf = HBaseConfiguration.create();
2518     @SuppressWarnings("unchecked")
2519     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2520         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2521 
2522     new HRegionServerCommandLine(regionServerClass).doMain(args);
2523   }
2524 
2525   /**
2526    * Gets the online regions of the specified table.
2527    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2528    * Only returns <em>online</em> regions.  If a region on this table has been
2529    * closed during a disable, etc., it will not be included in the returned list.
2530    * So, the returned list may not necessarily be ALL regions in this table; it is
2531    * all the ONLINE regions in the table.
2532    * @param tableName table whose online regions to return
2533    * @return Online regions from <code>tableName</code>
2534    */
2535   @Override
2536   public List<HRegion> getOnlineRegions(TableName tableName) {
2537     List<HRegion> tableRegions = new ArrayList<HRegion>();
2538     synchronized (this.onlineRegions) {
2539       for (HRegion region: this.onlineRegions.values()) {
2540         HRegionInfo regionInfo = region.getRegionInfo();
2541         if (regionInfo.getTable().equals(tableName)) {
2542           tableRegions.add(region);
2543         }
2544       }
2545     }
2546     return tableRegions;
2547   }
2548 
2549   /**
2550    * Gets the online tables in this RS.
2551    * This method looks at the in-memory onlineRegions.
2552    * @return all the online tables in this RS
2553    */
2554   @Override
2555   public Set<TableName> getOnlineTables() {
2556     Set<TableName> tables = new HashSet<TableName>();
2557     synchronized (this.onlineRegions) {
2558       for (HRegion region: this.onlineRegions.values()) {
2559         tables.add(region.getTableDesc().getTableName());
2560       }
2561     }
2562     return tables;
2563   }
2564 
2565   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2566   public String[] getRegionServerCoprocessors() {
2567     TreeSet<String> coprocessors = new TreeSet<String>();
2568     try {
2569       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2570     } catch (IOException exception) {
2571       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2572           "skipping.");
2573       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2574     }
2575     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2576     for (HRegion region: regions) {
2577       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2578       try {
2579         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2580       } catch (IOException exception) {
2581         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2582             "; skipping.");
2583         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2584       }
2585     }
2586     return coprocessors.toArray(new String[coprocessors.size()]);
2587   }
2588 
2589   /**
2590    * Try to close the region, logs a warning on failure but continues.
2591    * @param region Region to close
2592    */
2593   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2594     try {
2595       if (!closeRegion(region.getEncodedName(), abort, null)) {
2596         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2597             " - ignoring and continuing");
2598       }
2599     } catch (IOException e) {
2600       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2601           " - ignoring and continuing", e);
2602     }
2603   }
2604 
2605   /**
2606    * Asynchronously close a region. Can be called from the master or internally by the regionserver
2607    * when stopping. If called from the master, the region will update the znode status.
2608    *
2609    * <p>
2610    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2611    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2612    * </p>
2613    *
2614    * <p>
2615    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2616    * </p>
2617    *
2618    * @param encodedName Region to close
2619    * @param abort True if we are aborting
2620    * @return True if closed a region.
2621    * @throws NotServingRegionException if the region is not online
2622    */
2623   protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
2624       throws NotServingRegionException {
2625     // Check for permissions to close.
2626     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2627     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2628       try {
2629         actualRegion.getCoprocessorHost().preClose(false);
2630       } catch (IOException exp) {
2631         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2632         return false;
2633       }
2634     }
2635 
2636     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2637         Boolean.FALSE);
2638 
2639     if (Boolean.TRUE.equals(previous)) {
2640       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2641           "trying to OPEN. Cancelling OPENING.");
2642       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2643         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2644         // We're going to try to do a standard close then.
2645         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2646             " Doing a standard close now");
2647         return closeRegion(encodedName, abort, sn);
2648       }
2649       // Let's get the region from the online region list again
2650       actualRegion = this.getFromOnlineRegions(encodedName);
2651       if (actualRegion == null) { // If already online, we still need to close it.
2652         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2653         // The master deletes the znode when it receives this exception.
2654         throw new NotServingRegionException("The region " + encodedName +
2655           " was opening but not yet served. Opening is cancelled.");
2656       }
2657     } else if (Boolean.FALSE.equals(previous)) {
2658       LOG.info("Received CLOSE for the region: " + encodedName +
2659         ", which we are already trying to CLOSE, but not completed yet");
2660       return true;
2661     }
2662 
2663     if (actualRegion == null) {
2664       LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2665       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2666       // The master deletes the znode when it receives this exception.
2667       throw new NotServingRegionException("The region " + encodedName +
2668           " is not online, and is not opening.");
2669     }
2670 
2671     CloseRegionHandler crh;
2672     final HRegionInfo hri = actualRegion.getRegionInfo();
2673     if (hri.isMetaRegion()) {
2674       crh = new CloseMetaHandler(this, this, hri, abort);
2675     } else {
2676       crh = new CloseRegionHandler(this, this, hri, abort, sn);
2677     }
2678     this.service.submit(crh);
2679     return true;
2680   }
2681 
2682   /**
2683    * @param regionName binary name of a region
2684    * @return HRegion for the passed binary <code>regionName</code> or null if
2685    *         named region is not a member of the online regions.
2686    */
2687   public HRegion getOnlineRegion(final byte[] regionName) {
2688     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2689     return this.onlineRegions.get(encodedRegionName);
2690   }
2691 
2692   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2693     return this.regionFavoredNodesMap.get(encodedRegionName);
2694   }
2695 
2696   @Override
2697   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2698     return this.onlineRegions.get(encodedRegionName);
2699   }
2700 
2702   @Override
2703   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2704     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2705 
2706     if (destination != null) {
2707       try {
2708         WAL wal = getWAL(r.getRegionInfo());
2709         long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2710         if (closeSeqNum == HConstants.NO_SEQNUM) {
2711           // No edits in WAL for this region; get the sequence number when the region was opened.
2712           closeSeqNum = r.getOpenSeqNum();
2713           if (closeSeqNum == HConstants.NO_SEQNUM) {
2714             closeSeqNum = 0;
2715           }
2716         }
2717         addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2718       } catch (IOException exception) {
2719         LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
2720             "; not adding to moved regions.");
2721         LOG.debug("Exception details for failure to get wal", exception);
2722       }
2723     }
2724     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2725     return toReturn != null;
2726   }
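
  // Why a sequence number travels with the move record (illustrative sketch;
  // the CachedLocation type and onRegionMoved method are hypothetical, not
  // HBase client API): a location cache can use it to let newer information
  // win and to ignore stale move hints that arrive out of order.
  //
  //   void onRegionMoved(String encodedName, ServerName dest, long seqNum) {
  //     CachedLocation cached = cache.get(encodedName);
  //     if (cached == null || cached.getSeqNum() < seqNum) {
  //       cache.put(encodedName, new CachedLocation(dest, seqNum)); // newer wins
  //     } // otherwise we already know something more recent; drop the hint
  //   }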
2727 
2728   /**
2729    * Protected utility method for safely obtaining an HRegion handle.
2730    *
2731    * @param regionName
2732    *          Name of online {@link HRegion} to return
2733    * @return {@link HRegion} for <code>regionName</code>
2734    * @throws NotServingRegionException if the named region is not online
2735    */
2736   protected HRegion getRegion(final byte[] regionName)
2737       throws NotServingRegionException {
2738     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2739     return getRegionByEncodedName(regionName, encodedRegionName);
2740   }
2741 
2742   public HRegion getRegionByEncodedName(String encodedRegionName)
2743       throws NotServingRegionException {
2744     return getRegionByEncodedName(null, encodedRegionName);
2745   }
2746 
2747   protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2748     throws NotServingRegionException {
2749     HRegion region = this.onlineRegions.get(encodedRegionName);
2750     if (region == null) {
2751       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2752       if (moveInfo != null) {
2753         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2754       }
2755       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2756       String regionNameStr = regionName == null ?
2757         encodedRegionName : Bytes.toStringBinary(regionName);
2758       if (isOpening != null && isOpening.booleanValue()) {
2759         throw new RegionOpeningException("Region " + regionNameStr +
2760           " is opening on " + this.serverName);
2761       }
2762       throw new NotServingRegionException("Region " + regionNameStr +
2763         " is not online on " + this.serverName);
2764     }
2765     return region;
2766   }
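
  // The three outcomes above map onto distinct caller-side recovery actions
  // (illustrative sketch, not HBase client code):
  //
  //   try {
  //     HRegion region = getRegionByEncodedName(encodedRegionName);
  //     // ... serve the request ...
  //   } catch (RegionMovedException rme) {
  //     // Definitive redirect: retry immediately against rme.getServerName().
  //   } catch (RegionOpeningException roe) {
  //     // Transient: the region is coming up right here; retry after a short wait.
  //   } catch (NotServingRegionException nsre) {
  //     // No local knowledge: go back to hbase:meta and relocate the region.
  //   }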
2767 
2768   /*
2769    * Cleanup after a Throwable caught while invoking a method. Converts
2770    * <code>t</code> to an IOE if it isn't one already.
2771    *
2772    * @param t Throwable
2773    *
2774    * @param msg Message to log as an error. Can be null.
2775    *
2776    * @return Throwable converted to an IOE; methods can only let out IOEs.
2777    */
2778   private Throwable cleanup(final Throwable t, final String msg) {
2779     // Don't log as error if NSRE; NSRE is 'normal' operation.
2780     if (t instanceof NotServingRegionException) {
2781       LOG.debug("NotServingRegionException; " + t.getMessage());
2782       return t;
2783     }
2784     Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
2785     if (msg == null) {
2786       LOG.error("", e);
2787     } else {
2788       LOG.error(msg, e);
2789     }
2790     if (!rpcServices.checkOOME(t)) {
2791       checkFileSystem();
2792     }
2793     return t;
2794   }
2795 
2796   /*
2797    * @param t the Throwable to convert
2798    *
2799    * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
2800    *
2801    * @return <code>t</code> as an IOE, wrapped in a new IOException if it isn't one already.
2802    */
2803   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2804     return (t instanceof IOException ? (IOException) t : msg == null
2805         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2806   }
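
  // The nested ternary above distils to the following equivalent logic
  // (illustrative rewrite, identical behavior):
  //
  //   if (t instanceof IOException) return (IOException) t;     // already an IOE
  //   if (msg == null || msg.length() == 0) return new IOException(t);
  //   return new IOException(msg, t);                           // wrap with message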
2807 
2808   /**
2809    * Checks to see if the file system is still accessible. If not, sets
2810    * abortRequested and stopRequested.
2811    *
2812    * @return false if file system is not available
2813    */
2814   public boolean checkFileSystem() {
2815     if (this.fsOk && this.fs != null) {
2816       try {
2817         FSUtils.checkFileSystemAvailable(this.fs);
2818       } catch (IOException e) {
2819         abort("File System not available", e);
2820         this.fsOk = false;
2821       }
2822     }
2823     return this.fsOk;
2824   }
2825 
2826   @Override
2827   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2828       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2829     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2830     // Refer to the comment on the declaration of regionFavoredNodesMap for why
2831     // it is a map of region name to InetSocketAddress[]
2832     for (int i = 0; i < favoredNodes.size(); i++) {
2833       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2834           favoredNodes.get(i).getPort());
2835     }
2836     regionFavoredNodesMap.put(encodedRegionName, addr);
2837   }
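
  // Note on createUnresolved (illustrative; the host and port below are made
  // up): unlike the InetSocketAddress constructor, it performs no DNS lookup,
  // so caching favored nodes never blocks on name resolution.
  //
  //   InetSocketAddress eager = new InetSocketAddress("host1.example.com", 16020); // resolves now
  //   InetSocketAddress lazy = InetSocketAddress.createUnresolved("host1.example.com", 16020);
  //   assert lazy.isUnresolved(); // resolution is deferred until the address is used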
2838 
2839   /**
2840    * Return the favored nodes for a region given its encoded name. See the
2841    * comment around {@link #regionFavoredNodesMap} for why it is an InetSocketAddress[].
2842    * @param encodedRegionName the encoded name of the region
2843    * @return array of favored locations
2844    */
2845   @Override
2846   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2847     return regionFavoredNodesMap.get(encodedRegionName);
2848   }
2849 
2850   @Override
2851   public ServerNonceManager getNonceManager() {
2852     return this.nonceManager;
2853   }
2854 
2855   private static class MovedRegionInfo {
2856     private final ServerName serverName;
2857     private final long seqNum;
2858     private final long ts;
2859 
2860     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2861       this.serverName = serverName;
2862       this.seqNum = closeSeqNum;
2863       ts = EnvironmentEdgeManager.currentTime();
2864     }
2865 
2866     public ServerName getServerName() {
2867       return serverName;
2868     }
2869 
2870     public long getSeqNum() {
2871       return seqNum;
2872     }
2873 
2874     public long getMoveTime() {
2875       return ts;
2876     }
2877   }
2878 
2879   // This map contains all the regions that we closed for a move.
2880   // We record the time of the move so that we do not keep stale entries around.
2881   protected Map<String, MovedRegionInfo> movedRegions =
2882       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
2883 
2884   // We need a timeout. Without one we risk handing out stale information, which would double
2885   // the number of network calls instead of reducing them.
2886   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
2887 
2888   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
2889     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
2890       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
2891       return;
2892     }
2893     LOG.info("Adding moved region record: "
2894       + encodedName + " to " + destination + " as of " + closeSeqNum);
2895     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
2896   }
2897 
2898   void removeFromMovedRegions(String encodedName) {
2899     movedRegions.remove(encodedName);
2900   }
2901 
2902   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
2903     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
2904 
2905     long now = EnvironmentEdgeManager.currentTime();
2906     if (dest != null) {
2907       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
2908         return dest;
2909       } else {
2910         movedRegions.remove(encodedRegionName);
2911       }
2912     }
2913 
2914     return null;
2915   }
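
  // The lookup above is a read-through TTL check: entries older than the
  // timeout are treated as absent and evicted on access. The same pattern,
  // distilled with hypothetical generic names (illustrative only):
  //
  //   <V> V getIfFresh(Map<String, TimestampedValue<V>> map, String key, long ttlMs) {
  //     TimestampedValue<V> v = map.get(key);
  //     long now = EnvironmentEdgeManager.currentTime();
  //     if (v != null && v.getTs() > now - ttlMs) return v.getValue(); // still fresh
  //     if (v != null) map.remove(key); // expired: evict eagerly on read
  //     return null;
  //   }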
2916 
2917   /**
2918    * Remove the expired entries from the moved regions list.
2919    */
2920   protected void cleanMovedRegions() {
2921     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
2922     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
2923 
2924     while (it.hasNext()) {
2925       Map.Entry<String, MovedRegionInfo> e = it.next();
2926       if (e.getValue().getMoveTime() < cutOff) {
2927         it.remove();
2928       }
2929     }
2930   }
2931 
2932   /**
2933    * Chore that periodically cleans the moved-region cache.
2934    */
2935   protected static class MovedRegionsCleaner extends Chore implements Stoppable {
2936     private HRegionServer regionServer;
2937     Stoppable stoppable;
2938 
2939     private MovedRegionsCleaner(
2940         HRegionServer regionServer, Stoppable stoppable) {
2941       super("MovedRegionsCleaner for region server " + regionServer, TIMEOUT_REGION_MOVED, stoppable);
2942       this.regionServer = regionServer;
2943       this.stoppable = stoppable;
2944     }
2945 
2946     static MovedRegionsCleaner createAndStart(HRegionServer rs) {
2947       Stoppable stoppable = new Stoppable() {
2948         private volatile boolean isStopped = false;
2949         @Override public void stop(String why) { isStopped = true;}
2950         @Override public boolean isStopped() {return isStopped;}
2951       };
2952 
2953       return new MovedRegionsCleaner(rs, stoppable);
2954     }
2955 
2956     @Override
2957     protected void chore() {
2958       regionServer.cleanMovedRegions();
2959     }
2960 
2961     @Override
2962     public void stop(String why) {
2963       stoppable.stop(why);
2964     }
2965 
2966     @Override
2967     public boolean isStopped() {
2968       return stoppable.isStopped();
2969     }
2970   }
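
  // Pattern note (illustrative): createAndStart wires the chore to its own
  // private Stoppable rather than the region server's, so this one chore can
  // be stopped without stopping anything else. A minimal equivalent:
  //
  //   final AtomicBoolean stopped = new AtomicBoolean(false);
  //   Stoppable own = new Stoppable() {
  //     @Override public void stop(String why) { stopped.set(true); }
  //     @Override public boolean isStopped() { return stopped.get(); }
  //   };
  //   // Handing 'own' to the Chore constructor means stop(why) ends just this chore.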
2971 
2972   private String getMyEphemeralNodePath() {
2973     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
2974   }
2975 
2976   private boolean isHealthCheckerConfigured() {
2977     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2978     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2979   }
2980 
2981   /**
2982    * @return the underlying {@link CompactSplitThread} for this server
2983    */
2984   public CompactSplitThread getCompactSplitThread() {
2985     return this.compactSplitThread;
2986   }
2987 
2988   /**
2989    * A helper function to store the last flushed sequence id with the previous failed RS for a
2990    * recovering region. The id is used to skip WAL edits that have already been flushed. Since
2991    * a flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
2992    * @throws KeeperException
2993    * @throws IOException
2994    */
2995   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
2996       IOException {
2997     if (!r.isRecovering()) {
2998       // return immediately for non-recovering regions
2999       return;
3000     }
3001 
3002     HRegionInfo region = r.getRegionInfo();
3003     ZooKeeperWatcher zkw = getZooKeeper();
3004     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
3005     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
3006     long minSeqIdForLogReplay = -1;
3007     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3008       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3009         minSeqIdForLogReplay = storeSeqIdForReplay;
3010       }
3011     }
3012 
3013     try {
3014       long lastRecordedFlushedSequenceId = -1;
3015       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3016         region.getEncodedName());
3017       // recovering-region level
3018       byte[] data;
3019       try {
3020         data = ZKUtil.getData(zkw, nodePath);
3021       } catch (InterruptedException e) {
3022         throw new InterruptedIOException();
3023       }
3024       if (data != null) {
3025         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3026       }
3027       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3028         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3029       }
3030       if (previousRSName != null) {
3031         // one level deeper for the failed RS
3032         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3033         ZKUtil.setData(zkw, nodePath,
3034           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3035         LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
3036             + previousRSName);
3037       } else {
3038         LOG.warn("Can't find failed region server for recovering region " +
3039           region.getEncodedName());
3040       }
3041     } catch (NoNodeException ignore) {
3042       LOG.debug("Region " + region.getEncodedName() +
3043         " must have completed recovery because its recovery znode has been removed", ignore);
3044     }
3045   }
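
  // Why the minimum across stores (illustrative): an edit is safe to skip on
  // replay only if every store it could touch has already flushed it, so the
  // smallest per-store flushed sequence id is the conservative bound.
  //
  //   long minSeqId = -1;
  //   for (Long storeSeqId : maxSeqIdInStores.values()) {
  //     if (minSeqId == -1 || storeSeqId < minSeqId) minSeqId = storeSeqId;
  //   }
  //   // edits with seqId <= minSeqId are durable in all stores and can be skipped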
3046 
3047   /**
3048    * Return the name of the last failed RS under /hbase/recovering-regions/encodedRegionName.
3049    * @param encodedRegionName the encoded name of the recovering region
3050    * @return the most recently failed RS name, or null if none is recorded
3051    */
3052   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3053     String result = null;
3054     long maxZxid = 0;
3055     ZooKeeperWatcher zkw = this.getZooKeeper();
3056     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3057     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3058     if (failedServers == null || failedServers.isEmpty()) {
3059       return result;
3060     }
3061     for (String failedServer : failedServers) {
3062       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3063       Stat stat = new Stat();
3064       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3065       if (maxZxid < stat.getCzxid()) {
3066         maxZxid = stat.getCzxid();
3067         result = failedServer;
3068       }
3069     }
3070     return result;
3071   }
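
  // Why czxid (illustrative): ZooKeeper assigns every created znode a
  // cluster-wide, monotonically increasing creation zxid, so the child with
  // the largest czxid is the most recently created one, i.e. the last RS to
  // fail for this region. Wall-clock timestamps are never consulted.
  //
  //   Stat stat = new Stat();
  //   ZKUtil.getDataNoWatch(zkw, rsPath, stat); // fills 'stat' as a side effect
  //   long createdAt = stat.getCzxid();         // an ordering key, not a time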
3072 
3073   public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3074       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3075     try {
3076       ServerRpcController execController = new ServerRpcController();
3077       CoprocessorServiceCall call = serviceRequest.getCall();
3078       String serviceName = call.getServiceName();
3079       String methodName = call.getMethodName();
3080       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3081         throw new UnknownProtocolException(null,
3082             "No registered coprocessor service found for name " + serviceName);
3083       }
3084       Service service = coprocessorServiceHandlers.get(serviceName);
3085       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3086       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3087       if (methodDesc == null) {
3088         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3089             + " called on service " + serviceName);
3090       }
3091       Message request =
3092           service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest())
3093               .build();
3094       final Message.Builder responseBuilder =
3095           service.getResponsePrototype(methodDesc).newBuilderForType();
3096       service.callMethod(methodDesc, execController, request, new RpcCallback<Message>() {
3097         @Override
3098         public void run(Message message) {
3099           if (message != null) {
3100             responseBuilder.mergeFrom(message);
3101           }
3102         }
3103       });
3104       Message execResult = responseBuilder.build();
3105       if (execController.getFailedOn() != null) {
3106         throw execController.getFailedOn();
3107       }
3108       ClientProtos.CoprocessorServiceResponse.Builder builder =
3109           ClientProtos.CoprocessorServiceResponse.newBuilder();
3110       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3111         HConstants.EMPTY_BYTE_ARRAY));
3112       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3113           .setValue(execResult.toByteString()));
3114       return builder.build();
3115     } catch (IOException ie) {
3116       throw new ServiceException(ie);
3117     }
3118   }
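
  // The dispatch above is plain protobuf reflection over a registered
  // com.google.protobuf.Service (sketch; "myMethod" and requestBytes are
  // placeholders): the method is looked up by name, the request is decoded
  // against that method's prototype, and the response is gathered via callback.
  //
  //   Descriptors.MethodDescriptor md =
  //       service.getDescriptorForType().findMethodByName("myMethod");
  //   Message request = service.getRequestPrototype(md)
  //       .newBuilderForType().mergeFrom(requestBytes).build();
  //   final Message.Builder response = service.getResponsePrototype(md).newBuilderForType();
  //   service.callMethod(md, controller, request, new RpcCallback<Message>() {
  //     @Override public void run(Message m) { if (m != null) response.mergeFrom(m); }
  //   });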
3119 
3120   /**
3121    * @return The cache config instance used by the regionserver.
3122    */
3123   public CacheConfig getCacheConfig() {
3124     return this.cacheConfig;
3125   }
3126 
3127   /**
3128    * @return the ConfigurationManager object, exposed for testing purposes.
3129    */
3130   protected ConfigurationManager getConfigurationManager() {
3131     return configurationManager;
3132   }
3133 
3134   /**
3135    * Reload the configuration from disk.
3136    */
3137   public void updateConfiguration() {
3138     LOG.info("Reloading the configuration from disk.");
3139     // Reload the configuration from disk.
3140     conf.reloadConfiguration();
3141     configurationManager.notifyAllObservers(conf);
3142   }
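
  // How components observe the reload above (illustrative sketch; assumes the
  // ConfigurationObserver callback that ConfigurationManager notifies): each
  // registered observer receives the refreshed Configuration. The observer
  // class and configuration key below are hypothetical.
  //
  //   class MyTunable implements ConfigurationObserver {
  //     private volatile int limit;
  //     @Override
  //     public void onConfigurationChange(Configuration newConf) {
  //       limit = newConf.getInt("my.tunable.limit", 10); // re-read on reload
  //     }
  //   }
  //   // configurationManager.registerObserver(new MyTunable());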
3143 }