1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetSocketAddress;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Map.Entry;
39  import java.util.Set;
40  import java.util.SortedMap;
41  import java.util.TreeMap;
42  import java.util.TreeSet;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentMap;
45  import java.util.concurrent.ConcurrentSkipListMap;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.atomic.AtomicReference;
48  import java.util.concurrent.locks.ReentrantReadWriteLock;
49  import java.net.InetAddress;
50  
51  import javax.management.ObjectName;
52  import javax.servlet.http.HttpServlet;
53  
54  import org.apache.commons.lang.math.RandomUtils;
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.FileSystem;
59  import org.apache.hadoop.fs.Path;
60  import org.apache.hadoop.hbase.Chore;
61  import org.apache.hadoop.hbase.ClockOutOfSyncException;
62  import org.apache.hadoop.hbase.CoordinatedStateManager;
63  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
64  import org.apache.hadoop.hbase.HBaseConfiguration;
65  import org.apache.hadoop.hbase.HConstants;
66  import org.apache.hadoop.hbase.HRegionInfo;
67  import org.apache.hadoop.hbase.HealthCheckChore;
68  import org.apache.hadoop.hbase.MetaTableAccessor;
69  import org.apache.hadoop.hbase.NotServingRegionException;
70  import org.apache.hadoop.hbase.ServerName;
71  import org.apache.hadoop.hbase.Stoppable;
72  import org.apache.hadoop.hbase.TableDescriptors;
73  import org.apache.hadoop.hbase.TableName;
74  import org.apache.hadoop.hbase.YouAreDeadException;
75  import org.apache.hadoop.hbase.ZNodeClearer;
76  import org.apache.hadoop.hbase.classification.InterfaceAudience;
77  import org.apache.hadoop.hbase.client.ConnectionFactory;
78  import org.apache.hadoop.hbase.client.ConnectionUtils;
79  import org.apache.hadoop.hbase.conf.ConfigurationManager;
80  import org.apache.hadoop.hbase.client.ClusterConnection;
81  import org.apache.hadoop.hbase.client.ConnectionFactory;
82  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
84  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
85  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
86  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
87  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
88  import org.apache.hadoop.hbase.executor.ExecutorService;
89  import org.apache.hadoop.hbase.executor.ExecutorType;
90  import org.apache.hadoop.hbase.fs.HFileSystem;
91  import org.apache.hadoop.hbase.http.InfoServer;
92  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
93  import org.apache.hadoop.hbase.ipc.RpcClient;
94  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
95  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
96  import org.apache.hadoop.hbase.ipc.ServerRpcController;
97  import org.apache.hadoop.hbase.master.HMaster;
98  import org.apache.hadoop.hbase.master.RegionState.State;
99  import org.apache.hadoop.hbase.master.TableLockManager;
100 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
101 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
102 import org.apache.hadoop.hbase.protobuf.RequestConverter;
103 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
104 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
105 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
108 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
109 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
110 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
111 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
112 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
113 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
114 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
119 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
120 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
121 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
124 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
125 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
126 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
127 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
128 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
129 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
130 import org.apache.hadoop.hbase.wal.WAL;
131 import org.apache.hadoop.hbase.wal.WALFactory;
132 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
133 import org.apache.hadoop.hbase.security.UserProvider;
134 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
135 import org.apache.hadoop.hbase.util.Addressing;
136 import org.apache.hadoop.hbase.util.ByteStringer;
137 import org.apache.hadoop.hbase.util.Bytes;
138 import org.apache.hadoop.hbase.util.CompressionTest;
139 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
140 import org.apache.hadoop.hbase.util.FSTableDescriptors;
141 import org.apache.hadoop.hbase.util.FSUtils;
142 import org.apache.hadoop.hbase.util.HasThread;
143 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
144 import org.apache.hadoop.hbase.util.Sleeper;
145 import org.apache.hadoop.hbase.util.Threads;
146 import org.apache.hadoop.hbase.util.VersionInfo;
147 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
148 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
149 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
150 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
151 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
152 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
153 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
154 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
155 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
156 import org.apache.hadoop.ipc.RemoteException;
157 import org.apache.hadoop.metrics.util.MBeanUtil;
158 import org.apache.hadoop.util.ReflectionUtils;
159 import org.apache.hadoop.util.StringUtils;
160 import org.apache.zookeeper.KeeperException;
161 import org.apache.zookeeper.KeeperException.NoNodeException;
162 import org.apache.zookeeper.data.Stat;
163 
164 import com.google.common.annotations.VisibleForTesting;
165 import com.google.common.base.Preconditions;
166 import com.google.common.collect.Maps;
167 import com.google.protobuf.BlockingRpcChannel;
168 import com.google.protobuf.Descriptors;
169 import com.google.protobuf.Message;
170 import com.google.protobuf.RpcCallback;
171 import com.google.protobuf.RpcController;
172 import com.google.protobuf.Service;
173 import com.google.protobuf.ServiceException;
174 
175 /**
176  * HRegionServer makes a set of HRegions available to clients. It checks in with
177  * the HMaster. There are many HRegionServers in a single HBase deployment.
178  */
179 @InterfaceAudience.Private
180 @SuppressWarnings("deprecation")
181 public class HRegionServer extends HasThread implements
182     RegionServerServices, LastSequenceId {
183 
184   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
185 
186   /**
187    * For testing only!  Set to true to skip notifying region assignment to the master.
188    */
189   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
190   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
191 
192   /*
193    * Strings to be used in forming the exception message for
194    * RegionsAlreadyInTransitionException.
195    */
196   protected static final String OPEN = "OPEN";
197   protected static final String CLOSE = "CLOSE";
198 
199   //RegionName vs current action in progress
200   //true - if open region action in progress
201   //false - if close region action in progress
202   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
203     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
204 
205   // Cache flushing
206   protected MemStoreFlusher cacheFlusher;
207 
208   protected HeapMemoryManager hMemManager;
209 
210   /**
211    * Cluster connection to be shared by services.
212    * Initialized at server startup and closed when server shuts down.
213    * Clients must never close it explicitly.
214    */
215   protected ClusterConnection clusterConnection;
216 
217   /*
218    * Long-living meta table locator, which is created when the server is started and stopped
219    * when the server shuts down. References to this locator shall be used to perform the
220    * corresponding operations in EventHandlers. The primary reason for this decision is to make it mockable
221    * for tests.
222    */
223   protected MetaTableLocator metaTableLocator;
224 
225   // Watch if a region is out of recovering state from ZooKeeper
226   @SuppressWarnings("unused")
227   private RecoveringRegionWatcher recoveringRegionWatcher;
228 
229   /**
230    * Go here to get table descriptors.
231    */
232   protected TableDescriptors tableDescriptors;
233 
234   // Replication services. If no replication, this handler will be null.
235   protected ReplicationSourceService replicationSourceHandler;
236   protected ReplicationSinkService replicationSinkHandler;
237 
238   // Compactions
239   public CompactSplitThread compactSplitThread;
240 
241   /**
242    * Map of regions currently being served by this region server. Key is the
243    * encoded region name.  All access should be synchronized.
244    */
245   protected final Map<String, HRegion> onlineRegions =
246     new ConcurrentHashMap<String, HRegion>();
247 
248   /**
249    * Map of encoded region names to the DataNode locations they should be hosted on.
250    * We store the value as InetSocketAddress since this is used only in HDFS
251    * API (create() that takes favored nodes as hints for placing file blocks).
252    * We could have used ServerName here as the value class, but we'd need to
253    * convert it to InetSocketAddress at some point before the HDFS API call, and
254    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
255    * and here we really mean DataNode locations.
256    */
257   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
258       new ConcurrentHashMap<String, InetSocketAddress[]>();
259 
260   /**
261    * Set of regions currently in recovering state, which means they can accept writes (edits from
262    * a previously failed region server) but not reads. A recovering region is also an online region.
263    */
264   protected final Map<String, HRegion> recoveringRegions = Collections
265       .synchronizedMap(new HashMap<String, HRegion>());
266 
267   // Leases
268   protected Leases leases;
269 
270   // Instance of the hbase executor service.
271   protected ExecutorService service;
272 
273   // If false, the file system has become unavailable
274   protected volatile boolean fsOk;
275   protected HFileSystem fs;
276 
277   // Set when a report to the master comes back with a message asking us to
278   // shutdown. Also set by call to stop when debugging or running unit tests
279   // of HRegionServer in isolation.
280   private volatile boolean stopped = false;
281 
282   // Go down hard. Used if file system becomes unavailable and also in
283   // debugging and unit tests.
284   private volatile boolean abortRequested;
285 
286   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
287 
288   // A state before we go into stopped state.  At this stage we're closing user
289   // space regions.
290   private boolean stopping = false;
291 
292   private volatile boolean killed = false;
293 
294   protected final Configuration conf;
295 
296   private Path rootDir;
297 
298   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
299 
300   final int numRetries;
301   protected final int threadWakeFrequency;
302   protected final int msgInterval;
303 
304   protected final int numRegionsToReport;
305 
306   // Stub to do region server status calls against the master.
307   private volatile RegionServerStatusService.BlockingInterface rssStub;
308   // RPC client. Used to make the stub above that does region server status checking.
309   RpcClient rpcClient;
310 
311   private UncaughtExceptionHandler uncaughtExceptionHandler;
312 
313   // Info server. Default access so can be used by unit tests. REGIONSERVER
314   // is the name of the webapp and the attribute name used when stuffing this instance
315   // into web context.
316   protected InfoServer infoServer;
317   private JvmPauseMonitor pauseMonitor;
318 
319   /** region server process name */
320   public static final String REGIONSERVER = "regionserver";
321 
322   MetricsRegionServer metricsRegionServer;
323   private SpanReceiverHost spanReceiverHost;
324 
325   /*
326    * Check for compactions requests.
327    */
328   Chore compactionChecker;
329 
330   /*
331    * Check for flushes
332    */
333   Chore periodicFlusher;
334 
335   protected volatile WALFactory walFactory;
336 
337   // WAL roller. walRoller is package-private rather than private to avoid an
338   // Eclipse warning when accessed by inner classes
339   final LogRoller walRoller;
340   // Lazily initialized if this RegionServer hosts a meta table.
341   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
342 
343   // flag set after we're done setting up server threads
344   final AtomicBoolean online = new AtomicBoolean(false);
345 
346   // zookeeper connection and watcher
347   protected ZooKeeperWatcher zooKeeper;
348 
349   // master address tracker
350   private MasterAddressTracker masterAddressTracker;
351 
352   // Cluster Status Tracker
353   protected ClusterStatusTracker clusterStatusTracker;
354 
355   // Log Splitting Worker
356   private SplitLogWorker splitLogWorker;
357 
358   // A sleeper that sleeps for msgInterval.
359   protected final Sleeper sleeper;
360 
361   private final int operationTimeout;
362 
363   private final RegionServerAccounting regionServerAccounting;
364 
365   // Cache configuration and block cache reference
366   protected CacheConfig cacheConfig;
367 
368   /** The health check chore. */
369   private HealthCheckChore healthCheckChore;
370 
371   /** The nonce manager chore. */
372   private Chore nonceManagerChore;
373 
374   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
375 
376   /**
377    * The server name the Master sees us as.  It's made from the hostname the
378    * master passes us, port, and server startcode. Gets set after registration
379    * against the Master.
380    */
381   protected ServerName serverName;
382 
383   /**
384    * This server's startcode.
385    */
386   protected final long startcode;
387 
388   /**
389    * Unique identifier for the cluster we are a part of.
390    */
391   private String clusterId;
392 
393   /**
394    * MX Bean for RegionServerInfo
395    */
396   private ObjectName mxBean = null;
397 
398   /**
399    * Chore to clean periodically the moved region list
400    */
401   private MovedRegionsCleaner movedRegionsCleaner;
402 
403   // chore for refreshing store files for secondary regions
404   private StorefileRefresherChore storefileRefresher;
405 
406   private RegionServerCoprocessorHost rsHost;
407 
408   private RegionServerProcedureManagerHost rspmHost;
409 
410   private RegionServerQuotaManager rsQuotaManager;
411 
412   // Table level lock manager for locking for region operations
413   protected TableLockManager tableLockManager;
414 
415   /**
416    * Nonce manager. Nonces are used to make operations like increment and append idempotent
417    * in the case where client doesn't receive the response from a successful operation and
418    * retries. We track the successful ops for some time via a nonce sent by client and handle
419    * duplicate operations (currently, by failing them; in future we might use MVCC to return
420    * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
421    * HBASE-3787) are:
422    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
423    *   of past records. If we don't read the records, we don't read and recover the nonces.
424    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
425    * - There's no WAL recovery during normal region move, so nonces will not be transferred.
426    * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
427    * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
428    * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
429    * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
430    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
431    * latest nonce in it expired. It can also be recovered during move.
432    */
433   final ServerNonceManager nonceManager;
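
      // Illustrative sketch: server-side nonce tracking is enabled by default and is switched off
      // through the key behind HConstants.HBASE_RS_NONCES_ENABLED, which the constructor below
      // reads (a null nonceManager means the feature is disabled). For example:
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setBoolean(HConstants.HBASE_RS_NONCES_ENABLED, false); // no ServerNonceManager is created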
434 
435   private UserProvider userProvider;
436 
437   protected final RSRpcServices rpcServices;
438 
439   protected BaseCoordinatedStateManager csm;
440 
441   /**
442    * Configuration manager is used to register/deregister and notify the configuration observers
443    * when the regionserver is notified that there was a change in the on disk configs.
444    */
445   private final ConfigurationManager configurationManager;
446 
447   /**
448    * Starts an HRegionServer at the default location.
449    * @param conf
450    * @throws IOException
451    * @throws InterruptedException
452    */
453   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
454     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
455   }
456 
457   /**
458    * Starts an HRegionServer at the default location.
459    * @param conf
460    * @param csm implementation of CoordinatedStateManager to be used
461    * @throws IOException
462    */
463   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
464       throws IOException {
465     this.fsOk = true;
466     this.conf = conf;
467     checkCodecs(this.conf);
468     this.userProvider = UserProvider.instantiate(conf);
469     FSUtils.setupShortCircuitRead(this.conf);
470 
471     // Config'ed params
472     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
473         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
474     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
475     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
476 
477     this.sleeper = new Sleeper(this.msgInterval, this);
478 
479     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
480     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
481 
482     this.numRegionsToReport = conf.getInt(
483       "hbase.regionserver.numregionstoreport", 10);
484 
485     this.operationTimeout = conf.getInt(
486       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
487       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
488 
489     this.abortRequested = false;
490     this.stopped = false;
491 
492     rpcServices = createRpcServices();
493     this.startcode = System.currentTimeMillis();
494     String hostName = rpcServices.isa.getHostName();
495     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
496 
497     // login the zookeeper client principal (if using security)
498     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
499       "hbase.zookeeper.client.kerberos.principal", hostName);
500     // login the server principal (if using secure Hadoop)
501     login(userProvider, hostName);
502 
503     regionServerAccounting = new RegionServerAccounting();
504     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
505       @Override
506       public void uncaughtException(Thread t, Throwable e) {
507         abort("Uncaught exception in service thread " + t.getName(), e);
508       }
509     };
510 
511     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
512     // underlying hadoop hdfs accessors will be going against wrong filesystem
513     // (unless all is set to defaults).
514     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
515     // Get fs instance used by this RS.  Do we use checksum verification in the hbase? If hbase
516     // checksum verification enabled, then automatically switch off hdfs checksum verification.
517     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
518     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
519     this.rootDir = FSUtils.getRootDir(this.conf);
520     this.tableDescriptors = new FSTableDescriptors(this.conf,
521       this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
522 
523     service = new ExecutorService(getServerName().toShortString());
524     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
525 
526     // Some unit tests don't need a cluster, so no zookeeper at all
527     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
528       // Open connection to zookeeper and set primary watcher
529       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
530         rpcServices.isa.getPort(), this, canCreateBaseZNode());
531 
532       this.csm = (BaseCoordinatedStateManager) csm;
533       this.csm.initialize(this);
534       this.csm.start();
535 
536       tableLockManager = TableLockManager.createTableLockManager(
537         conf, zooKeeper, serverName);
538 
539       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
540       masterAddressTracker.start();
541 
542       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
543       clusterStatusTracker.start();
544     }
545     this.configurationManager = new ConfigurationManager();
546 
547     rpcServices.start();
548     putUpWebUI();
549     this.walRoller = new LogRoller(this, this);
550   }
551 
552   protected void login(UserProvider user, String host) throws IOException {
553     user.login("hbase.regionserver.keytab.file",
554       "hbase.regionserver.kerberos.principal", host);
555   }
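
      // Illustrative sketch of the configuration that login() consumes when security is enabled.
      // The keytab path and the principal below are hypothetical placeholders, not shipped defaults.
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.set("hbase.regionserver.keytab.file", "/path/to/hbase.keytab");          // hypothetical path
      //   conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); // hypothetical principal
      //   // The constructor then calls login(UserProvider.instantiate(conf), hostName) to do the actual login.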
556 
557   protected void waitForMasterActive(){
558   }
559 
560   protected String getProcessName() {
561     return REGIONSERVER;
562   }
563 
564   protected boolean canCreateBaseZNode() {
565     return false;
566   }
567 
568   protected boolean canUpdateTableDescriptor() {
569     return false;
570   }
571 
572   protected RSRpcServices createRpcServices() throws IOException {
573     return new RSRpcServices(this);
574   }
575 
576   protected void configureInfoServer() {
577     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
578     infoServer.setAttribute(REGIONSERVER, this);
579   }
580 
581   protected Class<? extends HttpServlet> getDumpServlet() {
582     return RSDumpServlet.class;
583   }
584 
585   protected void doMetrics() {
586   }
587 
588   @Override
589   public boolean registerService(Service instance) {
590     /*
591      * No stacking of instances is allowed for a single service name
592      */
593     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
594     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
595       LOG.error("Coprocessor service " + serviceDesc.getFullName()
596           + " already registered, rejecting request from " + instance);
597       return false;
598     }
599 
600     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
601     if (LOG.isDebugEnabled()) {
602       LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName());
603     }
604     return true;
605   }
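
      // Illustrative usage sketch; MyServiceImpl and the regionServerServices handle are hypothetical.
      // A region-server-level coprocessor endpoint is exposed by handing its protobuf Service to
      // registerService(); a second registration under the same fully-qualified name is rejected.
      //
      //   com.google.protobuf.Service endpoint = new MyServiceImpl();   // hypothetical Service implementation
      //   if (!regionServerServices.registerService(endpoint)) {        // regionServerServices: this RegionServerServices
      //     LOG.warn("Endpoint was already registered under its service name");
      //   }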
606 
607   /**
608    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
609    * the local server.  Safe to use going to local or remote server.
610    * Creating this instance in a method makes it possible to intercept and mock it in tests.
611    * @throws IOException
612    */
613   @VisibleForTesting
614   protected ClusterConnection createClusterConnection() throws IOException {
615     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
616     // local server if the request is to the local server bypassing RPC. Can be used for both local
617     // and remote invocations.
618     return ConnectionUtils.createShortCircuitHConnection(
619       ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
620   }
621 
622   /**
623    * Run test on configured codecs to make sure supporting libs are in place.
624    * @param c
625    * @throws IOException
626    */
627   private static void checkCodecs(final Configuration c) throws IOException {
628     // check to see if the codec list is available:
629     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
630     if (codecs == null) return;
631     for (String codec : codecs) {
632       if (!CompressionTest.testCompression(codec)) {
633         throw new IOException("Compression codec " + codec +
634           " not supported, aborting RS construction");
635       }
636     }
637   }
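
      // Illustrative sketch: checkCodecs() makes startup fail fast when a codec listed in
      // "hbase.regionserver.codecs" cannot be loaded. A site that requires Snappy and LZ4
      // (chosen here purely as an example) could set:
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setStrings("hbase.regionserver.codecs", "snappy", "lz4");
      //   // HRegionServer construction now throws IOException if either codec test fails.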
638 
639   public String getClusterId() {
640     return this.clusterId;
641   }
642 
643   /**
644    * Set up our cluster connection if not already initialized.
645    * @throws IOException
646    */
647   protected synchronized void setupClusterConnection() throws IOException {
648     if (clusterConnection == null) {
649       clusterConnection = createClusterConnection();
650       metaTableLocator = new MetaTableLocator();
651     }
652   }
653 
654   /**
655    * All initialization needed before we register with the Master.
656    *
657    * @throws IOException
658    * @throws InterruptedException
659    */
660   private void preRegistrationInitialization(){
661     try {
662       setupClusterConnection();
663 
664       // Health checker thread.
665       if (isHealthCheckerConfigured()) {
666         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
667           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
668         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
669       }
670       this.pauseMonitor = new JvmPauseMonitor(conf);
671       pauseMonitor.start();
672 
673       initializeZooKeeper();
674       if (!isStopped() && !isAborted()) {
675         initializeThreads();
676       }
677     } catch (Throwable t) {
678       // Call stop if error or process will stick around for ever since server
679       // puts up non-daemon threads.
680       this.rpcServices.stop();
681       abort("Initialization of RS failed.  Hence aborting RS.", t);
682     }
683   }
684 
685   /**
686    * Bring up connection to the zk ensemble, then wait until a master is available for this
687    * cluster, and after that wait until the cluster 'up' flag has been set.
688    * This is the order in which master does things.
689    * Finally open long-living server short-circuit connection.
690    * @throws IOException
691    * @throws InterruptedException
692    */
693   private void initializeZooKeeper() throws IOException, InterruptedException {
694     // Create the master address tracker, register with zk, and start it.  Then
695     // block until a master is available.  No point in starting up if no master
696     // running.
697     blockAndCheckIfStopped(this.masterAddressTracker);
698 
699     // Wait on cluster being up.  Master will set this flag up in zookeeper
700     // when ready.
701     blockAndCheckIfStopped(this.clusterStatusTracker);
702 
703     // Retrieve clusterId
704     // Since cluster status is now up
705     // ID should have already been set by HMaster
706     try {
707       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
708       if (clusterId == null) {
709         this.abort("Cluster ID has not been set");
710       }
711       LOG.info("ClusterId : "+clusterId);
712     } catch (KeeperException e) {
713       this.abort("Failed to retrieve Cluster ID",e);
714     }
715 
716     // In case of a colocated master, wait here till it's active.
717     // So backup masters won't start as regionservers.
718     // This is to avoid showing backup masters as regionservers
719     // in master web UI, or assigning any region to them.
720     waitForMasterActive();
721     if (isStopped() || isAborted()) {
722       return; // No need for further initialization
723     }
724 
725     // watch for snapshots and other procedures
726     try {
727       rspmHost = new RegionServerProcedureManagerHost();
728       rspmHost.loadProcedures(conf);
729       rspmHost.initialize(this);
730     } catch (KeeperException e) {
731       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
732     }
733     // register watcher for recovering regions
734     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
735   }
736 
737   /**
738    * Utility method to wait indefinitely on a znode's availability while checking
739    * if the region server is shut down
740    * @param tracker znode tracker to use
741    * @throws IOException any IO exception, plus if the RS is stopped
742    * @throws InterruptedException
743    */
744   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
745       throws IOException, InterruptedException {
746     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
747       if (this.stopped) {
748         throw new IOException("Received the shutdown message while waiting.");
749       }
750     }
751   }
752 
753   /**
754    * @return False if cluster shutdown is in progress
755    */
756   private boolean isClusterUp() {
757     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
758   }
759 
760   private void initializeThreads() throws IOException {
761     // Cache flushing thread.
762     this.cacheFlusher = new MemStoreFlusher(conf, this);
763 
764     // Compaction thread
765     this.compactSplitThread = new CompactSplitThread(this);
766 
767     // Background thread to check for compactions; needed if region has not gotten updates
768     // in a while. It will take care of not checking too frequently on store-by-store basis.
769     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
770     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
771     this.leases = new Leases(this.threadWakeFrequency);
772 
773     // Create the thread to clean the moved regions list
774     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
775 
776     if (this.nonceManager != null) {
777       // Create the chore that cleans up nonces.
778       nonceManagerChore = this.nonceManager.createCleanupChore(this);
779     }
780 
781     // Setup the Quota Manager
782     rsQuotaManager = new RegionServerQuotaManager(this);
783 
784     // Setup RPC client for master communication
785     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
786       rpcServices.isa.getAddress(), 0));
787 
788     int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
789       , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
790     if (storefileRefreshPeriod > 0) {
791       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
792     }
793     registerConfigurationObservers();
794   }
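
      // Illustrative sketch: the storefile refresher above is only created when the configured
      // refresh period is positive. Enabling it, e.g. for secondary region replicas, could look
      // like this (30 seconds is an arbitrary example, assuming the period is in milliseconds):
      //
      //   conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30 * 1000);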
795 
796   private void registerConfigurationObservers() {
797     // Registering the compactSplitThread object with the ConfigurationManager.
798     configurationManager.registerObserver(this.compactSplitThread);
799   }
800 
801   /**
802    * The HRegionServer sticks in this loop until closed.
803    */
804   @Override
805   public void run() {
806     try {
807       // Do pre-registration initializations; zookeeper, lease threads, etc.
808       preRegistrationInitialization();
809     } catch (Throwable e) {
810       abort("Fatal exception during initialization", e);
811     }
812 
813     try {
814       if (!isStopped() && !isAborted()) {
815         ShutdownHook.install(conf, fs, this, Thread.currentThread());
816         // Set our ephemeral znode up in zookeeper now we have a name.
817         createMyEphemeralNode();
818         // Initialize the RegionServerCoprocessorHost now that our ephemeral
819         // node was created, in case any coprocessors want to use ZooKeeper
820         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
821       }
822 
823       // Try and register with the Master; tell it we are here.  Break if
824       // server is stopped, the cluster 'up' flag is down, or hdfs went wacky.
825       while (keepLooping()) {
826         RegionServerStartupResponse w = reportForDuty();
827         if (w == null) {
828           LOG.warn("reportForDuty failed; sleeping and then retrying.");
829           this.sleeper.sleep();
830         } else {
831           handleReportForDutyResponse(w);
832           break;
833         }
834       }
835 
836       if (!isStopped() && isHealthy()){
837         // start the snapshot handler and other procedure handlers,
838         // since the server is ready to run
839         rspmHost.start();
840 
841         // Start the Quota Manager
842         rsQuotaManager.start(getRpcServer().getScheduler());
843       }
844 
845       // We registered with the Master.  Go into run mode.
846       long lastMsg = System.currentTimeMillis();
847       long oldRequestCount = -1;
848       // The main run loop.
849       while (!isStopped() && isHealthy()) {
850         if (!isClusterUp()) {
851           if (isOnlineRegionsEmpty()) {
852             stop("Exiting; cluster shutdown set and not carrying any regions");
853           } else if (!this.stopping) {
854             this.stopping = true;
855             LOG.info("Closing user regions");
856             closeUserRegions(this.abortRequested);
857           } else if (this.stopping) {
858             boolean allUserRegionsOffline = areAllUserRegionsOffline();
859             if (allUserRegionsOffline) {
860               // Set stopped if no more write requests to meta tables
861               // since last time we went around the loop.  Any open
862               // meta regions will be closed on our way out.
863               if (oldRequestCount == getWriteRequestCount()) {
864                 stop("Stopped; only catalog regions remaining online");
865                 break;
866               }
867               oldRequestCount = getWriteRequestCount();
868             } else {
869               // Make sure all regions have been closed -- some regions may
870               // have not got it because we were splitting at the time of
871               // the call to closeUserRegions.
872               closeUserRegions(this.abortRequested);
873             }
874             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
875           }
876         }
877         long now = System.currentTimeMillis();
878         if ((now - lastMsg) >= msgInterval) {
879           tryRegionServerReport(lastMsg, now);
880           lastMsg = System.currentTimeMillis();
881           doMetrics();
882         }
883         if (!isStopped() && !isAborted()) {
884           this.sleeper.sleep();
885         }
886       } // while
887     } catch (Throwable t) {
888       if (!rpcServices.checkOOME(t)) {
889         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
890         abort(prefix + t.getMessage(), t);
891       }
892     }
893     // Run shutdown.
894     if (mxBean != null) {
895       MBeanUtil.unregisterMBean(mxBean);
896       mxBean = null;
897     }
898     if (this.leases != null) this.leases.closeAfterLeasesExpire();
899     if (this.splitLogWorker != null) {
900       splitLogWorker.stop();
901     }
902     if (this.infoServer != null) {
903       LOG.info("Stopping infoServer");
904       try {
905         this.infoServer.stop();
906       } catch (Exception e) {
907         LOG.error("Failed to stop infoServer", e);
908       }
909     }
910     // Send cache a shutdown.
911     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
912       cacheConfig.getBlockCache().shutdown();
913     }
914 
915     if (movedRegionsCleaner != null) {
916       movedRegionsCleaner.stop("Region Server stopping");
917     }
918 
919     // Send interrupts to wake up threads if sleeping so they notice shutdown.
920     // TODO: Should we check they are alive? If OOME could have exited already
921     if(this.hMemManager != null) this.hMemManager.stop();
922     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
923     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
924     if (this.compactionChecker != null)
925       this.compactionChecker.interrupt();
926     if (this.healthCheckChore != null) {
927       this.healthCheckChore.interrupt();
928     }
929     if (this.nonceManagerChore != null) {
930       this.nonceManagerChore.interrupt();
931     }
932     if (this.storefileRefresher != null) {
933       this.storefileRefresher.interrupt();
934     }
935 
936     // Stop the quota manager
937     if (rsQuotaManager != null) {
938       rsQuotaManager.stop();
939     }
940 
941     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
942     if (rspmHost != null) {
943       rspmHost.stop(this.abortRequested || this.killed);
944     }
945 
946     if (this.killed) {
947       // Just skip out w/o closing regions.  Used when testing.
948     } else if (abortRequested) {
949       if (this.fsOk) {
950         closeUserRegions(abortRequested); // Don't leave any open file handles
951       }
952       LOG.info("aborting server " + this.serverName);
953     } else {
954       closeUserRegions(abortRequested);
955       LOG.info("stopping server " + this.serverName);
956     }
957 
958     // so callers waiting for meta without timeout can stop
959     if (this.metaTableLocator != null) this.metaTableLocator.stop();
960     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
961       try {
962         this.clusterConnection.close();
963       } catch (IOException e) {
964         // Although the {@link Closeable} interface throws an {@link
965         // IOException}, in reality, the implementation would never do that.
966         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
967       }
968     }
969 
970     // Closing the compactSplit thread before closing meta regions
971     if (!this.killed && containsMetaTableRegions()) {
972       if (!abortRequested || this.fsOk) {
973         if (this.compactSplitThread != null) {
974           this.compactSplitThread.join();
975           this.compactSplitThread = null;
976         }
977         closeMetaTableRegions(abortRequested);
978       }
979     }
980 
981     if (!this.killed && this.fsOk) {
982       waitOnAllRegionsToClose(abortRequested);
983       LOG.info("stopping server " + this.serverName +
984         "; all regions closed.");
985     }
986 
987     //fsOk flag may be changed when closing regions throws exception.
988     if (this.fsOk) {
989       shutdownWAL(!abortRequested);
990     }
991 
992     // Make sure the proxy is down.
993     if (this.rssStub != null) {
994       this.rssStub = null;
995     }
996     if (this.rpcClient != null) {
997       this.rpcClient.stop();
998     }
999     if (this.leases != null) {
1000       this.leases.close();
1001     }
1002     if (this.pauseMonitor != null) {
1003       this.pauseMonitor.stop();
1004     }
1005 
1006     if (!killed) {
1007       stopServiceThreads();
1008     }
1009 
1010     if (this.rpcServices != null) {
1011       this.rpcServices.stop();
1012     }
1013 
1014     try {
1015       deleteMyEphemeralNode();
1016     } catch (KeeperException.NoNodeException nn) {
1017     } catch (KeeperException e) {
1018       LOG.warn("Failed deleting my ephemeral node", e);
1019     }
1020     // We may have failed to delete the znode at the previous step, but
1021     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1022     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1023 
1024     if (this.zooKeeper != null) {
1025       this.zooKeeper.close();
1026     }
1027     LOG.info("stopping server " + this.serverName +
1028       "; zookeeper connection closed.");
1029 
1030     LOG.info(Thread.currentThread().getName() + " exiting");
1031   }
1032 
1033   private boolean containsMetaTableRegions() {
1034     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1035   }
1036 
1037   private boolean areAllUserRegionsOffline() {
1038     if (getNumberOfOnlineRegions() > 2) return false;
1039     boolean allUserRegionsOffline = true;
1040     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1041       if (!e.getValue().getRegionInfo().isMetaTable()) {
1042         allUserRegionsOffline = false;
1043         break;
1044       }
1045     }
1046     return allUserRegionsOffline;
1047   }
1048 
1049   /**
1050    * @return Current write count for all online regions.
1051    */
1052   private long getWriteRequestCount() {
1053     long writeCount = 0;
1054     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1055       writeCount += e.getValue().getWriteRequestsCount();
1056     }
1057     return writeCount;
1058   }
1059 
1060   @VisibleForTesting
1061   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1062   throws IOException {
1063     RegionServerStatusService.BlockingInterface rss = rssStub;
1064     if (rss == null) {
1065       // the current server could be stopping.
1066       return;
1067     }
1068     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1069     try {
1070       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1071       ServerName sn = ServerName.parseVersionedServerName(
1072         this.serverName.getVersionedBytes());
1073       request.setServer(ProtobufUtil.toServerName(sn));
1074       request.setLoad(sl);
1075       rss.regionServerReport(null, request.build());
1076     } catch (ServiceException se) {
1077       IOException ioe = ProtobufUtil.getRemoteException(se);
1078       if (ioe instanceof YouAreDeadException) {
1079         // This will be caught and handled as a fatal error in run()
1080         throw ioe;
1081       }
1082       if (rssStub == rss) {
1083         rssStub = null;
1084       }
1085       // Couldn't connect to the master, get location from zk and reconnect
1086       // Method blocks until new master is found or we are stopped
1087       createRegionServerStatusStub();
1088     }
1089   }
1090 
1091   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1092       throws IOException {
1093     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1094     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1095     // the wrapper to compute those numbers in one place.
1096     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1097     // Instead they should be stored in an HBase table so that external visibility into HBase is
1098     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1099     // history.
1100     MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
1101     Collection<HRegion> regions = getOnlineRegionsLocalContext();
1102     MemoryUsage memory =
1103       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1104 
1105     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1106       ClusterStatusProtos.ServerLoad.newBuilder();
1107     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1108     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1109     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1110     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1111     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1112     for (String coprocessor : coprocessors) {
1113       serverLoad.addCoprocessors(
1114         Coprocessor.newBuilder().setName(coprocessor).build());
1115     }
1116     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1117     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1118     for (HRegion region : regions) {
1119       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1120       for (String coprocessor :
1121           getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
1122         serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
1123       }
1124     }
1125     serverLoad.setReportStartTime(reportStartTime);
1126     serverLoad.setReportEndTime(reportEndTime);
1127     if (this.infoServer != null) {
1128       serverLoad.setInfoServerPort(this.infoServer.getPort());
1129     } else {
1130       serverLoad.setInfoServerPort(-1);
1131     }
1132     return serverLoad.build();
1133   }
1134 
1135   String getOnlineRegionsAsPrintableString() {
1136     StringBuilder sb = new StringBuilder();
1137     for (HRegion r: this.onlineRegions.values()) {
1138       if (sb.length() > 0) sb.append(", ");
1139       sb.append(r.getRegionInfo().getEncodedName());
1140     }
1141     return sb.toString();
1142   }
1143 
1144   /**
1145    * Wait on regions to close.
1146    */
1147   private void waitOnAllRegionsToClose(final boolean abort) {
1148     // Wait till all regions are closed before going out.
1149     int lastCount = -1;
1150     long previousLogTime = 0;
1151     Set<String> closedRegions = new HashSet<String>();
1152     boolean interrupted = false;
1153     try {
1154       while (!isOnlineRegionsEmpty()) {
1155         int count = getNumberOfOnlineRegions();
1156         // Only print a message if the count of regions has changed.
1157         if (count != lastCount) {
1158           // Log every second at most
1159           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1160             previousLogTime = System.currentTimeMillis();
1161             lastCount = count;
1162             LOG.info("Waiting on " + count + " regions to close");
1163             // Only print out regions still closing if there are only a few, else we will
1164             // swamp the log.
1165             if (count < 10 && LOG.isDebugEnabled()) {
1166               LOG.debug(this.onlineRegions);
1167             }
1168           }
1169         }
1170         // Ensure all user regions have been sent a close. Use this to
1171         // protect against the case where an open comes in after we start the
1172         // iterator of onlineRegions to close all user regions.
1173         for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1174           HRegionInfo hri = e.getValue().getRegionInfo();
1175           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1176               && !closedRegions.contains(hri.getEncodedName())) {
1177             closedRegions.add(hri.getEncodedName());
1178             // Don't update zk with this close transition; pass false.
1179             closeRegionIgnoreErrors(hri, abort);
1180           }
1181         }
1182         // No regions in RIT, we can stop waiting now.
1183         if (this.regionsInTransitionInRS.isEmpty()) {
1184           if (!isOnlineRegionsEmpty()) {
1185             LOG.info("We are exiting even though online regions are not empty," +
1186                 " because some regions failed closing");
1187           }
1188           break;
1189         }
1190         if (sleep(200)) {
1191           interrupted = true;
1192         }
1193       }
1194     } finally {
1195       if (interrupted) {
1196         Thread.currentThread().interrupt();
1197       }
1198     }
1199   }
1200 
1201   private boolean sleep(long millis) {
1202     boolean interrupted = false;
1203     try {
1204       Thread.sleep(millis);
1205     } catch (InterruptedException e) {
1206       LOG.warn("Interrupted while sleeping");
1207       interrupted = true;
1208     }
1209     return interrupted;
1210   }
1211 
1212   private void shutdownWAL(final boolean close) {
1213     if (this.walFactory != null) {
1214       try {
1215         if (close) {
1216           walFactory.close();
1217         } else {
1218           walFactory.shutdown();
1219         }
1220       } catch (Throwable e) {
1221         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1222         LOG.error("Shutdown / close of WAL failed: " + e);
1223         LOG.debug("Shutdown / close exception details:", e);
1224       }
1225     }
1226   }
1227 
1228   /*
1229    * Run init. Sets up wal and starts up all server threads.
1230    *
1231    * @param c Extra configuration.
1232    */
1233   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1234   throws IOException {
1235     try {
1236       for (NameStringPair e : c.getMapEntriesList()) {
1237         String key = e.getName();
1238         // The hostname the master sees us as.
1239         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1240           String hostnameFromMasterPOV = e.getValue();
1241           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1242             rpcServices.isa.getPort(), this.startcode);
1243           if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1244             LOG.info("Master passed us a different hostname to use; was=" +
1245               rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
1246           }
1247           continue;
1248         }
1249         String value = e.getValue();
1250         if (LOG.isDebugEnabled()) {
1251           LOG.info("Config from master: " + key + "=" + value);
1252         }
1253         this.conf.set(key, value);
1254       }
1255 
1256       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1257       // config param for task trackers, but we can piggyback off of it.
1258       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1259         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1260           this.serverName.toString());
1261       }
1262 
1263       // Save it in a file; this will allow us to see if we crash
1264       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1265 
1266       this.cacheConfig = new CacheConfig(conf);
1267       this.walFactory = setupWALAndReplication();
1268       // Init in here rather than in constructor after thread name has been set
1269       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1270 
1271       startServiceThreads();
1272       startHeapMemoryManager();
1273       LOG.info("Serving as " + this.serverName +
1274         ", RpcServer on " + rpcServices.isa +
1275         ", sessionid=0x" +
1276         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1277 
1278       // Wake up anyone waiting for this server to come online
1279       synchronized (online) {
1280         online.set(true);
1281         online.notifyAll();
1282       }
1283     } catch (Throwable e) {
1284       stop("Failed initialization");
1285       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1286           "Region server startup failed");
1287     } finally {
1288       sleeper.skipSleepCycle();
1289     }
1290   }
1291 
1292   private void startHeapMemoryManager() {
1293     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1294     if (this.hMemManager != null) {
1295       this.hMemManager.start();
1296     }
1297   }
1298 
1299   private void createMyEphemeralNode() throws KeeperException, IOException {
1300     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1301     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1302     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1303     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1304       getMyEphemeralNodePath(), data);
1305   }
1306 
1307   private void deleteMyEphemeralNode() throws KeeperException {
1308     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1309   }
1310 
1311   @Override
1312   public RegionServerAccounting getRegionServerAccounting() {
1313     return regionServerAccounting;
1314   }
1315 
1316   @Override
1317   public TableLockManager getTableLockManager() {
1318     return tableLockManager;
1319   }
1320 
1321   /*
1322    * @param r Region to get RegionLoad for.
1323    * @param regionLoadBldr the RegionLoad.Builder, can be null
1324    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1325    * @return RegionLoad instance.
1326    *
1327    * @throws IOException
1328    */
1329   private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1330       RegionSpecifier.Builder regionSpecifier) {
1331     byte[] name = r.getRegionName();
1332     int stores = 0;
1333     int storefiles = 0;
1334     int storeUncompressedSizeMB = 0;
1335     int storefileSizeMB = 0;
1336     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1337     int storefileIndexSizeMB = 0;
1338     int rootIndexSizeKB = 0;
1339     int totalStaticIndexSizeKB = 0;
1340     int totalStaticBloomSizeKB = 0;
1341     long totalCompactingKVs = 0;
1342     long currentCompactedKVs = 0;
1343     synchronized (r.stores) {
1344       stores += r.stores.size();
1345       for (Store store : r.stores.values()) {
1346         storefiles += store.getStorefilesCount();
1347         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1348             / 1024 / 1024);
1349         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1350         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1351         CompactionProgress progress = store.getCompactionProgress();
1352         if (progress != null) {
1353           totalCompactingKVs += progress.totalCompactingKVs;
1354           currentCompactedKVs += progress.currentCompactedKVs;
1355         }
1356 
1357         rootIndexSizeKB +=
1358             (int) (store.getStorefilesIndexSize() / 1024);
1359 
1360         totalStaticIndexSizeKB +=
1361           (int) (store.getTotalStaticIndexSize() / 1024);
1362 
1363         totalStaticBloomSizeKB +=
1364           (int) (store.getTotalStaticBloomSize() / 1024);
1365       }
1366     }
1367     float dataLocality =
1368         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1369     if (regionLoadBldr == null) {
1370       regionLoadBldr = RegionLoad.newBuilder();
1371     }
1372     if (regionSpecifier == null) {
1373       regionSpecifier = RegionSpecifier.newBuilder();
1374     }
1375     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1376     regionSpecifier.setValue(ByteStringer.wrap(name));
1377     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1378       .setStores(stores)
1379       .setStorefiles(storefiles)
1380       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1381       .setStorefileSizeMB(storefileSizeMB)
1382       .setMemstoreSizeMB(memstoreSizeMB)
1383       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1384       .setRootIndexSizeKB(rootIndexSizeKB)
1385       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1386       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1387       .setReadRequestsCount(r.readRequestsCount.get())
1388       .setWriteRequestsCount(r.writeRequestsCount.get())
1389       .setTotalCompactingKVs(totalCompactingKVs)
1390       .setCurrentCompactedKVs(currentCompactedKVs)
1391       .setCompleteSequenceId(r.lastFlushSeqId)
1392       .setDataLocality(dataLocality);
1393 
1394     return regionLoadBldr.build();
1395   }
1396 
1397   /**
1398    * @param encodedRegionName The encoded name of the region to report on.
1399    * @return An instance of RegionLoad, or null if the region is not online.
1400    */
1401   public RegionLoad createRegionLoad(final String encodedRegionName) {
1402     HRegion r = this.onlineRegions.get(encodedRegionName);
1404     return r != null ? createRegionLoad(r, null, null) : null;
1405   }
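
  // Hedged usage sketch ("rs" and "hri" are assumed local variables): the per-region load
  // that gets reported to the master can also be inspected locally, e.g. from a test. The
  // getters below are assumed to be the protobuf-generated counterparts of the setters
  // used in createRegionLoad(HRegion, ...) above.
  //
  //   RegionLoad rl = rs.createRegionLoad(hri.getEncodedName());
  //   if (rl != null) {
  //     int memMB = rl.getMemstoreSizeMB();   // memstore footprint in MB
  //     int files = rl.getStorefiles();       // store file count across all stores
  //   }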
1406 
1407   /*
1408    * Inner class that runs periodically, checking whether any regions need compaction.
1409    */
1410   private static class CompactionChecker extends Chore {
1411     private final HRegionServer instance;
1412     private final int majorCompactPriority;
1413     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1414     private long iteration = 0;
1415 
1416     CompactionChecker(final HRegionServer h, final int sleepTime,
1417         final Stoppable stopper) {
1418       super("CompactionChecker", sleepTime, h);
1419       this.instance = h;
1420       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1421 
1422       /* MajorCompactPriority is configurable.
1423        * If not set, the compaction will use default priority.
1424        */
1425       this.majorCompactPriority = this.instance.conf.
1426         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1427         DEFAULT_PRIORITY);
1428     }
1429 
1430     @Override
1431     protected void chore() {
1432       for (HRegion r : this.instance.onlineRegions.values()) {
1433         if (r == null)
1434           continue;
1435         for (Store s : r.getStores().values()) {
1436           try {
1437             long multiplier = s.getCompactionCheckMultiplier();
1438             assert multiplier > 0;
1439             if (iteration % multiplier != 0) continue;
1440             if (s.needsCompaction()) {
1441               // Queue a compaction. Will recognize if major is needed.
1442               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1443                   + " requests compaction");
1444             } else if (s.isMajorCompaction()) {
1445               if (majorCompactPriority == DEFAULT_PRIORITY
1446                   || majorCompactPriority > r.getCompactPriority()) {
1447                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1448                     + " requests major compaction; use default priority", null);
1449               } else {
1450                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1451                     + " requests major compaction; use configured priority",
1452                   this.majorCompactPriority, null);
1453               }
1454             }
1455           } catch (IOException e) {
1456             LOG.warn("Failed major compaction check on " + r, e);
1457           }
1458         }
1459       }
1460       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1461     }
1462   }
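
  // Worked example of the multiplier guard above (the numbers are illustrative, not
  // defaults): if this chore's sleepTime is 10 seconds and a store reports
  // getCompactionCheckMultiplier() == 6, then "iteration % multiplier != 0" skips that
  // store on five out of every six runs, so it is examined roughly once a minute while
  // stores with multiplier 1 are still examined every 10 seconds.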
1463 
1464   class PeriodicMemstoreFlusher extends Chore {
1465     final HRegionServer server;
1466     final static int RANGE_OF_DELAY = 20000; //millisec
1467     final static int MIN_DELAY_TIME = 3000; //millisec
1468     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1469       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1470       this.server = server;
1471     }
1472 
1473     @Override
1474     protected void chore() {
1475       for (HRegion r : this.server.onlineRegions.values()) {
1476         if (r == null)
1477           continue;
1478         if (r.shouldFlush()) {
1479           FlushRequester requester = server.getFlushRequester();
1480           if (requester != null) {
1481             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1482             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1483                 " after a delay of " + randomDelay);
1484             //Throttle the flushes by putting a delay. If we don't throttle, and there
1485             //is a balanced write-load on the regions in a table, we might end up
1486             //overwhelming the filesystem with too many flushes at once.
1487             requester.requestDelayedFlush(r, randomDelay);
1488           }
1489         }
1490       }
1491     }
1492   }
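
  // Worked example of the throttling above: RandomUtils.nextInt(RANGE_OF_DELAY) yields a
  // value in [0, 20000), so randomDelay always falls in [3000, 23000) milliseconds. A
  // region that shouldFlush() is therefore flushed between 3 and 23 seconds later,
  // spreading flushes out instead of issuing them all at once.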
1493 
1494   /**
1495    * Report the status of the server. A server is online once all of the startup
1496    * steps have completed (setting up filesystem, starting service threads, etc.). This
1497    * method is designed mostly to be useful in tests.
1498    *
1499    * @return true if online, false if not.
1500    */
1501   public boolean isOnline() {
1502     return online.get();
1503   }
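
  // Minimal usage sketch for tests (the variable name "rs" is an assumption): rather than
  // polling isOnline() in a loop, a caller in the same JVM can block on the same "online"
  // monitor via waitForServerOnline(), declared further down in this class.
  //
  //   rs.waitForServerOnline();   // returns once online, stopped, or interrupted
  //   if (rs.isOnline()) {
  //     // safe to start issuing requests against rs
  //   }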
1504 
1505   /**
1506    * Set up the WAL and replication, if enabled.
1507    * Replication setup is done here because it needs to be hooked up to the WAL.
1508    * @return A WALFactory instance.
1509    * @throws IOException
1510    */
1511   private WALFactory setupWALAndReplication() throws IOException {
1512     // TODO Replication make assumptions here based on the default filesystem impl
1513     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1514     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1515 
1516     Path logdir = new Path(rootDir, logName);
1517     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1518     if (this.fs.exists(logdir)) {
1519       throw new RegionServerRunningException("Region server has already " +
1520         "created directory at " + this.serverName.toString());
1521     }
1522 
1523     // Instantiate replication manager if replication enabled.  Pass it the
1524     // log directories.
1525     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1526 
1527     // listeners the wal factory will add to wals it creates.
1528     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1529     listeners.add(new MetricsWAL());
1530     if (this.replicationSourceHandler != null &&
1531         this.replicationSourceHandler.getWALActionsListener() != null) {
1532       // Replication handler is an implementation of WALActionsListener.
1533       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1534     }
1535 
1536     return new WALFactory(conf, listeners, serverName.toString());
1537   }
1538 
1539   /**
1540    * We initialize the roller for the meta WAL lazily, since we don't know whether
1541    * this regionserver will ever handle meta. All calls to this method return a
1542    * reference to that same roller. As newly referenced meta regions are brought
1543    * online, they will be offered to the roller for maintenance. As a part of that
1544    * registration process, the roller will add itself as a listener on the wal.
1546    */
1547   protected LogRoller ensureMetaWALRoller() {
1548     // Use a temporary log roller so the roller is already running by the time
1549     // metawalRoller becomes non-null
1550     LogRoller roller = metawalRoller.get();
1551     if (null == roller) {
1552       LogRoller tmpLogRoller = new LogRoller(this, this);
1553       String n = Thread.currentThread().getName();
1554       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1555           n + "-MetaLogRoller", uncaughtExceptionHandler);
1556       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1557         roller = tmpLogRoller;
1558       } else {
1559         // Another thread won starting the roller
1560         Threads.shutdown(tmpLogRoller.getThread());
1561         roller = metawalRoller.get();
1562       }
1563     }
1564     return roller;
1565   }
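
  // The method above is an instance of a common lazy-initialization idiom around an
  // AtomicReference: build a candidate, publish it with compareAndSet, and discard the
  // candidate if another thread won the race. A generic sketch (ref, makeRoller and
  // shutdown are placeholders, not members of this class):
  //
  //   T candidate = makeRoller();
  //   if (!ref.compareAndSet(null, candidate)) {
  //     shutdown(candidate);      // we lost the race; release our candidate
  //   }
  //   return ref.get();           // always the single published instance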
1566 
1567   public MetricsRegionServer getRegionServerMetrics() {
1568     return this.metricsRegionServer;
1569   }
1570 
1571   /**
1572    * @return Master address tracker instance.
1573    */
1574   public MasterAddressTracker getMasterAddressTracker() {
1575     return this.masterAddressTracker;
1576   }
1577 
1578   /*
1579    * Start maintenance threads, the Server, the Worker, and the lease checker threads.
1580    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1581    * get an unhandled exception. We cannot set the handler on all threads.
1582    * The Server's internal Listener thread is off limits. For the Server, if there is
1583    * an OOME, it waits a while and then retries. Meanwhile, a flush or a compaction that
1584    * tries to run should trigger the same critical condition and the shutdown will run.
1585    * On its way out, this server will shut down the Server. Leases sit somewhere in
1586    * between: although it inherits from Chore, it runs its own internal thread and keeps
1587    * its own stop mechanism, so it needs to be stopped by this hosting server. The
1588    * Worker logs the exception and exits.
1589    */
1590   private void startServiceThreads() throws IOException {
1591     // Start executor services
1592     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1593       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1594     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1595       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1596     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1597       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1598     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1599       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1600     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1601       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1602         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1603     }
1604     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1605        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1606 
1607     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1608         uncaughtExceptionHandler);
1609     this.cacheFlusher.start(uncaughtExceptionHandler);
1610     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
1611       ".compactionChecker", uncaughtExceptionHandler);
1612     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
1613         ".periodicFlusher", uncaughtExceptionHandler);
1614     if (this.healthCheckChore != null) {
1615       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
1616             uncaughtExceptionHandler);
1617     }
1618     if (this.nonceManagerChore != null) {
1619       Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
1620             uncaughtExceptionHandler);
1621     }
1622     if (this.storefileRefresher != null) {
1623       Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher",
1624             uncaughtExceptionHandler);
1625     }
1626 
1627     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1628     // an unhandled exception, it will just exit.
1629     this.leases.setName(getName() + ".leaseChecker");
1630     this.leases.start();
1631 
1632     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1633         this.replicationSourceHandler != null) {
1634       this.replicationSourceHandler.startReplicationService();
1635     } else {
1636       if (this.replicationSourceHandler != null) {
1637         this.replicationSourceHandler.startReplicationService();
1638       }
1639       if (this.replicationSinkHandler != null) {
1640         this.replicationSinkHandler.startReplicationService();
1641       }
1642     }
1643 
1644     // Create the log splitting worker and start it.
1645     // Set a smaller number of retries so we fail fast; otherwise the SplitLogWorker could be
1646     // blocked for quite a while inside the HConnection layer, and would not be available for
1647     // other tasks even after the current task is preempted when a split task times out.
1648     Configuration sinkConf = HBaseConfiguration.create(conf);
1649     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1650       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1651     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1652       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1653     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1654     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1655     splitLogWorker.start();
1656   }
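
  // Hedged tuning sketch: the executor pool sizes above are plain Configuration ints, so
  // they can be overridden in hbase-site.xml or programmatically. The values below are
  // illustrative, not recommendations:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.regionserver.executor.openregion.threads", 6);
  //   conf.setInt("hbase.regionserver.executor.closeregion.threads", 6);
  //   conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);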
1657 
1658   /**
1659    * Puts up the web UI.
1660    * @return The final port -- possibly different from the one we started with.
1661    * @throws IOException
1662    */
1663   private int putUpWebUI() throws IOException {
1664     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1665       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1666     // -1 is for disabling info server
1667     if (port < 0) return port;
1668     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1669     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1670       String msg =
1671           "Failed to start http info server. Address " + addr
1672               + " does not belong to this host. Correct configuration parameter: "
1673               + "hbase.regionserver.info.bindAddress";
1674       LOG.error(msg);
1675       throw new IOException(msg);
1676     }
1677     // check if auto port bind enabled
1678     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1679         false);
1680     while (true) {
1681       try {
1682         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1683         infoServer.addServlet("dump", "/dump", getDumpServlet());
1684         configureInfoServer();
1685         this.infoServer.start();
1686         break;
1687       } catch (BindException e) {
1688         if (!auto) {
1689           // auto bind disabled; rethrow the BindException
1690           LOG.error("Failed binding http info server to port: " + port);
1691           throw e;
1692         }
1693         // auto bind enabled, try to use another port
1694         LOG.info("Failed binding http info server to port: " + port);
1695         port++;
1696       }
1697     }
1698     port = this.infoServer.getPort();
1699     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1700     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1701       HConstants.DEFAULT_MASTER_INFOPORT);
1702     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1703     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1704     return port;
1705   }
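
  // Configuration sketch for the web UI above (values are examples): setting the info port
  // to -1 disables the UI entirely, while enabling the auto flag makes the server walk
  // forward from the configured port until it finds one it can bind.
  //
  //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);              // disable the UI
  //   // or:
  //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);   // retry next ports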
1706 
1707   /*
1708    * Verify that server is healthy
1709    */
1710   private boolean isHealthy() {
1711     if (!fsOk) {
1712       // File system problem
1713       return false;
1714     }
1715     // Verify that all threads are alive
1716     if (!(leases.isAlive()
1717         && cacheFlusher.isAlive() && walRoller.isAlive()
1718         && this.compactionChecker.isAlive()
1719         && this.periodicFlusher.isAlive())) {
1720       stop("One or more threads are no longer alive -- stop");
1721       return false;
1722     }
1723     final LogRoller metawalRoller = this.metawalRoller.get();
1724     if (metawalRoller != null && !metawalRoller.isAlive()) {
1725       stop("Meta WAL roller thread is no longer alive -- stop");
1726       return false;
1727     }
1728     return true;
1729   }
1730 
1731   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1732 
1733   @Override
1734   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1735     WAL wal;
1736     LogRoller roller = walRoller;
1737     // The hbase:meta regions have their own WAL.
1738     if (regionInfo != null && regionInfo.isMetaTable()) {
1739       roller = ensureMetaWALRoller();
1740       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1741     } else if (regionInfo == null) {
1742       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1743     } else {
1744       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1745     }
1746     roller.addWAL(wal);
1747     return wal;
1748   }
1749 
1750   @Override
1751   public ClusterConnection getConnection() {
1752     return this.clusterConnection;
1753   }
1754 
1755   @Override
1756   public MetaTableLocator getMetaTableLocator() {
1757     return this.metaTableLocator;
1758   }
1759 
1760   @Override
1761   public void stop(final String msg) {
1762     if (!this.stopped) {
1763       try {
1764         if (this.rsHost != null) {
1765           this.rsHost.preStop(msg);
1766         }
1767         this.stopped = true;
1768         LOG.info("STOPPED: " + msg);
1769         // Wakes run() if it is sleeping
1770         sleeper.skipSleepCycle();
1771       } catch (IOException exp) {
1772         LOG.warn("The region server did not stop", exp);
1773       }
1774     }
1775   }
1776 
1777   public void waitForServerOnline(){
1778     while (!isStopped() && !isOnline()) {
1779       synchronized (online) {
1780         try {
1781           online.wait(msgInterval);
1782         } catch (InterruptedException ie) {
1783           Thread.currentThread().interrupt();
1784           break;
1785         }
1786       }
1787     }
1788   }
1789 
1790   @Override
1791   public void postOpenDeployTasks(final HRegion r)
1792   throws KeeperException, IOException {
1793     rpcServices.checkOpen();
1794     LOG.info("Post open deploy tasks for " + r.getRegionNameAsString());
1795     // Do checks to see if we need to compact (references or too many files)
1796     for (Store s : r.getStores().values()) {
1797       if (s.hasReferences() || s.needsCompaction()) {
1798        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1799       }
1800     }
1801     long openSeqNum = r.getOpenSeqNum();
1802     if (openSeqNum == HConstants.NO_SEQNUM) {
1803       // If we opened a region, we should have read some sequence number from it.
1804       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1805       openSeqNum = 0;
1806     }
1807 
1808     // Update flushed sequence id of a recovering region in ZK
1809     updateRecoveringRegionLastFlushedSequenceId(r);
1810 
1811     // Notify master
1812     if (!reportRegionStateTransition(
1813         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1814       throw new IOException("Failed to report opened region to master: "
1815         + r.getRegionNameAsString());
1816     }
1817 
1818     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
1819   }
1820 
1821   @Override
1822   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1823     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1824   }
1825 
1826   @Override
1827   public boolean reportRegionStateTransition(
1828       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1829     if (TEST_SKIP_REPORTING_TRANSITION) {
1830       // This is for testing only in case there is no master
1831       // to handle the region transition report at all.
1832       if (code == TransitionCode.OPENED) {
1833         Preconditions.checkArgument(hris != null && hris.length == 1);
1834         if (hris[0].isMetaRegion()) {
1835           try {
1836             MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, State.OPEN);
1837           } catch (KeeperException e) {
1838             LOG.info("Failed to update meta location", e);
1839             return false;
1840           }
1841         } else {
1842           try {
1843             MetaTableAccessor.updateRegionLocation(clusterConnection,
1844               hris[0], serverName, openSeqNum);
1845           } catch (IOException e) {
1846             LOG.info("Failed to update meta", e);
1847             return false;
1848           }
1849         }
1850       }
1851       return true;
1852     }
1853 
1854     ReportRegionStateTransitionRequest.Builder builder =
1855       ReportRegionStateTransitionRequest.newBuilder();
1856     builder.setServer(ProtobufUtil.toServerName(serverName));
1857     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1858     transition.setTransitionCode(code);
1859     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1860       transition.setOpenSeqNum(openSeqNum);
1861     }
1862     for (HRegionInfo hri: hris) {
1863       transition.addRegionInfo(HRegionInfo.convert(hri));
1864     }
1865     ReportRegionStateTransitionRequest request = builder.build();
1866     while (keepLooping()) {
1867       RegionServerStatusService.BlockingInterface rss = rssStub;
1868       try {
1869         if (rss == null) {
1870           createRegionServerStatusStub();
1871           continue;
1872         }
1873         ReportRegionStateTransitionResponse response =
1874           rss.reportRegionStateTransition(null, request);
1875         if (response.hasErrorMessage()) {
1876           LOG.info("Failed to transition " + hris[0]
1877             + " to " + code + ": " + response.getErrorMessage());
1878           return false;
1879         }
1880         return true;
1881       } catch (ServiceException se) {
1882         IOException ioe = ProtobufUtil.getRemoteException(se);
1883         LOG.info("Failed to report region transition, will retry", ioe);
1884         if (rssStub == rss) {
1885           rssStub = null;
1886         }
1887       }
1888     }
1889     return false;
1890   }
1891 
1892   @Override
1893   public RpcServerInterface getRpcServer() {
1894     return rpcServices.rpcServer;
1895   }
1896 
1897   @VisibleForTesting
1898   public RSRpcServices getRSRpcServices() {
1899     return rpcServices;
1900   }
1901 
1902   /**
1903    * Cause the server to exit without closing the regions it is serving, without closing
1904    * the log it is using, and without notifying the master. Used for unit testing and on
1905    * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
1906    *
1907    * @param reason
1908    *          the reason we are aborting
1909    * @param cause
1910    *          the exception that caused the abort, or null
1911    */
1912   @Override
1913   public void abort(String reason, Throwable cause) {
1914     String msg = "ABORTING region server " + this + ": " + reason;
1915     if (cause != null) {
1916       LOG.fatal(msg, cause);
1917     } else {
1918       LOG.fatal(msg);
1919     }
1920     this.abortRequested = true;
1921     // HBASE-4014: show list of coprocessors that were loaded to help debug
1922     // regionserver crashes. Note that we're implicitly using
1923     // java.util.HashSet's toString() method to print the coprocessor names.
1924     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1925         CoprocessorHost.getLoadedCoprocessors());
1926     // Do our best to report our abort to the master, but this may not work
1927     try {
1928       if (cause != null) {
1929         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1930       }
1931       // Report to the master but only if we have already registered with the master.
1932       if (rssStub != null && this.serverName != null) {
1933         ReportRSFatalErrorRequest.Builder builder =
1934           ReportRSFatalErrorRequest.newBuilder();
1935         ServerName sn =
1936           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
1937         builder.setServer(ProtobufUtil.toServerName(sn));
1938         builder.setErrorMessage(msg);
1939         rssStub.reportRSFatalError(null, builder.build());
1940       }
1941     } catch (Throwable t) {
1942       LOG.warn("Unable to report fatal error to master", t);
1943     }
1944     stop(reason);
1945   }
1946 
1947   /**
1948    * @see HRegionServer#abort(String, Throwable)
1949    */
1950   public void abort(String reason) {
1951     abort(reason, null);
1952   }
1953 
1954   @Override
1955   public boolean isAborted() {
1956     return this.abortRequested;
1957   }
1958 
1959   /*
1960    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
1961    * logs, but it does close the socket in case we want to bring up a server on the
1962    * old hostname+port immediately.
1963    */
1964   protected void kill() {
1965     this.killed = true;
1966     abort("Simulated kill");
1967   }
1968 
1969   /**
1970    * Wait on all threads to finish. Presumption is that all closes and stops
1971    * have already been called.
1972    */
1973   protected void stopServiceThreads() {
1974     if (this.nonceManagerChore != null) {
1975       Threads.shutdown(this.nonceManagerChore.getThread());
1976     }
1977     if (this.compactionChecker != null) {
1978       Threads.shutdown(this.compactionChecker.getThread());
1979     }
1980     if (this.periodicFlusher != null) {
1981       Threads.shutdown(this.periodicFlusher.getThread());
1982     }
1983     if (this.cacheFlusher != null) {
1984       this.cacheFlusher.join();
1985     }
1986     if (this.healthCheckChore != null) {
1987       Threads.shutdown(this.healthCheckChore.getThread());
1988     }
1989     if (this.spanReceiverHost != null) {
1990       this.spanReceiverHost.closeReceivers();
1991     }
1992     if (this.walRoller != null) {
1993       Threads.shutdown(this.walRoller.getThread());
1994     }
1995     final LogRoller metawalRoller = this.metawalRoller.get();
1996     if (metawalRoller != null) {
1997       Threads.shutdown(metawalRoller.getThread());
1998     }
1999     if (this.compactSplitThread != null) {
2000       this.compactSplitThread.join();
2001     }
2002     if (this.service != null) this.service.shutdown();
2003     if (this.replicationSourceHandler != null &&
2004         this.replicationSourceHandler == this.replicationSinkHandler) {
2005       this.replicationSourceHandler.stopReplicationService();
2006     } else {
2007       if (this.replicationSourceHandler != null) {
2008         this.replicationSourceHandler.stopReplicationService();
2009       }
2010       if (this.replicationSinkHandler != null) {
2011         this.replicationSinkHandler.stopReplicationService();
2012       }
2013     }
2014     if (this.storefileRefresher != null) {
2015       Threads.shutdown(this.storefileRefresher.getThread());
2016     }
2017   }
2018 
2019   /**
2020    * @return Return the object that implements the replication
2021    * source service.
2022    */
2023   ReplicationSourceService getReplicationSourceService() {
2024     return replicationSourceHandler;
2025   }
2026 
2027   /**
2028    * @return Return the object that implements the replication
2029    * sink service.
2030    */
2031   ReplicationSinkService getReplicationSinkService() {
2032     return replicationSinkHandler;
2033   }
2034 
2035   /**
2036    * Get the current master from ZooKeeper and open the RPC connection to it.
2037    *
2038    * Method will block until a master is available. You can break from this
2039    * block by requesting the server stop.
2040    *
2041    * @return master + port, or null if server has been stopped
2042    */
2043   private synchronized ServerName createRegionServerStatusStub() {
2044     if (rssStub != null) {
2045       return masterAddressTracker.getMasterAddress();
2046     }
2047     ServerName sn = null;
2048     long previousLogTime = 0;
2049     boolean refresh = false; // for the first time, use cached data
2050     RegionServerStatusService.BlockingInterface intf = null;
2051     boolean interrupted = false;
2052     try {
2053       while (keepLooping()) {
2054         sn = this.masterAddressTracker.getMasterAddress(refresh);
2055         if (sn == null) {
2056           if (!keepLooping()) {
2057             // give up with no connection.
2058             LOG.debug("No master found and cluster is stopped; bailing out");
2059             return null;
2060           }
2061           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2062             LOG.debug("No master found; retry");
2063             previousLogTime = System.currentTimeMillis();
2064           }
2065           refresh = true; // let's try pull it from ZK directly
2066           if (sleep(200)) {
2067             interrupted = true;
2068           }
2069           continue;
2070         }
2071 
2072         // If we are on the active master, use the shortcut
2073         if (this instanceof HMaster && sn.equals(getServerName())) {
2074           intf = ((HMaster)this).getMasterRpcServices();
2075           break;
2076         }
2077         try {
2078           BlockingRpcChannel channel =
2079             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
2080           intf = RegionServerStatusService.newBlockingStub(channel);
2081           break;
2082         } catch (IOException e) {
2083           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2084             e = e instanceof RemoteException ?
2085               ((RemoteException)e).unwrapRemoteException() : e;
2086             if (e instanceof ServerNotRunningYetException) {
2087               LOG.info("Master isn't available yet, retrying");
2088             } else {
2089               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2090             }
2091             previousLogTime = System.currentTimeMillis();
2092           }
2093           if (sleep(200)) {
2094             interrupted = true;
2095           }
2096         }
2097       }
2098     } finally {
2099       if (interrupted) {
2100         Thread.currentThread().interrupt();
2101       }
2102     }
2103     rssStub = intf;
2104     return sn;
2105   }
2106 
2107   /**
2108    * @return True if we should break loop because cluster is going down or
2109    * this server has been stopped or hdfs has gone bad.
2110    */
2111   private boolean keepLooping() {
2112     return !this.stopped && isClusterUp();
2113   }
2114 
2115   /*
2116    * Let the master know we're here. Run initialization using parameters passed
2117    * to us by the master.
2118    * @return The RegionServerStartupResponse carrying key/value configurations from
2119    * the master, else null if we failed to register.
2120    * @throws IOException
2121    */
2122   private RegionServerStartupResponse reportForDuty() throws IOException {
2123     ServerName masterServerName = createRegionServerStatusStub();
2124     if (masterServerName == null) return null;
2125     RegionServerStartupResponse result = null;
2126     try {
2127       rpcServices.requestCount.set(0);
2128       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2129         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2130       long now = EnvironmentEdgeManager.currentTime();
2131       int port = rpcServices.isa.getPort();
2132       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2133       request.setPort(port);
2134       request.setServerStartCode(this.startcode);
2135       request.setServerCurrentTime(now);
2136       result = this.rssStub.regionServerStartup(null, request.build());
2137     } catch (ServiceException se) {
2138       IOException ioe = ProtobufUtil.getRemoteException(se);
2139       if (ioe instanceof ClockOutOfSyncException) {
2140         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2141         // Re-throw IOE will cause RS to abort
2142         throw ioe;
2143       } else if (ioe instanceof ServerNotRunningYetException) {
2144         LOG.debug("Master is not running yet");
2145       } else {
2146         LOG.warn("error telling master we are up", se);
2147       }
2148     }
2149     return result;
2150   }
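
  // Context for the ClockOutOfSyncException branch above: the master compares the reported
  // serverCurrentTime against its own clock and rejects the startup when the skew is too
  // large, which makes this region server abort. The key below is our best recollection of
  // the master-side setting and should be verified against the master code:
  //
  //   conf.setLong("hbase.master.maxclockskew", 30000);   // assumed key; milliseconds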
2151 
2152   @Override
2153   public long getLastSequenceId(byte[] region) {
2154     Long lastFlushedSequenceId = -1L;
2155     try {
2156       GetLastFlushedSequenceIdRequest req = RequestConverter
2157           .buildGetLastFlushedSequenceIdRequest(region);
2158       RegionServerStatusService.BlockingInterface rss = rssStub;
2159       if (rss == null) { // Try to connect one more time
2160         createRegionServerStatusStub();
2161         rss = rssStub;
2162         if (rss == null) {
2163           // Still no luck, we tried
2164           LOG.warn("Unable to connect to the master to check "
2165             + "the last flushed sequence id");
2166           return -1L;
2167         }
2168       }
2169       lastFlushedSequenceId = rss.getLastFlushedSequenceId(null, req)
2170           .getLastFlushedSequenceId();
2171     } catch (ServiceException e) {
2172       lastFlushedSequenceId = -1L;
2173       LOG.warn("Unable to connect to the master to check "
2174         + "the last flushed sequence id", e);
2175     }
2176     return lastFlushedSequenceId;
2177   }
2178 
2179   /**
2180    * Closes all regions.  Called on our way out.
2181    * Assumes that it's not possible for new regions to be added to onlineRegions
2182    * while this method runs.
2183    */
2184   protected void closeAllRegions(final boolean abort) {
2185     closeUserRegions(abort);
2186     closeMetaTableRegions(abort);
2187   }
2188 
2189   /**
2190    * Close meta region if we carry it
2191    * @param abort Whether we're running an abort.
2192    */
2193   void closeMetaTableRegions(final boolean abort) {
2194     HRegion meta = null;
2195     this.lock.writeLock().lock();
2196     try {
2197       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2198         HRegionInfo hri = e.getValue().getRegionInfo();
2199         if (hri.isMetaRegion()) {
2200           meta = e.getValue();
2201         }
2202         if (meta != null) break;
2203       }
2204     } finally {
2205       this.lock.writeLock().unlock();
2206     }
2207     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2208   }
2209 
2210   /**
2211    * Schedule closes on all user regions.
2212    * Should be safe to call multiple times because it won't close regions
2213    * that are already closed or that are closing.
2214    * @param abort Whether we're running an abort.
2215    */
2216   void closeUserRegions(final boolean abort) {
2217     this.lock.writeLock().lock();
2218     try {
2219       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2220         HRegion r = e.getValue();
2221         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2222           // Don't update zk with this close transition; pass false.
2223           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2224         }
2225       }
2226     } finally {
2227       this.lock.writeLock().unlock();
2228     }
2229   }
2230 
2231   /** @return the info server */
2232   public InfoServer getInfoServer() {
2233     return infoServer;
2234   }
2235 
2236   /**
2237    * @return true if a stop has been requested.
2238    */
2239   @Override
2240   public boolean isStopped() {
2241     return this.stopped;
2242   }
2243 
2244   @Override
2245   public boolean isStopping() {
2246     return this.stopping;
2247   }
2248 
2249   @Override
2250   public Map<String, HRegion> getRecoveringRegions() {
2251     return this.recoveringRegions;
2252   }
2253 
2254   /**
2255    *
2256    * @return the configuration
2257    */
2258   @Override
2259   public Configuration getConfiguration() {
2260     return conf;
2261   }
2262 
2263   /** @return the write lock for the server */
2264   ReentrantReadWriteLock.WriteLock getWriteLock() {
2265     return lock.writeLock();
2266   }
2267 
2268   public int getNumberOfOnlineRegions() {
2269     return this.onlineRegions.size();
2270   }
2271 
2272   boolean isOnlineRegionsEmpty() {
2273     return this.onlineRegions.isEmpty();
2274   }
2275 
2276   /**
2277    * For tests, web ui and metrics.
2278    * This method will only work if HRegionServer is in the same JVM as client;
2279    * HRegion cannot be serialized to cross an rpc.
2280    */
2281   public Collection<HRegion> getOnlineRegionsLocalContext() {
2282     Collection<HRegion> regions = this.onlineRegions.values();
2283     return Collections.unmodifiableCollection(regions);
2284   }
2285 
2286   @Override
2287   public void addToOnlineRegions(HRegion region) {
2288     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2289     configurationManager.registerObserver(region);
2290   }
2291 
2292   /**
2293    * @return A new Map of online regions sorted by region size with the first entry being the
2294    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2295    * may NOT return all regions.
2296    */
2297   SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2298     // we'll sort the regions in reverse
2299     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2300         new Comparator<Long>() {
2301           @Override
2302           public int compare(Long a, Long b) {
2303             return -1 * a.compareTo(b);
2304           }
2305         });
2306     // Copy over all regions. Regions are sorted by size with biggest first.
2307     for (HRegion region : this.onlineRegions.values()) {
2308       sortedRegions.put(region.memstoreSize.get(), region);
2309     }
2310     return sortedRegions;
2311   }
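
  // Worked example of the caveat documented above: the TreeMap is keyed by memstore size,
  // so regions with identical sizes collide. If regionA and regionB both report
  // memstoreSize == 0, then after
  //
  //   sortedRegions.put(0L, regionA);
  //   sortedRegions.put(0L, regionB);
  //
  // only regionB remains in the map. Callers that need every region should iterate
  // onlineRegions directly instead.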
2312 
2313   /**
2314    * @return time stamp in millis of when this region server was started
2315    */
2316   public long getStartcode() {
2317     return this.startcode;
2318   }
2319 
2320   /** @return reference to FlushRequester */
2321   @Override
2322   public FlushRequester getFlushRequester() {
2323     return this.cacheFlusher;
2324   }
2325 
2326   /**
2327    * Get the top N most loaded regions this server is serving so we can tell the
2328    * master which regions it can reallocate if we're overloaded. TODO: actually
2329    * calculate which regions are most loaded. (Right now, we're just grabbing
2330    * the first N regions being served regardless of load.)
2331    */
2332   protected HRegionInfo[] getMostLoadedRegions() {
2333     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2334     for (HRegion r : onlineRegions.values()) {
2335       if (!r.isAvailable()) {
2336         continue;
2337       }
2338       if (regions.size() < numRegionsToReport) {
2339         regions.add(r.getRegionInfo());
2340       } else {
2341         break;
2342       }
2343     }
2344     return regions.toArray(new HRegionInfo[regions.size()]);
2345   }
2346 
2347   @Override
2348   public Leases getLeases() {
2349     return leases;
2350   }
2351 
2352   /**
2353    * @return Return the rootDir.
2354    */
2355   protected Path getRootDir() {
2356     return rootDir;
2357   }
2358 
2359   /**
2360    * @return Return the fs.
2361    */
2362   @Override
2363   public FileSystem getFileSystem() {
2364     return fs;
2365   }
2366 
2367   @Override
2368   public String toString() {
2369     return getServerName().toString();
2370   }
2371 
2372   /**
2373    * Interval at which threads should run
2374    *
2375    * @return the interval
2376    */
2377   public int getThreadWakeFrequency() {
2378     return threadWakeFrequency;
2379   }
2380 
2381   @Override
2382   public ZooKeeperWatcher getZooKeeper() {
2383     return zooKeeper;
2384   }
2385 
2386   @Override
2387   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2388     return csm;
2389   }
2390 
2391   @Override
2392   public ServerName getServerName() {
2393     return serverName;
2394   }
2395 
2396   @Override
2397   public CompactionRequestor getCompactionRequester() {
2398     return this.compactSplitThread;
2399   }
2400 
2401   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2402     return this.rsHost;
2403   }
2404 
2405   @Override
2406   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2407     return this.regionsInTransitionInRS;
2408   }
2409 
2410   @Override
2411   public ExecutorService getExecutorService() {
2412     return service;
2413   }
2414 
2415   @Override
2416   public RegionServerQuotaManager getRegionServerQuotaManager() {
2417     return rsQuotaManager;
2418   }
2419 
2420   //
2421   // Main program and support routines
2422   //
2423 
2424   /**
2425    * Load the replication service objects, if any
2426    */
2427   static private void createNewReplicationInstance(Configuration conf,
2428     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2429 
2430     // If replication is not enabled, then return immediately.
2431     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2432         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2433       return;
2434     }
2435 
2436     // read in the name of the source replication class from the config file.
2437     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2438                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2439 
2440     // read in the name of the sink replication class from the config file.
2441     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2442                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2443 
2444     // If both the sink and the source class names are the same, then instantiate
2445     // only one object.
2446     if (sourceClassname.equals(sinkClassname)) {
2447       server.replicationSourceHandler = (ReplicationSourceService)
2448                                          newReplicationInstance(sourceClassname,
2449                                          conf, server, fs, logDir, oldLogDir);
2450       server.replicationSinkHandler = (ReplicationSinkService)
2451                                          server.replicationSourceHandler;
2452     } else {
2453       server.replicationSourceHandler = (ReplicationSourceService)
2454                                          newReplicationInstance(sourceClassname,
2455                                          conf, server, fs, logDir, oldLogDir);
2456       server.replicationSinkHandler = (ReplicationSinkService)
2457                                          newReplicationInstance(sinkClassname,
2458                                          conf, server, fs, logDir, oldLogDir);
2459     }
2460   }
2461 
2462   static private ReplicationService newReplicationInstance(String classname,
2463     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2464     Path oldLogDir) throws IOException{
2465 
2466     Class<?> clazz = null;
2467     try {
2468       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2469       clazz = Class.forName(classname, true, classLoader);
2470     } catch (java.lang.ClassNotFoundException nfe) {
2471       throw new IOException("Could not find class for " + classname);
2472     }
2473 
2474     // create an instance of the replication object.
2475     ReplicationService service = (ReplicationService)
2476                               ReflectionUtils.newInstance(clazz, conf);
2477     service.initialize(server, fs, logDir, oldLogDir);
2478     return service;
2479   }
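
  // Hedged configuration sketch (MyReplicationEndpointService is a hypothetical class):
  // because createNewReplicationInstance() compares the two class names, pointing both keys
  // at one class that implements ReplicationSourceService and ReplicationSinkService yields
  // a single shared handler instead of two separate objects.
  //
  //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
  //       MyReplicationEndpointService.class.getName());
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
  //       MyReplicationEndpointService.class.getName());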
2480 
2481   /**
2482    * Utility for constructing an instance of the passed HRegionServer class.
2483    *
2484    * @param regionServerClass The HRegionServer subclass to instantiate.
2485    * @param conf2 Configuration to pass to the constructor.
2486    * @return HRegionServer instance.
2487    */
2488   public static HRegionServer constructRegionServer(
2489       Class<? extends HRegionServer> regionServerClass,
2490       final Configuration conf2, CoordinatedStateManager cp) {
2491     try {
2492       Constructor<? extends HRegionServer> c = regionServerClass
2493           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2494       return c.newInstance(conf2, cp);
2495     } catch (Exception e) {
2496       throw new RuntimeException("Failed construction of " + "Regionserver: "
2497           + regionServerClass.toString(), e);
2498     }
2499   }
2500 
2501   /**
2502    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2503    */
2504   public static void main(String[] args) throws Exception {
2505     VersionInfo.logVersion();
2506     Configuration conf = HBaseConfiguration.create();
2507     @SuppressWarnings("unchecked")
2508     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2509         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2510 
2511     new HRegionServerCommandLine(regionServerClass).doMain(args);
2512   }
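
  // Hedged extension sketch (MyRegionServer is a hypothetical subclass): main() above picks
  // the implementation via HConstants.REGION_SERVER_IMPL, and constructRegionServer()
  // requires a (Configuration, CoordinatedStateManager) constructor, so a subclass can be
  // plugged in like this:
  //
  //   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, HRegionServer.class);
  //   // MyRegionServer must declare:
  //   //   public MyRegionServer(Configuration conf, CoordinatedStateManager csm) { ... }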
2513 
2514   /**
2515    * Gets the online regions of the specified table.
2516    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2517    * Only returns <em>online</em> regions.  If a region on this table has been
2518    * closed during a disable, etc., it will not be included in the returned list.
2519    * So, the returned list is not necessarily ALL regions in this table; it is
2520    * all the ONLINE regions in the table.
2521    * @param tableName The table to look up online regions for.
2522    * @return Online regions from <code>tableName</code>
2523    */
2524   @Override
2525   public List<HRegion> getOnlineRegions(TableName tableName) {
2526      List<HRegion> tableRegions = new ArrayList<HRegion>();
2527      synchronized (this.onlineRegions) {
2528        for (HRegion region: this.onlineRegions.values()) {
2529          HRegionInfo regionInfo = region.getRegionInfo();
2530          if(regionInfo.getTable().equals(tableName)) {
2531            tableRegions.add(region);
2532          }
2533        }
2534      }
2535      return tableRegions;
2536    }
2537 
2538   /**
2539    * Gets the online tables in this RS.
2540    * This method looks at the in-memory onlineRegions.
2541    * @return all the online tables in this RS
2542    */
2543   @Override
2544   public Set<TableName> getOnlineTables() {
2545     Set<TableName> tables = new HashSet<TableName>();
2546     synchronized (this.onlineRegions) {
2547       for (HRegion region: this.onlineRegions.values()) {
2548         tables.add(region.getTableDesc().getTableName());
2549       }
2550     }
2551     return tables;
2552   }
2553 
2554   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2555   public String[] getRegionServerCoprocessors() {
2556     TreeSet<String> coprocessors = new TreeSet<String>();
2557     try {
2558       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2559     } catch (IOException exception) {
2560       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2561           "skipping.");
2562       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2563     }
2564     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2565     for (HRegion region: regions) {
2566       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2567       try {
2568         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2569       } catch (IOException exception) {
2570         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2571             "; skipping.");
2572         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2573       }
2574     }
2575     return coprocessors.toArray(new String[coprocessors.size()]);
2576   }
2577 
2578   /**
2579    * Try to close the region, logs a warning on failure but continues.
2580    * @param region Region to close
2581    */
2582   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2583     try {
2584       if (!closeRegion(region.getEncodedName(), abort, null)) {
2585         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2586             " - ignoring and continuing");
2587       }
2588     } catch (IOException e) {
2589       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2590           " - ignoring and continuing", e);
2591     }
2592   }
2593 
2594   /**
2595    * Asynchronously close a region; this can be called from the master or internally by the
2596    * regionserver when stopping. If called from the master, the region will update the znode status.
2597    *
2598    * <p>
2599    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2600    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2601    * </p>
2602    *
2603    * <p>
2604    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2605    * </p>
2606    *
2607    * @param encodedName Region to close
2608    * @param abort True if we are aborting
2609    * @return True if closed a region.
2610    * @throws NotServingRegionException if the region is not online
2611    */
2612   protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
2613       throws NotServingRegionException {
2614     //Check for permissions to close.
2615     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2616     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2617       try {
2618         actualRegion.getCoprocessorHost().preClose(false);
2619       } catch (IOException exp) {
2620         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2621         return false;
2622       }
2623     }
2624 
2625     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2626         Boolean.FALSE);
2627 
2628     if (Boolean.TRUE.equals(previous)) {
2629       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2630           "trying to OPEN. Cancelling OPENING.");
2631       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2632         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2633         // We're going to try to do a standard close then.
2634         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2635             " Doing a standard close now");
2636         return closeRegion(encodedName, abort, sn);
2637       }
2638       // Let's get the region from the online region list again
2639       actualRegion = this.getFromOnlineRegions(encodedName);
2640       if (actualRegion == null) { // If already online, we still need to close it.
2641         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2642         // The master deletes the znode when it receives this exception.
2643         throw new NotServingRegionException("The region " + encodedName +
2644           " was opening but not yet served. Opening is cancelled.");
2645       }
2646     } else if (Boolean.FALSE.equals(previous)) {
2647       LOG.info("Received CLOSE for the region: " + encodedName +
2648         ", which we are already trying to CLOSE, but not completed yet");
2649       return true;
2650     }
2651 
2652     if (actualRegion == null) {
2653       LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2654       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2655       // The master deletes the znode when it receives this exception.
2656       throw new NotServingRegionException("The region " + encodedName +
2657           " is not online, and is not opening.");
2658     }
2659 
2660     CloseRegionHandler crh;
2661     final HRegionInfo hri = actualRegion.getRegionInfo();
2662     if (hri.isMetaRegion()) {
2663       crh = new CloseMetaHandler(this, this, hri, abort);
2664     } else {
2665       crh = new CloseRegionHandler(this, this, hri, abort, sn);
2666     }
2667     this.service.submit(crh);
2668     return true;
2669   }
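
  // Summary of the regionsInTransitionInRS encoding used above (derived from this method
  // and the open path): the map value is Boolean.TRUE while a region is OPENING and
  // Boolean.FALSE while it is CLOSING. In closeRegion() that means:
  //
  //   previous == null          -> nothing in flight; register a close (FALSE) and proceed.
  //   previous == Boolean.TRUE  -> an open is in flight; try to flip it to FALSE to cancel.
  //   previous == Boolean.FALSE -> a close is already in flight; the new request is a no-op.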
2670 
2671   /**
2672    * @param regionName Binary name of the region to look up.
2673    * @return HRegion for the passed binary <code>regionName</code>, or null if the
2674    *         named region is not a member of the online regions.
2675    */
2676   public HRegion getOnlineRegion(final byte[] regionName) {
2677     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2678     return this.onlineRegions.get(encodedRegionName);
2679   }
2680 
2681   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2682     return this.regionFavoredNodesMap.get(encodedRegionName);
2683   }
2684 
2685   @Override
2686   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2687     return this.onlineRegions.get(encodedRegionName);
2688   }
2689 
2690 
2691   @Override
2692   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2693     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2694 
2695     if (destination != null) {
2696       try {
2697         WAL wal = getWAL(r.getRegionInfo());
2698         long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2699         if (closeSeqNum == HConstants.NO_SEQNUM) {
2700           // No edits in WAL for this region; get the sequence number when the region was opened.
2701           closeSeqNum = r.getOpenSeqNum();
2702           if (closeSeqNum == HConstants.NO_SEQNUM) {
2703             closeSeqNum = 0;
2704           }
2705         }
2706         addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2707       } catch (IOException exception) {
2708         LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
2709             "; not adding to moved regions.");
2710         LOG.debug("Exception details for failure to get wal", exception);
2711       }
2712     }
2713     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2714     return toReturn != null;
2715   }
2716 
2717   /**
2718    * Protected utility method for safely obtaining an HRegion handle.
2719    *
2720    * @param regionName
2721    *          Name of online {@link HRegion} to return
2722    * @return {@link HRegion} for <code>regionName</code>
2723    * @throws NotServingRegionException
2724    */
2725   protected HRegion getRegion(final byte[] regionName)
2726       throws NotServingRegionException {
2727     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2728     return getRegionByEncodedName(regionName, encodedRegionName);
2729   }
2730 
2731   public HRegion getRegionByEncodedName(String encodedRegionName)
2732       throws NotServingRegionException {
2733     return getRegionByEncodedName(null, encodedRegionName);
2734   }
2735 
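       /**
        * @param regionName full region name, used only for error messages; may be null
        * @param encodedRegionName encoded name of the region to look up
        * @return the online {@link HRegion} for the given name
        * @throws RegionMovedException if the region was recently moved to another server
        * @throws RegionOpeningException if the region is currently opening on this server
        * @throws NotServingRegionException if the region is neither online nor opening here
        */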
2736   protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2737     throws NotServingRegionException {
2738     HRegion region = this.onlineRegions.get(encodedRegionName);
2739     if (region == null) {
2740       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2741       if (moveInfo != null) {
2742         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2743       }
2744       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2745       String regionNameStr = regionName == null ?
2746         encodedRegionName : Bytes.toStringBinary(regionName);
2747       if (isOpening != null && isOpening.booleanValue()) {
2748         throw new RegionOpeningException("Region " + regionNameStr +
2749           " is opening on " + this.serverName);
2750       }
2751       throw new NotServingRegionException("Region " + regionNameStr +
2752         " is not online on " + this.serverName);
2753     }
2754     return region;
2755   }
2756 
2757   /*
2758    * Cleanup after a Throwable was caught while invoking a method. Logs the problem and,
2759    * unless it is an NSRE or an OOME, checks that the file system is still available.
2760    *
2761    * @param t Throwable that was caught
2762    *
2763    * @param msg Message to log in error. Can be null.
2764    *
2765    * @return the passed <code>t</code> unchanged; methods can only let out IOEs, so callers convert it.
2766    */
2767   private Throwable cleanup(final Throwable t, final String msg) {
2768     // Don't log as error if NSRE; NSRE is 'normal' operation.
2769     if (t instanceof NotServingRegionException) {
2770       LOG.debug("NotServingRegionException; " + t.getMessage());
2771       return t;
2772     }
2773     Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
2774     if (msg == null) {
2775       LOG.error("", e);
2776     } else {
2777       LOG.error(msg, e);
2778     }
2779     if (!rpcServices.checkOOME(t)) {
2780       checkFileSystem();
2781     }
2782     return t;
2783   }
2784 
2785   /*
2786    * @param t Throwable to convert
2787    *
2788    * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
2789    *
2790    * @return <code>t</code> as an IOException, wrapping it in a new IOException if it isn't one already.
2791    */
2792   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2793     return (t instanceof IOException ? (IOException) t : msg == null
2794         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2795   }
2796 
2797   /**
2798    * Checks to see if the file system is still accessible. If not, sets
2799    * abortRequested and stopRequested.
2800    *
2801    * @return false if file system is not available
2802    */
2803   public boolean checkFileSystem() {
2804     if (this.fsOk && this.fs != null) {
2805       try {
2806         FSUtils.checkFileSystemAvailable(this.fs);
2807       } catch (IOException e) {
2808         abort("File System not available", e);
2809         this.fsOk = false;
2810       }
2811     }
2812     return this.fsOk;
2813   }
2814 
2815   @Override
2816   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2817       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2818     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2819     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2820     // it is a map of region name to InetSocketAddress[]
2821     for (int i = 0; i < favoredNodes.size(); i++) {
2822       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2823           favoredNodes.get(i).getPort());
2824     }
2825     regionFavoredNodesMap.put(encodedRegionName, addr);
2826   }
2827 
2828   /**
2829    * Return the favored nodes for a region given its encoded name. Look at the
2830    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
2831    * @param encodedRegionName
2832    * @return array of favored locations
2833    */
2834   @Override
2835   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2836     return regionFavoredNodesMap.get(encodedRegionName);
2837   }
2838 
2839   @Override
2840   public ServerNonceManager getNonceManager() {
2841     return this.nonceManager;
2842   }
2843 
2844   private static class MovedRegionInfo {
2845     private final ServerName serverName;
2846     private final long seqNum;
2847     private final long ts;
2848 
2849     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2850       this.serverName = serverName;
2851       this.seqNum = closeSeqNum;
2852       ts = EnvironmentEdgeManager.currentTime();
2853     }
2854 
2855     public ServerName getServerName() {
2856       return serverName;
2857     }
2858 
2859     public long getSeqNum() {
2860       return seqNum;
2861     }
2862 
2863     public long getMoveTime() {
2864       return ts;
2865     }
2866   }
2867 
2868   // This map contains all the regions that we closed for a move.
2869   //  We record the time of the move because we don't want to keep stale information around.
2870   protected Map<String, MovedRegionInfo> movedRegions =
2871       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
2872 
2873   // We need a timeout, otherwise we risk giving out wrong information: that would double
2874   //  the number of network calls instead of reducing them.
2875   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
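       // A moved region record is only handed out while it is younger than TIMEOUT_REGION_MOVED
       //  (see getMovedRegion()); older entries are dropped there and by the MovedRegionsCleaner
       //  chore below, which is created with the same value as its period.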
2876 
2877   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
2878     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
2879       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
2880       return;
2881     }
2882     LOG.info("Adding moved region record: "
2883       + encodedName + " to " + destination + " as of " + closeSeqNum);
2884     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
2885   }
2886 
2887   void removeFromMovedRegions(String encodedName) {
2888     movedRegions.remove(encodedName);
2889   }
2890 
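       /**
        * @param encodedRegionName encoded name of a region that may have been moved off this server
        * @return the recorded destination if the move is recent enough, otherwise null
        */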
2891   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
2892     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
2893 
2894     long now = EnvironmentEdgeManager.currentTime();
2895     if (dest != null) {
2896       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
2897         return dest;
2898       } else {
2899         movedRegions.remove(encodedRegionName);
2900       }
2901     }
2902 
2903     return null;
2904   }
2905 
2906   /**
2907    * Remove the expired entries from the moved regions list.
2908    */
2909   protected void cleanMovedRegions() {
2910     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
2911     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
2912 
2913     while (it.hasNext()){
2914       Map.Entry<String, MovedRegionInfo> e = it.next();
2915       if (e.getValue().getMoveTime() < cutOff) {
2916         it.remove();
2917       }
2918     }
2919   }
2920 
2921   /**
2922    * A chore that periodically cleans the moved region cache.
2923    */
2924   protected static class MovedRegionsCleaner extends Chore implements Stoppable {
2925     private HRegionServer regionServer;
2926     Stoppable stoppable;
2927 
2928     private MovedRegionsCleaner(
2929         HRegionServer regionServer, Stoppable stoppable) {
2930       super("MovedRegionsCleaner for region " + regionServer, TIMEOUT_REGION_MOVED, stoppable);
2931       this.regionServer = regionServer;
2932       this.stoppable = stoppable;
2933     }
2934 
2935     static MovedRegionsCleaner createAndStart(HRegionServer rs) {
2936       Stoppable stoppable = new Stoppable() {
2937         private volatile boolean isStopped = false;
2938         @Override public void stop(String why) { isStopped = true; }
2939         @Override public boolean isStopped() { return isStopped; }
2940       };
2941 
2942       return new MovedRegionsCleaner(rs, stoppable);
2943     }
2944 
2945     @Override
2946     protected void chore() {
2947       regionServer.cleanMovedRegions();
2948     }
2949 
2950     @Override
2951     public void stop(String why) {
2952       stoppable.stop(why);
2953     }
2954 
2955     @Override
2956     public boolean isStopped() {
2957       return stoppable.isStopped();
2958     }
2959   }
2960 
2961   private String getMyEphemeralNodePath() {
2962     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
2963   }
2964 
2965   private boolean isHealthCheckerConfigured() {
2966     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2967     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2968   }
2969 
2970   /**
2971    * @return the underlying {@link CompactSplitThread} for the server
2972    */
2973   public CompactSplitThread getCompactSplitThread() {
2974     return this.compactSplitThread;
2975   }
2976 
2977   /**
2978    * A helper function to store the last flushed sequence id with the previous failed RS for a
2979    * recovering region. The id is used to skip WAL edits which have already been flushed. Since
2980    * a flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
2981    * @throws KeeperException
2982    * @throws IOException
2983    */
2984   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
2985       IOException {
2986     if (!r.isRecovering()) {
2987       // return immediately for non-recovering regions
2988       return;
2989     }
2990 
2991     HRegionInfo region = r.getRegionInfo();
2992     ZooKeeperWatcher zkw = getZooKeeper();
2993     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
2994     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
2995     long minSeqIdForLogReplay = -1;
2996     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
2997       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
2998         minSeqIdForLogReplay = storeSeqIdForReplay;
2999       }
3000     }
3001 
3002     try {
3003       long lastRecordedFlushedSequenceId = -1;
3004       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3005         region.getEncodedName());
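           // Illustrative layout of the znodes touched below (paths follow the joins in this method):
           //   <recoveringRegionsZNode>/<encodedRegionName>            -> min flushed sequence id for the region
           //   <recoveringRegionsZNode>/<encodedRegionName>/<failedRS> -> per-store sequence ids for that failed RS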
3006       // recovering-region level
3007       byte[] data;
3008       try {
3009         data = ZKUtil.getData(zkw, nodePath);
3010       } catch (InterruptedException e) {
3011         throw new InterruptedIOException();
3012       }
3013       if (data != null) {
3014         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3015       }
3016       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3017         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3018       }
3019       if (previousRSName != null) {
3020         // one level deeper for the failed RS
3021         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3022         ZKUtil.setData(zkw, nodePath,
3023           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3024         LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
3025             + previousRSName);
3026       } else {
3027         LOG.warn("Can't find failed region server for recovering region " +
3028           region.getEncodedName());
3029       }
3030     } catch (NoNodeException ignore) {
3031       LOG.debug("Region " + region.getEncodedName() +
3032         " must have completed recovery because its recovery znode has been removed", ignore);
3033     }
3034   }
3035 
3036   /**
3037    * Return the name of the RS that failed most recently under /hbase/recovering-regions/encodedRegionName
3038    * @param encodedRegionName encoded name of the recovering region
3039    * @throws KeeperException
3040    */
3041   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3042     String result = null;
3043     long maxZxid = 0;
3044     ZooKeeperWatcher zkw = this.getZooKeeper();
3045     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3046     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3047     if (failedServers == null || failedServers.isEmpty()) {
3048       return result;
3049     }
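         // Pick the child znode with the largest creation zxid, i.e. the server that failed last.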
3050     for (String failedServer : failedServers) {
3051       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3052       Stat stat = new Stat();
3053       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3054       if (maxZxid < stat.getCzxid()) {
3055         maxZxid = stat.getCzxid();
3056         result = failedServer;
3057       }
3058     }
3059     return result;
3060   }
3061 
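       /**
        * Executes a coprocessor endpoint registered at the region server level: resolves the named
        * service and method from coprocessorServiceHandlers, invokes it with the request message
        * carried in the call, and wraps the result in a CoprocessorServiceResponse whose region
        * specifier is an empty region name, since the call is not tied to any particular region.
        * @throws ServiceException if the service or method is unknown, or if the invocation fails
        */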
3062   public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3063       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3064     try {
3065       ServerRpcController execController = new ServerRpcController();
3066       CoprocessorServiceCall call = serviceRequest.getCall();
3067       String serviceName = call.getServiceName();
3068       String methodName = call.getMethodName();
3069       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3070         throw new UnknownProtocolException(null,
3071             "No registered coprocessor service found for name " + serviceName);
3072       }
3073       Service service = coprocessorServiceHandlers.get(serviceName);
3074       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3075       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3076       if (methodDesc == null) {
3077         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3078             + " called on service " + serviceName);
3079       }
3080       Message request =
3081           service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest())
3082               .build();
3083       final Message.Builder responseBuilder =
3084           service.getResponsePrototype(methodDesc).newBuilderForType();
3085       service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3086         @Override
3087         public void run(Message message) {
3088           if (message != null) {
3089             responseBuilder.mergeFrom(message);
3090           }
3091         }
3092       });
3093       Message execResult = responseBuilder.build();
3094       if (execController.getFailedOn() != null) {
3095         throw execController.getFailedOn();
3096       }
3097       ClientProtos.CoprocessorServiceResponse.Builder builder =
3098           ClientProtos.CoprocessorServiceResponse.newBuilder();
3099       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3100         HConstants.EMPTY_BYTE_ARRAY));
3101       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3102           .setValue(execResult.toByteString()));
3103       return builder.build();
3104     } catch (IOException ie) {
3105       throw new ServiceException(ie);
3106     }
3107   }
3108 
3109   /**
3110    * @return The cache config instance used by the regionserver.
3111    */
3112   public CacheConfig getCacheConfig() {
3113     return this.cacheConfig;
3114   }
3115 
3116   /**
3117    * @return the ConfigurationManager object, exposed for testing purposes.
3118    */
3119   protected ConfigurationManager getConfigurationManager() {
3120     return configurationManager;
3121   }
3122 
3123   /**
3124    * Reload the configuration from disk.
3125    */
3126   public void updateConfiguration() {
3127     LOG.info("Reloading the configuration from disk.");
3128     // Reload the configuration from disk.
3129     conf.reloadConfiguration();
3130     configurationManager.notifyAllObservers(conf);
3131   }
3132 }