
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetSocketAddress;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Map.Entry;
39  import java.util.Set;
40  import java.util.SortedMap;
41  import java.util.TreeMap;
42  import java.util.TreeSet;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentMap;
45  import java.util.concurrent.ConcurrentSkipListMap;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.locks.ReentrantReadWriteLock;
48  
49  import javax.management.ObjectName;
50  import javax.servlet.http.HttpServlet;
51  
52  import org.apache.commons.lang.math.RandomUtils;
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  import org.apache.hadoop.classification.InterfaceAudience;
56  import org.apache.hadoop.conf.Configuration;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.hbase.Chore;
60  import org.apache.hadoop.hbase.ClockOutOfSyncException;
61  import org.apache.hadoop.hbase.CoordinatedStateManager;
62  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
63  import org.apache.hadoop.hbase.HBaseConfiguration;
64  import org.apache.hadoop.hbase.HConstants;
65  import org.apache.hadoop.hbase.HRegionInfo;
66  import org.apache.hadoop.hbase.HealthCheckChore;
67  import org.apache.hadoop.hbase.NotServingRegionException;
68  import org.apache.hadoop.hbase.RemoteExceptionHandler;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.Stoppable;
71  import org.apache.hadoop.hbase.TableDescriptors;
72  import org.apache.hadoop.hbase.TableName;
73  import org.apache.hadoop.hbase.YouAreDeadException;
74  import org.apache.hadoop.hbase.ZNodeClearer;
75  import org.apache.hadoop.hbase.catalog.CatalogTracker;
76  import org.apache.hadoop.hbase.catalog.MetaEditor;
77  import org.apache.hadoop.hbase.client.ConnectionUtils;
78  import org.apache.hadoop.hbase.client.HConnection;
79  import org.apache.hadoop.hbase.client.HConnectionManager;
80  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
81  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
82  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
83  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
84  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85  import org.apache.hadoop.hbase.executor.ExecutorService;
86  import org.apache.hadoop.hbase.executor.ExecutorType;
87  import org.apache.hadoop.hbase.fs.HFileSystem;
88  import org.apache.hadoop.hbase.http.InfoServer;
89  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
90  import org.apache.hadoop.hbase.ipc.RpcClient;
91  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
92  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
93  import org.apache.hadoop.hbase.master.HMaster;
94  import org.apache.hadoop.hbase.master.SplitLogManager;
95  import org.apache.hadoop.hbase.master.TableLockManager;
96  import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
97  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
98  import org.apache.hadoop.hbase.protobuf.RequestConverter;
99  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
100 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
101 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
102 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
103 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
104 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
105 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
106 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
111 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
112 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
113 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
116 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
117 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
118 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
119 import org.apache.hadoop.hbase.regionserver.wal.HLog;
120 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
121 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
122 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
123 import org.apache.hadoop.hbase.security.UserProvider;
124 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
125 import org.apache.hadoop.hbase.util.Bytes;
126 import org.apache.hadoop.hbase.util.CompressionTest;
127 import org.apache.hadoop.hbase.util.ConfigUtil;
128 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
129 import org.apache.hadoop.hbase.util.FSTableDescriptors;
130 import org.apache.hadoop.hbase.util.FSUtils;
131 import org.apache.hadoop.hbase.util.HasThread;
132 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
133 import org.apache.hadoop.hbase.util.Sleeper;
134 import org.apache.hadoop.hbase.util.Threads;
135 import org.apache.hadoop.hbase.util.VersionInfo;
136 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
137 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
138 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
139 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
140 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
141 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
142 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
143 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
144 import org.apache.hadoop.ipc.RemoteException;
145 import org.apache.hadoop.metrics.util.MBeanUtil;
146 import org.apache.hadoop.util.ReflectionUtils;
147 import org.apache.hadoop.util.StringUtils;
148 import org.apache.zookeeper.KeeperException;
149 import org.apache.zookeeper.data.Stat;
150 
151 import com.google.common.annotations.VisibleForTesting;
152 import com.google.protobuf.BlockingRpcChannel;
153 import com.google.protobuf.HBaseZeroCopyByteString;
154 import com.google.protobuf.ServiceException;
155 
156 /**
157  * HRegionServer makes a set of HRegions available to clients. It checks in with
158  * the HMaster. There are many HRegionServers in a single HBase deployment.
159  */
160 @InterfaceAudience.Private
161 @SuppressWarnings("deprecation")
162 public class HRegionServer extends HasThread implements
163     RegionServerServices, LastSequenceId {
164 
165   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
166 
167   /*
168    * Strings to be used in forming the exception message for
169    * RegionsAlreadyInTransitionException.
170    */
171   protected static final String OPEN = "OPEN";
172   protected static final String CLOSE = "CLOSE";
173 
174   // Region name => the action currently in progress on this server:
175   //   true  - an open-region action is in progress
176   //   false - a close-region action is in progress
177   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
178     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
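  /* Editor's illustrative sketch (not part of the original source): an open-region
   * handler would typically claim the region before acting, roughly like
   *
   *   Boolean prev = regionsInTransitionInRS.putIfAbsent(encodedNameBytes, Boolean.TRUE);
   *   if (prev != null) {
   *     // an open (TRUE) or close (FALSE) is already in progress for this region
   *   }
   *
   * where encodedNameBytes is a hypothetical local holding the region's encoded name bytes.
   */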
179 
180   // Cache flushing
181   protected MemStoreFlusher cacheFlusher;
182 
183   protected HeapMemoryManager hMemManager;
184 
185   // catalog tracker
186   protected CatalogTracker catalogTracker;
187 
188   // Watches ZooKeeper to notice when a region comes out of recovering state
189   @SuppressWarnings("unused")
190   private RecoveringRegionWatcher recoveringRegionWatcher;
191 
192   /**
193    * Go here to get table descriptors.
194    */
195   protected TableDescriptors tableDescriptors;
196 
197   // Replication services. If no replication, this handler will be null.
198   protected ReplicationSourceService replicationSourceHandler;
199   protected ReplicationSinkService replicationSinkHandler;
200 
201   // Compactions
202   public CompactSplitThread compactSplitThread;
203 
204   /**
205    * Map of regions currently being served by this region server. Key is the
206    * encoded region name.  All access should be synchronized.
207    */
208   protected final Map<String, HRegion> onlineRegions =
209     new ConcurrentHashMap<String, HRegion>();
210 
211   /**
212    * Map of encoded region names to the DataNode locations they should be hosted on.
213    * We store the value as InetSocketAddress since it is used only in the HDFS
214    * API (the create() call that takes favored nodes as hints for placing file blocks).
215    * We could have used ServerName here as the value class, but we'd need to
216    * convert it to InetSocketAddress at some point before the HDFS API call, and
217    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
218    * and here we really mean DataNode locations.
219    */
220   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
221       new ConcurrentHashMap<String, InetSocketAddress[]>();
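  /* Editor's illustrative note (assumption, simplified from the real plumbing in the
   * store file writers): when a new store file is created for a region, the addresses
   * stored here can be handed to HDFS as block-placement hints, roughly
   *
   *   InetSocketAddress[] favoredNodes = regionFavoredNodesMap.get(encodedRegionName);
   *   // ... passed down to the HDFS create() overload that accepts favored nodes,
   *   // so the file's blocks are preferentially placed on those DataNodes.
   */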
222 
223   /**
224    * Set of regions currently in recovering state, which means they can accept writes (edits from
225    * a previously failed region server) but not reads. A recovering region is also an online region.
226    */
227   protected final Map<String, HRegion> recoveringRegions = Collections
228       .synchronizedMap(new HashMap<String, HRegion>());
229 
230   // Leases
231   protected Leases leases;
232 
233   // Instance of the hbase executor service.
234   protected ExecutorService service;
235 
236   // If false, the file system has become unavailable
237   protected volatile boolean fsOk;
238   protected HFileSystem fs;
239 
240   // Set when a report to the master comes back with a message asking us to
241   // shut down. Also set by a call to stop() when debugging or running unit
242   // tests of HRegionServer in isolation.
243   private volatile boolean stopped = false;
244 
245   // Go down hard. Used if file system becomes unavailable and also in
246   // debugging and unit tests.
247   private volatile boolean abortRequested;
248 
249   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
250 
251   // A state before we go into stopped state.  At this stage we're closing user
252   // space regions.
253   private boolean stopping = false;
254 
255   private volatile boolean killed = false;
256 
257   protected final Configuration conf;
258 
259   private Path rootDir;
260 
261   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
262 
263   final int numRetries;
264   protected final int threadWakeFrequency;
265   protected final int msgInterval;
266 
267   protected final int numRegionsToReport;
268 
269   // Stub to do region server status calls against the master.
270   private volatile RegionServerStatusService.BlockingInterface rssStub;
271   // RPC client. Used to make the stub above that does region server status checking.
272   RpcClient rpcClient;
273 
274   private UncaughtExceptionHandler uncaughtExceptionHandler;
275 
276   // Info server. Default access so it can be used by unit tests. REGIONSERVER
277   // is the name of the webapp and the attribute name used to stuff this instance
278   // into the web context.
279   protected InfoServer infoServer;
280   private JvmPauseMonitor pauseMonitor;
281 
282   /** region server process name */
283   public static final String REGIONSERVER = "regionserver";
284 
285   MetricsRegionServer metricsRegionServer;
286   private SpanReceiverHost spanReceiverHost;
287 
288   /*
289    * Check for compactions requests.
290    */
291   Chore compactionChecker;
292 
293   /*
294    * Check for flushes
295    */
296   Chore periodicFlusher;
297 
298   // HLog and HLog roller. log is protected rather than private to avoid
299   // eclipse warning when accessed by inner classes
300   protected volatile HLog hlog;
301   // The meta updates are written to a different hlog. If this
302   // regionserver holds meta regions, then this field will be non-null.
303   protected volatile HLog hlogForMeta;
304 
305   LogRoller hlogRoller;
306   LogRoller metaHLogRoller;
307 
308   // flag set after we're done setting up server threads
309   protected AtomicBoolean online;
310 
311   // zookeeper connection and watcher
312   protected ZooKeeperWatcher zooKeeper;
313 
314   // master address tracker
315   private MasterAddressTracker masterAddressTracker;
316 
317   // Cluster Status Tracker
318   protected ClusterStatusTracker clusterStatusTracker;
319 
320   // Log Splitting Worker
321   private SplitLogWorker splitLogWorker;
322 
323   // A sleeper that sleeps for msgInterval.
324   protected final Sleeper sleeper;
325 
326   private final int operationTimeout;
327 
328   private final RegionServerAccounting regionServerAccounting;
329 
330   // Cache configuration and block cache reference
331   final CacheConfig cacheConfig;
332 
333   /** The health check chore. */
334   private HealthCheckChore healthCheckChore;
335 
336   /** The nonce manager chore. */
337   private Chore nonceManagerChore;
338 
339   /**
340    * The server name the Master sees us as.  It's made from the hostname the
341    * master passes us, the port, and the server startcode. Gets set after
342    * registration against the Master.
343    */
344   protected ServerName serverName;
345 
346   /**
347    * This server's startcode.
348    */
349   protected final long startcode;
350 
351   /**
352    * Unique identifier for the cluster we are a part of.
353    */
354   private String clusterId;
355 
356   /**
357    * MX Bean for RegionServerInfo
358    */
359   private ObjectName mxBean = null;
360 
361   /**
362    * Chore to clean periodically the moved region list
363    */
364   private MovedRegionsCleaner movedRegionsCleaner;
365 
366   private RegionServerCoprocessorHost rsHost;
367 
368   private RegionServerProcedureManagerHost rspmHost;
369 
370   // Table level lock manager for locking for region operations
371   protected TableLockManager tableLockManager;
372 
373   /**
374    * Nonce manager. Nonces are used to make operations like increment and append idempotent
375    * in the case where the client doesn't receive the response from a successful operation and
376    * retries. We track the successful ops for some time via a nonce sent by the client and handle
377    * duplicate operations (currently by failing them; in the future we might use MVCC to return
378    * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
379    * (from HBASE-3787) are:
380    * - WAL recovery is optimized, and under high load we won't read nearly a nonce-timeout worth
381    *   of past records. If we don't read the records, we don't read and recover the nonces.
382    *   Some WALs within the nonce-timeout at recovery may not even be present due to rolling/cleanup.
383    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
384    * We could have a separate additional "nonce WAL". It would just contain a bunch of numbers and
385    * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
386    * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
387    * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
388    * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
389    * latest nonce in it has expired. It can also be recovered during a move.
390    */
391   final ServerNonceManager nonceManager;
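  /* Editor's sketch of the idempotency described above (names here are hypothetical,
   * not the real client API):
   *
   *   long nonceGroup = clientId;          // stable per client
   *   long nonce = nextUniqueNonce();      // unique per operation
   *   // 1st attempt: the server records (nonceGroup, nonce) and applies the increment.
   *   // Retry after a lost response: the server sees (nonceGroup, nonce) already
   *   // completed within the nonce timeout and rejects the duplicate instead of
   *   // incrementing twice.
   */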
392 
393   private UserProvider userProvider;
394 
395   protected final RSRpcServices rpcServices;
396 
397   protected BaseCoordinatedStateManager csm;
398 
399   private final boolean useZKForAssignment;
400 
401   /**
402    * Starts an HRegionServer at the default location.
403    * @param conf
404    * @throws IOException
405    * @throws InterruptedException
406    */
407   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
408     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
409   }
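  /* Editor's minimal usage sketch (assumes default configuration; a region server is
   * normally launched via its command-line wrapper rather than constructed directly):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HRegionServer rs = new HRegionServer(conf);
   *   rs.start();   // HasThread starts the run() loop in its own thread
   */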
410 
411   /**
412    * Starts an HRegionServer at the default location.
413    * @param conf
414    * @param csm implementation of CoordinatedStateManager to be used
415    * @throws IOException
416    * @throws InterruptedException
417    */
418   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
419       throws IOException, InterruptedException {
420     this.fsOk = true;
421     this.conf = conf;
422     checkCodecs(this.conf);
423     this.online = new AtomicBoolean(false);
424     this.userProvider = UserProvider.instantiate(conf);
425     FSUtils.setupShortCircuitRead(this.conf);
426 
427     // Config'ed params
428     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
429         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
430     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
431     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
432 
433     this.sleeper = new Sleeper(this.msgInterval, this);
434 
435     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
436     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
437 
438     this.numRegionsToReport = conf.getInt(
439       "hbase.regionserver.numregionstoreport", 10);
440 
441     this.operationTimeout = conf.getInt(
442       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
443       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
444 
445     this.abortRequested = false;
446     this.stopped = false;
447 
448     rpcServices = createRpcServices();
449     this.startcode = System.currentTimeMillis();
450     String hostName = rpcServices.isa.getHostName();
451     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
452 
453     // login the zookeeper client principal (if using security)
454     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
455       "hbase.zookeeper.client.kerberos.principal", hostName);
456     // login the server principal (if using secure Hadoop)
457     login(userProvider, hostName);
458 
459     regionServerAccounting = new RegionServerAccounting();
460     cacheConfig = new CacheConfig(conf);
461     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
462       @Override
463       public void uncaughtException(Thread t, Throwable e) {
464         abort("Uncaught exception in service thread " + t.getName(), e);
465       }
466     };
467 
468     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
469 
470     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
471     // underlying hadoop hdfs accessors will be going against wrong filesystem
472     // (unless all is set to defaults).
473     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
474     // Get fs instance used by this RS.  Do we use checksum verification in HBase? If HBase
475     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
476     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
477     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
478     this.rootDir = FSUtils.getRootDir(this.conf);
479     this.tableDescriptors = new FSTableDescriptors(
480       this.fs, this.rootDir, !canUpdateTableDescriptor());
481 
482     service = new ExecutorService(getServerName().toShortString());
483     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
484 
485     // Some unit tests don't need a cluster, so no zookeeper at all
486     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
487       // Open connection to zookeeper and set primary watcher
488       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
489         rpcServices.isa.getPort(), this, canCreateBaseZNode());
490 
491       this.csm = (BaseCoordinatedStateManager) csm;
492       this.csm.initialize(this);
493       this.csm.start();
494 
495       tableLockManager = TableLockManager.createTableLockManager(
496         conf, zooKeeper, serverName);
497 
498       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
499       masterAddressTracker.start();
500 
501       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
502       clusterStatusTracker.start();
503     }
504 
505     rpcServices.start();
506     putUpWebUI();
507   }
508 
509   protected void login(UserProvider user, String host) throws IOException {
510     user.login("hbase.regionserver.keytab.file",
511       "hbase.regionserver.kerberos.principal", host);
512   }
513 
514   protected String getProcessName() {
515     return REGIONSERVER;
516   }
517 
518   protected boolean canCreateBaseZNode() {
519     return false;
520   }
521 
522   protected boolean canUpdateTableDescriptor() {
523     return false;
524   }
525 
526   protected RSRpcServices createRpcServices() throws IOException {
527     return new RSRpcServices(this);
528   }
529 
530   protected void configureInfoServer() {
531     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
532     infoServer.setAttribute(REGIONSERVER, this);
533   }
534 
535   protected Class<? extends HttpServlet> getDumpServlet() {
536     return RSDumpServlet.class;
537   }
538 
539   protected void doMetrics() {
540   }
541 
542   /**
543    * Create CatalogTracker.
544    * In its own method so it can be intercepted and mocked in tests.
545    * @throws IOException
546    */
547   protected CatalogTracker createCatalogTracker() throws IOException {
548     HConnection conn = ConnectionUtils.createShortCircuitHConnection(
549       HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices);
550     return new CatalogTracker(zooKeeper, conf, conn, this);
551   }
552 
553   /**
554    * Run test on configured codecs to make sure supporting libs are in place.
555    * @param c
556    * @throws IOException
557    */
558   private static void checkCodecs(final Configuration c) throws IOException {
559     // check to see if the codec list is available:
560     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
561     if (codecs == null) return;
562     for (String codec : codecs) {
563       if (!CompressionTest.testCompression(codec)) {
564         throw new IOException("Compression codec " + codec +
565           " not supported, aborting RS construction");
566       }
567     }
568   }
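  /* Editor's illustrative configuration sketch: how a deployment could use the
   * "hbase.regionserver.codecs" key checked above to fail fast when native codec
   * libraries are missing.
   *
   *   Configuration c = HBaseConfiguration.create();
   *   c.setStrings("hbase.regionserver.codecs", "snappy", "lz4");
   *   checkCodecs(c);   // throws IOException if either codec fails CompressionTest
   */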
569 
570   public String getClusterId() {
571     return this.clusterId;
572   }
573 
574   /**
575    * All initialization needed before we register with the Master.
576    *
577    * @throws IOException
578    * @throws InterruptedException
579    */
580   private void preRegistrationInitialization(){
581     try {
582       initializeZooKeeper();
583       initializeThreads();
584     } catch (Throwable t) {
585       // Call stop on error, or the process will stick around forever since the
586       // server puts up non-daemon threads.
587       this.rpcServices.stop();
588       abort("Initialization of RS failed.  Hence aborting RS.", t);
589     }
590   }
591 
592   /**
593    * Bring up the connection to the zk ensemble, then wait until a master is
594    * available for this cluster, and after that wait until the cluster 'up' flag
595    * has been set. This is the order in which the master does things.
596    * Finally, put up a catalog tracker.
597    * @throws IOException
598    * @throws InterruptedException
599    */
600   private void initializeZooKeeper() throws IOException, InterruptedException {
601     // Create the master address tracker, register with zk, and start it.  Then
602     // block until a master is available.  No point in starting up if no master
603     // running.
604     this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
605     this.masterAddressTracker.start();
606     blockAndCheckIfStopped(this.masterAddressTracker);
607 
608     // Wait on cluster being up.  Master will set this flag up in zookeeper
609     // when ready.
610     blockAndCheckIfStopped(this.clusterStatusTracker);
611 
612     // Retrieve clusterId
613     // Since cluster status is now up
614     // ID should have already been set by HMaster
615     try {
616       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
617       if (clusterId == null) {
618         this.abort("Cluster ID has not been set");
619       }
620       LOG.info("ClusterId : "+clusterId);
621     } catch (KeeperException e) {
622       this.abort("Failed to retrieve Cluster ID",e);
623     }
624 
625     // Now we have the cluster ID, start catalog tracker
626     startCatalogTracker();
627 
628     // watch for snapshots and other procedures
629     try {
630       rspmHost = new RegionServerProcedureManagerHost();
631       rspmHost.loadProcedures(conf);
632       rspmHost.initialize(this);
633     } catch (KeeperException e) {
634       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
635     }
636     // register watcher for recovering regions
637     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
638   }
639 
640   /**
641    * Utility method to wait indefinitely on znode availability while checking
642    * if the region server is shut down.
643    * @param tracker znode tracker to use
644    * @throws IOException any IO exception, plus if the RS is stopped
645    * @throws InterruptedException
646    */
647   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
648       throws IOException, InterruptedException {
649     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
650       if (this.stopped) {
651         throw new IOException("Received the shutdown message while waiting.");
652       }
653     }
654   }
655 
656   /**
657    * @return True if the cluster is up; false if a cluster shutdown is in progress.
658    */
659   private boolean isClusterUp() {
660     return this.clusterStatusTracker.isClusterUp();
661   }
662 
663   private void initializeThreads() throws IOException {
664     // Cache flushing thread.
665     this.cacheFlusher = new MemStoreFlusher(conf, this);
666 
667     // Compaction thread
668     this.compactSplitThread = new CompactSplitThread(this);
669 
670     // Background thread to check for compactions; needed if a region has not gotten updates
671     // in a while. It will take care of not checking too frequently on a store-by-store basis.
672     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
673     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
674     // Health checker thread.
675     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
676       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
677     if (isHealthCheckerConfigured()) {
678       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
679     }
680 
681     this.leases = new Leases(this.threadWakeFrequency);
682 
683     // Create the thread to clean the moved regions list
684     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
685 
686     if (this.nonceManager != null) {
687       // Create the chore that cleans up nonces.
688       nonceManagerChore = this.nonceManager.createCleanupChore(this);
689     }
690 
691     // Setup RPC client for master communication
692     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
693       rpcServices.isa.getAddress(), 0));
694     this.pauseMonitor = new JvmPauseMonitor(conf);
695     pauseMonitor.start();
696   }
697 
698   /**
699    * Create and start the catalog tracker if not already done.
700    */
701   protected synchronized void startCatalogTracker()
702       throws IOException, InterruptedException {
703     if (catalogTracker == null) {
704       catalogTracker = createCatalogTracker();
705       catalogTracker.start();
706     }
707   }
708 
709   /**
710    * The HRegionServer sticks in this loop until closed.
711    */
712   @Override
713   public void run() {
714     try {
715       // Do pre-registration initializations; zookeeper, lease threads, etc.
716       preRegistrationInitialization();
717     } catch (Throwable e) {
718       abort("Fatal exception during initialization", e);
719     }
720 
721     try {
722       if (!isStopped() && !isAborted()) {
723         ShutdownHook.install(conf, fs, this, Thread.currentThread());
724         // Set our ephemeral znode up in zookeeper now we have a name.
725         createMyEphemeralNode();
726         // Initialize the RegionServerCoprocessorHost now that our ephemeral
727         // node was created, in case any coprocessors want to use ZooKeeper
728         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
729       }
730 
731       // Try and register with the Master; tell it we are here.  Break if
732       // server is stopped or the clusterup flag is down or hdfs went wacky.
733       while (keepLooping()) {
734         RegionServerStartupResponse w = reportForDuty();
735         if (w == null) {
736           LOG.warn("reportForDuty failed; sleeping and then retrying.");
737           this.sleeper.sleep();
738         } else {
739           handleReportForDutyResponse(w);
740           break;
741         }
742       }
743 
744       if (!isStopped() && isHealthy()){
745         // start the snapshot handler and other procedure handlers,
746         // since the server is ready to run
747         rspmHost.start();
748       }
749 
750       // We registered with the Master.  Go into run mode.
751       long lastMsg = System.currentTimeMillis();
752       long oldRequestCount = -1;
753       // The main run loop.
754       while (!isStopped() && isHealthy()) {
755         if (!isClusterUp()) {
756           if (isOnlineRegionsEmpty()) {
757             stop("Exiting; cluster shutdown set and not carrying any regions");
758           } else if (!this.stopping) {
759             this.stopping = true;
760             LOG.info("Closing user regions");
761             closeUserRegions(this.abortRequested);
762           } else if (this.stopping) {
763             boolean allUserRegionsOffline = areAllUserRegionsOffline();
764             if (allUserRegionsOffline) {
765               // Set stopped if no more write requests to meta tables
766               // since last time we went around the loop.  Any open
767               // meta regions will be closed on our way out.
768               if (oldRequestCount == getWriteRequestCount()) {
769                 stop("Stopped; only catalog regions remaining online");
770                 break;
771               }
772               oldRequestCount = getWriteRequestCount();
773             } else {
774               // Make sure all regions have been closed -- some regions may
775               // not have gotten the close because we were splitting at the
776               // time of the call to closeUserRegions.
777               closeUserRegions(this.abortRequested);
778             }
779             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
780           }
781         }
782         long now = System.currentTimeMillis();
783         if ((now - lastMsg) >= msgInterval) {
784           tryRegionServerReport(lastMsg, now);
785           lastMsg = System.currentTimeMillis();
786           doMetrics();
787         }
788         if (!isStopped() && !isAborted()) {
789           this.sleeper.sleep();
790         }
791       } // while
792     } catch (Throwable t) {
793       if (!rpcServices.checkOOME(t)) {
794         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
795         abort(prefix + t.getMessage(), t);
796       }
797     }
798     // Run shutdown.
799     if (mxBean != null) {
800       MBeanUtil.unregisterMBean(mxBean);
801       mxBean = null;
802     }
803     if (this.leases != null) this.leases.closeAfterLeasesExpire();
804     if (this.splitLogWorker != null) {
805       splitLogWorker.stop();
806     }
807     if (this.infoServer != null) {
808       LOG.info("Stopping infoServer");
809       try {
810         this.infoServer.stop();
811       } catch (Exception e) {
812         LOG.error("Failed to stop infoServer", e);
813       }
814     }
815     // Send cache a shutdown.
816     if (cacheConfig.isBlockCacheEnabled()) {
817       cacheConfig.getBlockCache().shutdown();
818     }
819 
820     movedRegionsCleaner.stop("Region Server stopping");
821 
822     // Send interrupts to wake up threads if sleeping so they notice shutdown.
823     // TODO: Should we check they are alive? If they OOME'd they could have exited already.
824     if(this.hMemManager != null) this.hMemManager.stop();
825     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
826     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
827     if (this.compactionChecker != null)
828       this.compactionChecker.interrupt();
829     if (this.healthCheckChore != null) {
830       this.healthCheckChore.interrupt();
831     }
832     if (this.nonceManagerChore != null) {
833       this.nonceManagerChore.interrupt();
834     }
835 
836     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
837     rspmHost.stop(this.abortRequested || this.killed);
838 
839     if (this.killed) {
840       // Just skip out w/o closing regions.  Used when testing.
841     } else if (abortRequested) {
842       if (this.fsOk) {
843         closeUserRegions(abortRequested); // Don't leave any open file handles
844       }
845       LOG.info("aborting server " + this.serverName);
846     } else {
847       closeUserRegions(abortRequested);
848       LOG.info("stopping server " + this.serverName);
849     }
850     // Interrupt catalog tracker here in case any regions being opened out in
851     // handlers are stuck waiting on meta.
852     if (this.catalogTracker != null) this.catalogTracker.stop();
853 
854     // Closing the compactSplit thread before closing meta regions
855     if (!this.killed && containsMetaTableRegions()) {
856       if (!abortRequested || this.fsOk) {
857         if (this.compactSplitThread != null) {
858           this.compactSplitThread.join();
859           this.compactSplitThread = null;
860         }
861         closeMetaTableRegions(abortRequested);
862       }
863     }
864 
865     if (!this.killed && this.fsOk) {
866       waitOnAllRegionsToClose(abortRequested);
867       LOG.info("stopping server " + this.serverName +
868         "; all regions closed.");
869     }
870 
871     // The fsOk flag may be changed when closing regions throws an exception.
872     if (this.fsOk) {
873       closeWAL(!abortRequested);
874     }
875 
876     // Make sure the proxy is down.
877     if (this.rssStub != null) {
878       this.rssStub = null;
879     }
880     this.rpcClient.stop();
881     this.leases.close();
882     if (this.pauseMonitor != null) {
883       this.pauseMonitor.stop();
884     }
885 
886     if (!killed) {
887       stopServiceThreads();
888     }
889 
890     this.rpcServices.stop();
891 
892     try {
893       deleteMyEphemeralNode();
894     } catch (KeeperException e) {
895       LOG.warn("Failed deleting my ephemeral node", e);
896     }
897     // We may have failed to delete the znode at the previous step, but
898     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
899     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
900 
901     this.zooKeeper.close();
902     LOG.info("stopping server " + this.serverName +
903       "; zookeeper connection closed.");
904 
905     LOG.info(Thread.currentThread().getName() + " exiting");
906   }
907 
908   private boolean containsMetaTableRegions() {
909     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
910   }
911 
912   private boolean areAllUserRegionsOffline() {
913     if (getNumberOfOnlineRegions() > 2) return false;
914     boolean allUserRegionsOffline = true;
915     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
916       if (!e.getValue().getRegionInfo().isMetaTable()) {
917         allUserRegionsOffline = false;
918         break;
919       }
920     }
921     return allUserRegionsOffline;
922   }
923 
924   /**
925    * @return Current write count for all online regions.
926    */
927   private long getWriteRequestCount() {
928     long writeCount = 0;
929     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
930       writeCount += e.getValue().getWriteRequestsCount();
931     }
932     return writeCount;
933   }
934 
935   @VisibleForTesting
936   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
937   throws IOException {
938     RegionServerStatusService.BlockingInterface rss = rssStub;
939     if (rss == null) {
940       // the current server could be stopping.
941       return;
942     }
943     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
944     try {
945       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
946       ServerName sn = ServerName.parseVersionedServerName(
947         this.serverName.getVersionedBytes());
948       request.setServer(ProtobufUtil.toServerName(sn));
949       request.setLoad(sl);
950       rss.regionServerReport(null, request.build());
951     } catch (ServiceException se) {
952       IOException ioe = ProtobufUtil.getRemoteException(se);
953       if (ioe instanceof YouAreDeadException) {
954         // This will be caught and handled as a fatal error in run()
955         throw ioe;
956       }
957       if (rssStub == rss) {
958         rssStub = null;
959       }
960       // Couldn't connect to the master, get location from zk and reconnect
961       // Method blocks until new master is found or we are stopped
962       createRegionServerStatusStub();
963     }
964   }
965 
966   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
967     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
968     // per second and other metrics.  As long as metrics are part of ServerLoad it's best to use
969     // the wrapper to compute those numbers in one place.
970     // In the long term most of these should be moved off of ServerLoad and the heartbeat.
971     // Instead they should be stored in an HBase table so that external visibility into HBase is
972     // improved. Additionally, the load balancer will be able to take advantage of a more complete
973     // history.
974     MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
975     Collection<HRegion> regions = getOnlineRegionsLocalContext();
976     MemoryUsage memory =
977       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
978 
979     ClusterStatusProtos.ServerLoad.Builder serverLoad =
980       ClusterStatusProtos.ServerLoad.newBuilder();
981     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
982     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
983     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
984     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
985     Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
986     for (String coprocessor : coprocessors) {
987       serverLoad.addCoprocessors(
988         Coprocessor.newBuilder().setName(coprocessor).build());
989     }
990     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
991     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
992     for (HRegion region : regions) {
993       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
994     }
995     serverLoad.setReportStartTime(reportStartTime);
996     serverLoad.setReportEndTime(reportEndTime);
997     if (this.infoServer != null) {
998       serverLoad.setInfoServerPort(this.infoServer.getPort());
999     } else {
1000       serverLoad.setInfoServerPort(-1);
1001     }
1002     return serverLoad.build();
1003   }
1004 
1005   String getOnlineRegionsAsPrintableString() {
1006     StringBuilder sb = new StringBuilder();
1007     for (HRegion r: this.onlineRegions.values()) {
1008       if (sb.length() > 0) sb.append(", ");
1009       sb.append(r.getRegionInfo().getEncodedName());
1010     }
1011     return sb.toString();
1012   }
1013 
1014   /**
1015   * Wait on regions to close.
1016    */
1017   private void waitOnAllRegionsToClose(final boolean abort) {
1018     // Wait till all regions are closed before going out.
1019     int lastCount = -1;
1020     long previousLogTime = 0;
1021     Set<String> closedRegions = new HashSet<String>();
1022     boolean interrupted = false;
1023     try {
1024       while (!isOnlineRegionsEmpty()) {
1025         int count = getNumberOfOnlineRegions();
1026         // Only print a message if the count of regions has changed.
1027         if (count != lastCount) {
1028           // Log every second at most
1029           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1030             previousLogTime = System.currentTimeMillis();
1031             lastCount = count;
1032             LOG.info("Waiting on " + count + " regions to close");
1033            // Only print out the regions still closing if there are only a few,
1034            // else they will swamp the log.
1035             if (count < 10 && LOG.isDebugEnabled()) {
1036               LOG.debug(this.onlineRegions);
1037             }
1038           }
1039         }
1040         // Ensure all user regions have been sent a close. Use this to
1041         // protect against the case where an open comes in after we start the
1042         // iterator of onlineRegions to close all user regions.
1043         for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1044           HRegionInfo hri = e.getValue().getRegionInfo();
1045           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1046               && !closedRegions.contains(hri.getEncodedName())) {
1047             closedRegions.add(hri.getEncodedName());
1048             // Don't update zk with this close transition; pass false.
1049             closeRegionIgnoreErrors(hri, abort);
1050          }
1051         }
1052         // No regions in RIT, we could stop waiting now.
1053         if (this.regionsInTransitionInRS.isEmpty()) {
1054           if (!isOnlineRegionsEmpty()) {
1055             LOG.info("We were exiting though online regions are not empty," +
1056                 " because some regions failed closing");
1057           }
1058           break;
1059         }
1060         try {
1061           Thread.sleep(200);
1062         } catch (InterruptedException e) {
1063           interrupted = true;
1064           LOG.warn("Interrupted while sleeping");
1065         }
1066       }
1067     } finally {
1068       if (interrupted) {
1069         Thread.currentThread().interrupt();
1070       }
1071     }
1072   }
1073 
1074   private void closeWAL(final boolean delete) {
1075     if (this.hlogForMeta != null) {
1076       // All hlogs (meta and non-meta) are in the same directory. Don't call
1077       // closeAndDelete here since that would delete all hlogs not just the
1078       // meta ones. We will just 'close' the hlog for meta here, and leave
1079       // the directory cleanup to the follow-on closeAndDelete call.
1080       try {
1081         this.hlogForMeta.close();
1082       } catch (Throwable e) {
1083         LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1084       }
1085     }
1086     if (this.hlog != null) {
1087       try {
1088         if (delete) {
1089           hlog.closeAndDelete();
1090         } else {
1091           hlog.close();
1092         }
1093       } catch (Throwable e) {
1094         LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1095       }
1096     }
1097   }
1098 
1099   /*
1100    * Run init. Sets up hlog and starts up all server threads.
1101    *
1102    * @param c Extra configuration.
1103    */
1104   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1105   throws IOException {
1106     try {
1107       for (NameStringPair e : c.getMapEntriesList()) {
1108         String key = e.getName();
1109         // The hostname the master sees us as.
1110         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1111           String hostnameFromMasterPOV = e.getValue();
1112           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1113             rpcServices.isa.getPort(), this.startcode);
1114           if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1115             LOG.info("Master passed us a different hostname to use; was=" +
1116               rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
1117           }
1118           continue;
1119         }
1120         String value = e.getValue();
1121         if (LOG.isDebugEnabled()) {
1122           LOG.info("Config from master: " + key + "=" + value);
1123         }
1124         this.conf.set(key, value);
1125       }
1126 
1127       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1128       // config param for task trackers, but we can piggyback off of it.
1129       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1130         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1131           this.serverName.toString());
1132       }
1133 
1134      // Save it in a file; this will allow us to see if we crashed
1135       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1136 
1137       this.hlog = setupWALAndReplication();
1138      // Init in here rather than in the constructor, now that the thread name has been set
1139       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1140 
1141       startServiceThreads();
1142       startHeapMemoryManager();
1143       LOG.info("Serving as " + this.serverName +
1144         ", RpcServer on " + rpcServices.isa +
1145         ", sessionid=0x" +
1146         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1147       synchronized (online) {
1148         online.set(true);
1149         online.notifyAll();
1150       }
1151     } catch (Throwable e) {
1152       stop("Failed initialization");
1153       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1154           "Region server startup failed");
1155     } finally {
1156       sleeper.skipSleepCycle();
1157     }
1158   }
1159 
1160   private void startHeapMemoryManager() {
1161     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1162     if (this.hMemManager != null) {
1163       this.hMemManager.start();
1164     }
1165   }
1166 
1167   private void createMyEphemeralNode() throws KeeperException, IOException {
1168     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1169     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1170     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1171     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1172       getMyEphemeralNodePath(), data);
1173   }
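  /* Editor's note (assumption about the default znode layout): the ephemeral node created
   * above lives under the cluster's rs znode, so with default znode settings a server
   * would create roughly
   *
   *   /hbase/rs/<hostname>,<port>,<startcode>
   *
   * with the PB-magic-prefixed RegionServerInfo as its data. The active master watches
   * these children to detect region server death.
   */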
1174 
1175   private void deleteMyEphemeralNode() throws KeeperException {
1176     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1177   }
1178 
1179   @Override
1180   public RegionServerAccounting getRegionServerAccounting() {
1181     return regionServerAccounting;
1182   }
1183 
1184   @Override
1185   public TableLockManager getTableLockManager() {
1186     return tableLockManager;
1187   }
1188 
1189   /*
1190    * @param r Region to get RegionLoad for.
1191    * @param regionLoadBldr the RegionLoad.Builder, can be null
1192    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1193    * @return RegionLoad instance.
1194    *
1195    * @throws IOException
1196    */
1197   private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1198       RegionSpecifier.Builder regionSpecifier) {
1199     byte[] name = r.getRegionName();
1200     int stores = 0;
1201     int storefiles = 0;
1202     int storeUncompressedSizeMB = 0;
1203     int storefileSizeMB = 0;
1204     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1205     int storefileIndexSizeMB = 0;
1206     int rootIndexSizeKB = 0;
1207     int totalStaticIndexSizeKB = 0;
1208     int totalStaticBloomSizeKB = 0;
1209     long totalCompactingKVs = 0;
1210     long currentCompactedKVs = 0;
1211     synchronized (r.stores) {
1212       stores += r.stores.size();
1213       for (Store store : r.stores.values()) {
1214         storefiles += store.getStorefilesCount();
1215         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1216             / 1024 / 1024);
1217         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1218         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1219         CompactionProgress progress = store.getCompactionProgress();
1220         if (progress != null) {
1221           totalCompactingKVs += progress.totalCompactingKVs;
1222           currentCompactedKVs += progress.currentCompactedKVs;
1223         }
1224 
1225         rootIndexSizeKB +=
1226             (int) (store.getStorefilesIndexSize() / 1024);
1227 
1228         totalStaticIndexSizeKB +=
1229           (int) (store.getTotalStaticIndexSize() / 1024);
1230 
1231         totalStaticBloomSizeKB +=
1232           (int) (store.getTotalStaticBloomSize() / 1024);
1233       }
1234     }
1235     if (regionLoadBldr == null) {
1236       regionLoadBldr = RegionLoad.newBuilder();
1237     }
1238     if (regionSpecifier == null) {
1239       regionSpecifier = RegionSpecifier.newBuilder();
1240     }
1241     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1242     regionSpecifier.setValue(HBaseZeroCopyByteString.wrap(name));
1243     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1244       .setStores(stores)
1245       .setStorefiles(storefiles)
1246       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1247       .setStorefileSizeMB(storefileSizeMB)
1248       .setMemstoreSizeMB(memstoreSizeMB)
1249       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1250       .setRootIndexSizeKB(rootIndexSizeKB)
1251       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1252       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1253       .setReadRequestsCount(r.readRequestsCount.get())
1254       .setWriteRequestsCount(r.writeRequestsCount.get())
1255       .setTotalCompactingKVs(totalCompactingKVs)
1256       .setCurrentCompactedKVs(currentCompactedKVs)
1257       .setCompleteSequenceId(r.lastFlushSeqId);
1258 
1259     return regionLoadBldr.build();
1260   }
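  /* Editor's worked example of the integer MB/KB conversions above: a store holding
   * 1,610,612,736 bytes (1.5 GB) of store files reports storefileSizeMB = 1536
   * (divided by 1024 twice), while a 500,000-byte static bloom filter reports
   * totalStaticBloomSizeKB = 488, since the division truncates.
   */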
1261 
1262   /**
1263   * @param encodedRegionName Encoded name of the region to build a RegionLoad for.
1264    * @return An instance of RegionLoad.
1265    */
1266   public RegionLoad createRegionLoad(final String encodedRegionName) {
1267    HRegion r = this.onlineRegions.get(encodedRegionName);
1269     return r != null ? createRegionLoad(r, null, null) : null;
1270   }
1271 
1272   /*
1273    * Inner class that runs on a long period checking if regions need compaction.
1274    */
1275   private static class CompactionChecker extends Chore {
1276     private final HRegionServer instance;
1277     private final int majorCompactPriority;
1278     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1279     private long iteration = 0;
1280 
1281     CompactionChecker(final HRegionServer h, final int sleepTime,
1282         final Stoppable stopper) {
1283       super("CompactionChecker", sleepTime, h);
1284       this.instance = h;
1285       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1286 
1287       /* MajorCompactPriority is configurable.
1288        * If not set, the compaction will use default priority.
1289        */
1290       this.majorCompactPriority = this.instance.conf.
1291         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1292         DEFAULT_PRIORITY);
1293     }
1294 
1295     @Override
1296     protected void chore() {
1297       for (HRegion r : this.instance.onlineRegions.values()) {
1298         if (r == null)
1299           continue;
1300         for (Store s : r.getStores().values()) {
1301           try {
1302             long multiplier = s.getCompactionCheckMultiplier();
1303             assert multiplier > 0;
1304             if (iteration % multiplier != 0) continue;
1305             if (s.needsCompaction()) {
1306               // Queue a compaction. Will recognize if major is needed.
1307               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1308                   + " requests compaction");
1309             } else if (s.isMajorCompaction()) {
1310               if (majorCompactPriority == DEFAULT_PRIORITY
1311                   || majorCompactPriority > r.getCompactPriority()) {
1312                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1313                     + " requests major compaction; use default priority", null);
1314               } else {
1315                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1316                     + " requests major compaction; use configured priority",
1317                   this.majorCompactPriority, null);
1318               }
1319             }
1320           } catch (IOException e) {
1321             LOG.warn("Failed major compaction check on " + r, e);
1322           }
1323         }
1324       }
1325       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1326     }
1327   }
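  /* Editor's note on the multiplier logic in the chore above: with the default
   * threadWakeFrequency of 10 seconds, a store whose getCompactionCheckMultiplier()
   * returns 6 (a hypothetical value) is only examined when iteration % 6 == 0, i.e.
   * roughly once a minute rather than on every chore run.
   */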
1328 
1329   class PeriodicMemstoreFlusher extends Chore {
1330     final HRegionServer server;
1331     final static int RANGE_OF_DELAY = 20000; //millisec
1332     final static int MIN_DELAY_TIME = 3000; //millisec
1333     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1334       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1335       this.server = server;
1336     }
1337 
1338     @Override
1339     protected void chore() {
1340       for (HRegion r : this.server.onlineRegions.values()) {
1341         if (r == null)
1342           continue;
1343         if (r.shouldFlush()) {
1344           FlushRequester requester = server.getFlushRequester();
1345           if (requester != null) {
1346             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1347             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1348                 " after a delay of " + randomDelay);
1349             // Throttle the flushes by adding a delay. If we don't throttle, and there
1350             // is a balanced write-load on the regions in a table, we might end up
1351             // overwhelming the filesystem with too many flushes at once.
1352             requester.requestDelayedFlush(r, randomDelay);
1353           }
1354         }
1355       }
1356     }
1357   }
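  /*
   * A small sketch of the delay computed in PeriodicMemstoreFlusher.chore() above:
   * RandomUtils.nextInt(RANGE_OF_DELAY) returns a value in [0, 20000), so the
   * requested delay always falls between 3 and roughly 23 seconds, spreading
   * simultaneous periodic flushes over time.
   *
   *   long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
   *   // randomDelay is in [3000, 22999] milliseconds
   */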
1358 
1359   /**
1360    * Report the status of the server. A server is online once all of its startup
1361    * has completed (setting up the filesystem, starting service threads, etc.). This
1362    * method is designed mostly to be useful in tests.
1363    *
1364    * @return true if online, false if not.
1365    */
1366   public boolean isOnline() {
1367     return online.get();
1368   }
1369 
1370   /**
1371    * Set up the WAL and replication if enabled.
1372    * Replication setup is done in here because it needs to be hooked up to the WAL.
1373    * @return A WAL instance.
1374    * @throws IOException
1375    */
1376   private HLog setupWALAndReplication() throws IOException {
1377     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1378     final String logName
1379       = HLogUtil.getHLogDirectoryName(this.serverName.toString());
1380 
1381     Path logdir = new Path(rootDir, logName);
1382     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1383     if (this.fs.exists(logdir)) {
1384       throw new RegionServerRunningException("Region server has already " +
1385         "created directory at " + this.serverName.toString());
1386     }
1387 
1388     // Instantiate replication manager if replication enabled.  Pass it the
1389     // log directories.
1390     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1391 
1392     return instantiateHLog(rootDir, logName);
1393   }
1394 
1395   private HLog getMetaWAL() throws IOException {
1396     if (this.hlogForMeta != null) return this.hlogForMeta;
1397     final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
1398     Path logdir = new Path(rootDir, logName);
1399     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1400     this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
1401       this.conf, getMetaWALActionListeners(), this.serverName.toString());
1402     return this.hlogForMeta;
1403   }
1404 
1405   /**
1406    * Called by {@link #setupWALAndReplication()} to create the WAL instance.
1407    * @param rootdir
1408    * @param logName
1409    * @return WAL instance.
1410    * @throws IOException
1411    */
1412   protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1413     return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1414       getWALActionListeners(), this.serverName.toString());
1415   }
1416 
1417   /**
1418    * Called by {@link #instantiateHLog(Path, String)} when setting up the WAL instance.
1419    * Add any {@link WALActionsListener}s you want inserted before the WAL starts up.
1420    * @return List of WALActionsListener that will be passed in to
1421    * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
1422    */
1423   protected List<WALActionsListener> getWALActionListeners() {
1424     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1425     // Log roller.
1426     this.hlogRoller = new LogRoller(this, this);
1427     listeners.add(this.hlogRoller);
1428     if (this.replicationSourceHandler != null &&
1429         this.replicationSourceHandler.getWALActionsListener() != null) {
1430       // Replication handler is an implementation of WALActionsListener.
1431       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1432     }
1433     return listeners;
1434   }
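  /*
   * A subclass that wants an extra listener attached to the WAL could override the
   * hook above; a hedged sketch only, where MyWALObserver is a hypothetical
   * WALActionsListener implementation and not part of this class:
   *
   *   @Override
   *   protected List<WALActionsListener> getWALActionListeners() {
   *     List<WALActionsListener> listeners = super.getWALActionListeners();
   *     listeners.add(new MyWALObserver());
   *     return listeners;
   *   }
   */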
1435 
1436   protected List<WALActionsListener> getMetaWALActionListeners() {
1437     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1438     // Use a temporary log roller so that metaHLogRoller is already running by
1439     // the time it becomes non-null.
1440     MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1441     String n = Thread.currentThread().getName();
1442     Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1443         n + "-MetaLogRoller", uncaughtExceptionHandler);
1444     this.metaHLogRoller = tmpLogRoller;
1445     tmpLogRoller = null;
1446     listeners.add(this.metaHLogRoller);
1447     return listeners;
1448   }
1449 
1450   protected LogRoller getLogRoller() {
1451     return hlogRoller;
1452   }
1453 
1454   public MetricsRegionServer getRegionServerMetrics() {
1455     return this.metricsRegionServer;
1456   }
1457 
1458   /**
1459    * @return Master address tracker instance.
1460    */
1461   public MasterAddressTracker getMasterAddressTracker() {
1462     return this.masterAddressTracker;
1463   }
1464 
1465   /*
1466    * Start maintenance threads, the Server, the Worker, and the lease checker threads.
1467    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1468    * get an unhandled exception. We cannot set the handler on all threads;
1469    * the Server's internal Listener thread is off limits. If the Server hits an OOME, it
1470    * waits a while and then retries. Meantime, a flush or a compaction that tries to
1471    * run should trigger the same critical condition and the shutdown will run. On
1472    * its way out, this server will shut down the Server. Leases are somewhere in
1473    * between: although it inherits from Chore, it has an internal thread and
1474    * keeps its own stop mechanism, so it needs to be stopped by this
1475    * hosting server. The Worker logs the exception and exits.
1476    */
1477   private void startServiceThreads() throws IOException {
1478     // Start executor services
1479     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1480       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1481     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1482       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1483     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1484       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1485     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1486       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1487     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1488       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1489         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1490     }
1491     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
1492       conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
1493 
1494     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
1495         uncaughtExceptionHandler);
1496     this.cacheFlusher.start(uncaughtExceptionHandler);
1497     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
1498       ".compactionChecker", uncaughtExceptionHandler);
1499     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
1500         ".periodicFlusher", uncaughtExceptionHandler);
1501     if (this.healthCheckChore != null) {
1502       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
1503             uncaughtExceptionHandler);
1504     }
1505     if (this.nonceManagerChore != null) {
1506       Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
1507             uncaughtExceptionHandler);
1508     }
1509 
1510     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1511     // an unhandled exception, it will just exit.
1512     this.leases.setName(getName() + ".leaseChecker");
1513     this.leases.start();
1514 
1515     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1516         this.replicationSourceHandler != null) {
1517       this.replicationSourceHandler.startReplicationService();
1518     } else {
1519       if (this.replicationSourceHandler != null) {
1520         this.replicationSourceHandler.startReplicationService();
1521       }
1522       if (this.replicationSinkHandler != null) {
1523         this.replicationSinkHandler.startReplicationService();
1524       }
1525     }
1526 
1527     // Start Server.  This service is like leases in that it internally runs
1528     // a thread.
1529     rpcServices.rpcServer.start();
1530 
1531     // Create the log splitting worker and start it
1532     // Set a smaller retry count to fail fast, otherwise the SplitLogWorker could be blocked for
1533     // quite a while inside the HConnection layer. The worker won't be available for other
1534     // tasks even after the current task is preempted when a split task times out.
1535     Configuration sinkConf = HBaseConfiguration.create(conf);
1536     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1537       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1538     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1539       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1540     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1541     this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
1542     splitLogWorker.start();
1543   }
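  /*
   * The executor pool sizes read above are ordinary configuration knobs; a hedged
   * example of overriding them in code (the values are purely illustrative, not
   * recommendations):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.regionserver.executor.openregion.threads", 5);
   *   conf.setInt("hbase.regionserver.executor.closeregion.threads", 5);
   *   conf.setInt("hbase.regionserver.wal.max.splitters", 4);
   */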
1544 
1545   /**
1546    * Puts up the webui.
1547    * @return The final port -- may be different from the port we started with.
1548    * @throws IOException
1549    */
1550   private int putUpWebUI() throws IOException {
1551     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1552       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1553     // -1 is for disabling info server
1554     if (port < 0) return port;
1555     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1556     // check if auto port bind enabled
1557     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1558         false);
1559     while (true) {
1560       try {
1561         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1562         infoServer.addServlet("dump", "/dump", getDumpServlet());
1563         configureInfoServer();
1564         this.infoServer.start();
1565         break;
1566       } catch (BindException e) {
1567         if (!auto) {
1568           // auto bind disabled throw BindException
1569           LOG.error("Failed binding http info server to port: " + port);
1570           throw e;
1571         }
1572         // auto bind enabled, try to use another port
1573         LOG.info("Failed binding http info server to port: " + port);
1574         port++;
1575       }
1576     }
1577     port = this.infoServer.getPort();
1578     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1579     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1580       HConstants.DEFAULT_MASTER_INFOPORT);
1581     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1582     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1583     return port;
1584   }
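  /*
   * A hedged sketch of the info-server settings consumed by putUpWebUI() (the port
   * value is illustrative only):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 16030);   // -1 disables the web UI
   *   conf.set("hbase.regionserver.info.bindAddress", "0.0.0.0");
   *   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);   // probe higher ports on BindException
   */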
1585 
1586   /*
1587    * Verify that server is healthy
1588    */
1589   private boolean isHealthy() {
1590     if (!fsOk) {
1591       // File system problem
1592       return false;
1593     }
1594     // Verify that all threads are alive
1595     if (!(leases.isAlive()
1596         && cacheFlusher.isAlive() && hlogRoller.isAlive()
1597         && this.compactionChecker.isAlive()
1598         && this.periodicFlusher.isAlive())) {
1599       stop("One or more threads are no longer alive -- stop");
1600       return false;
1601     }
1602     if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1603       stop("Meta HLog roller thread is no longer alive -- stop");
1604       return false;
1605     }
1606     return true;
1607   }
1608 
1609   public HLog getWAL() {
1610     try {
1611       return getWAL(null);
1612     } catch (IOException e) {
1613       LOG.warn("getWAL threw exception " + e);
1614       return null;
1615     }
1616   }
1617 
1618   @Override
1619   public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1620     // TODO: at some point this should delegate to the HLogFactory.
1621     // Currently, we don't care about the region as much as we care about the
1622     // table (hence checking the table name below).
1623     // The -ROOT- and hbase:meta regions have a separate WAL.
1624     if (regionInfo != null && regionInfo.isMetaTable()) {
1625       return getMetaWAL();
1626     }
1627     return this.hlog;
1628   }
1629 
1630   @Override
1631   public CatalogTracker getCatalogTracker() {
1632     return this.catalogTracker;
1633   }
1634 
1635   @Override
1636   public void stop(final String msg) {
1637     if (!this.stopped) {
1638       try {
1639         if (this.rsHost != null) {
1640           this.rsHost.preStop(msg);
1641         }
1642         this.stopped = true;
1643         LOG.info("STOPPED: " + msg);
1644         // Wakes run() if it is sleeping
1645         sleeper.skipSleepCycle();
1646       } catch (IOException exp) {
1647         LOG.warn("The region server did not stop", exp);
1648       }
1649     }
1650   }
1651 
1652   public void waitForServerOnline(){
1653     while (!isOnline() && !isStopped()){
1654        sleeper.sleep();
1655     }
1656   }
1657 
1658   @Override
1659   public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
1660   throws KeeperException, IOException {
1661     rpcServices.checkOpen();
1662     LOG.info("Post open deploy tasks for " + r.getRegionNameAsString());
1663     // Do checks to see if we need to compact (references or too many files)
1664     for (Store s : r.getStores().values()) {
1665       if (s.hasReferences() || s.needsCompaction()) {
1666        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1667       }
1668     }
1669     long openSeqNum = r.getOpenSeqNum();
1670     if (openSeqNum == HConstants.NO_SEQNUM) {
1671       // If we opened a region, we should have read some sequence number from it.
1672       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1673       openSeqNum = 0;
1674     }
1675 
1676     // Update flushed sequence id of a recovering region in ZK
1677     updateRecoveringRegionLastFlushedSequenceId(r);
1678 
1679     // Update ZK, or META
1680     if (r.getRegionInfo().isMetaRegion()) {
1681       MetaRegionTracker.setMetaLocation(getZooKeeper(), serverName);
1682     } else if (useZKForAssignment) {
1683       MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
1684         this.serverName, openSeqNum);
1685     }
1686     if (!useZKForAssignment && !reportRegionStateTransition(
1687         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1688       throw new IOException("Failed to report opened region to master: "
1689         + r.getRegionNameAsString());
1690     }
1691 
1692     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
1693   }
1694 
1695   @Override
1696   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1697     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1698   }
1699 
1700   @Override
1701   public boolean reportRegionStateTransition(
1702       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1703     ReportRegionStateTransitionRequest.Builder builder =
1704       ReportRegionStateTransitionRequest.newBuilder();
1705     builder.setServer(ProtobufUtil.toServerName(serverName));
1706     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1707     transition.setTransitionCode(code);
1708     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1709       transition.setOpenSeqNum(openSeqNum);
1710     }
1711     for (HRegionInfo hri: hris) {
1712       transition.addRegionInfo(HRegionInfo.convert(hri));
1713     }
1714     ReportRegionStateTransitionRequest request = builder.build();
1715     while (keepLooping()) {
1716       RegionServerStatusService.BlockingInterface rss = rssStub;
1717       try {
1718         if (rss == null) {
1719           createRegionServerStatusStub();
1720           continue;
1721         }
1722         ReportRegionStateTransitionResponse response =
1723           rss.reportRegionStateTransition(null, request);
1724         if (response.hasErrorMessage()) {
1725           LOG.info("Failed to transition " + hris[0]
1726             + " to " + code + ": " + response.getErrorMessage());
1727           return false;
1728         }
1729         return true;
1730       } catch (ServiceException se) {
1731         IOException ioe = ProtobufUtil.getRemoteException(se);
1732         LOG.info("Failed to report region transition, will retry", ioe);
1733         if (rssStub == rss) {
1734           rssStub = null;
1735         }
1736       }
1737     }
1738     return false;
1739   }
1740 
1741   @Override
1742   public RpcServerInterface getRpcServer() {
1743     return rpcServices.rpcServer;
1744   }
1745 
1746   @VisibleForTesting
1747   public RSRpcServices getRSRpcServices() {
1748     return rpcServices;
1749   }
1750 
1751   /**
1752    * Cause the server to exit without closing the regions it is serving, without closing
1753    * the log it is using, and without notifying the master. Used in unit testing and on
1754    * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
1755    *
1756    * @param reason
1757    *          the reason we are aborting
1758    * @param cause
1759    *          the exception that caused the abort, or null
1760    */
1761   @Override
1762   public void abort(String reason, Throwable cause) {
1763     String msg = "ABORTING region server " + this + ": " + reason;
1764     if (cause != null) {
1765       LOG.fatal(msg, cause);
1766     } else {
1767       LOG.fatal(msg);
1768     }
1769     this.abortRequested = true;
1770     // HBASE-4014: show list of coprocessors that were loaded to help debug
1771     // regionserver crashes. Note that we're implicitly using
1772     // java.util.HashSet's toString() method to print the coprocessor names.
1773     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1774         CoprocessorHost.getLoadedCoprocessors());
1775     // Do our best to report our abort to the master, but this may not work
1776     try {
1777       if (cause != null) {
1778         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1779       }
1780       // Report to the master but only if we have already registered with the master.
1781       if (rssStub != null && this.serverName != null) {
1782         ReportRSFatalErrorRequest.Builder builder =
1783           ReportRSFatalErrorRequest.newBuilder();
1784         ServerName sn =
1785           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
1786         builder.setServer(ProtobufUtil.toServerName(sn));
1787         builder.setErrorMessage(msg);
1788         rssStub.reportRSFatalError(null, builder.build());
1789       }
1790     } catch (Throwable t) {
1791       LOG.warn("Unable to report fatal error to master", t);
1792     }
1793     stop(reason);
1794   }
1795 
1796   /**
1797    * @see HRegionServer#abort(String, Throwable)
1798    */
1799   public void abort(String reason) {
1800     abort(reason, null);
1801   }
1802 
1803   @Override
1804   public boolean isAborted() {
1805     return this.abortRequested;
1806   }
1807 
1808   /*
1809    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
1810    * logs, but it does close the socket in case we want to bring up a server on the old
1811    * hostname+port immediately.
1812    */
1813   protected void kill() {
1814     this.killed = true;
1815     abort("Simulated kill");
1816   }
1817 
1818   /**
1819    * Wait on all threads to finish. Presumption is that all closes and stops
1820    * have already been called.
1821    */
1822   protected void stopServiceThreads() {
1823     if (this.nonceManagerChore != null) {
1824       Threads.shutdown(this.nonceManagerChore.getThread());
1825     }
1826     Threads.shutdown(this.compactionChecker.getThread());
1827     Threads.shutdown(this.periodicFlusher.getThread());
1828     this.cacheFlusher.join();
1829     if (this.healthCheckChore != null) {
1830       Threads.shutdown(this.healthCheckChore.getThread());
1831     }
1832     if (this.spanReceiverHost != null) {
1833       this.spanReceiverHost.closeReceivers();
1834     }
1835     if (this.hlogRoller != null) {
1836       Threads.shutdown(this.hlogRoller.getThread());
1837     }
1838     if (this.metaHLogRoller != null) {
1839       Threads.shutdown(this.metaHLogRoller.getThread());
1840     }
1841     if (this.compactSplitThread != null) {
1842       this.compactSplitThread.join();
1843     }
1844     if (this.service != null) this.service.shutdown();
1845     if (this.replicationSourceHandler != null &&
1846         this.replicationSourceHandler == this.replicationSinkHandler) {
1847       this.replicationSourceHandler.stopReplicationService();
1848     } else {
1849       if (this.replicationSourceHandler != null) {
1850         this.replicationSourceHandler.stopReplicationService();
1851       }
1852       if (this.replicationSinkHandler != null) {
1853         this.replicationSinkHandler.stopReplicationService();
1854       }
1855     }
1856   }
1857 
1858   /**
1859    * @return Return the object that implements the replication
1860    * source service.
1861    */
1862   ReplicationSourceService getReplicationSourceService() {
1863     return replicationSourceHandler;
1864   }
1865 
1866   /**
1867    * @return Return the object that implements the replication
1868    * sink service.
1869    */
1870   ReplicationSinkService getReplicationSinkService() {
1871     return replicationSinkHandler;
1872   }
1873 
1874   /**
1875    * Get the current master from ZooKeeper and open the RPC connection to it.
1876    *
1877    * Method will block until a master is available. You can break from this
1878    * block by requesting the server stop.
1879    *
1880    * @return master + port, or null if server has been stopped
1881    */
1882   private synchronized ServerName createRegionServerStatusStub() {
1883     if (rssStub != null) {
1884       return masterAddressTracker.getMasterAddress();
1885     }
1886     ServerName sn = null;
1887     long previousLogTime = 0;
1888     RegionServerStatusService.BlockingInterface master = null;
1889     boolean refresh = false; // for the first time, use cached data
1890     RegionServerStatusService.BlockingInterface intf = null;
1891     boolean interrupted = false;
1892     try {
1893       while (keepLooping() && master == null) {
1894         sn = this.masterAddressTracker.getMasterAddress(refresh);
1895         if (sn == null) {
1896           if (!keepLooping()) {
1897             // give up with no connection.
1898             LOG.debug("No master found and cluster is stopped; bailing out");
1899             return null;
1900           }
1901           LOG.debug("No master found; retry");
1902           previousLogTime = System.currentTimeMillis();
1903           refresh = true; // let's try pull it from ZK directly
1904           sleeper.sleep();
1905           continue;
1906         }
1907 
1908         // If we are on the active master, use the shortcut
1909         if (this instanceof HMaster && sn.equals(getServerName())) {
1910           intf = ((HMaster)this).getMasterRpcServices();
1911           break;
1912         }
1913         try {
1914           BlockingRpcChannel channel =
1915             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
1916           intf = RegionServerStatusService.newBlockingStub(channel);
1917           break;
1918         } catch (IOException e) {
1919           e = e instanceof RemoteException ?
1920             ((RemoteException)e).unwrapRemoteException() : e;
1921           if (e instanceof ServerNotRunningYetException) {
1922             if (System.currentTimeMillis() > (previousLogTime+1000)){
1923               LOG.info("Master isn't available yet, retrying");
1924               previousLogTime = System.currentTimeMillis();
1925             }
1926           } else {
1927             if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1928               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1929               previousLogTime = System.currentTimeMillis();
1930             }
1931           }
1932           try {
1933             Thread.sleep(200);
1934           } catch (InterruptedException ex) {
1935             interrupted = true;
1936             LOG.warn("Interrupted while sleeping");
1937           }
1938         }
1939       }
1940     } finally {
1941       if (interrupted) {
1942         Thread.currentThread().interrupt();
1943       }
1944     }
1945     rssStub = intf;
1946     return sn;
1947   }
1948 
1949   /**
1950    * @return True if we should break loop because cluster is going down or
1951    * this server has been stopped or hdfs has gone bad.
1952    */
1953   private boolean keepLooping() {
1954     return !this.stopped && isClusterUp();
1955   }
1956 
1957   /*
1958    * Let the master know we're here. Run initialization using parameters passed
1959    * to us by the master.
1960    * @return A Map of key/value configurations we got from the Master, else
1961    * null if we failed to register.
1962    * @throws IOException
1963    */
1964   private RegionServerStartupResponse reportForDuty() throws IOException {
1965     ServerName masterServerName = createRegionServerStatusStub();
1966     if (masterServerName == null) return null;
1967     RegionServerStartupResponse result = null;
1968     try {
1969       rpcServices.requestCount.set(0);
1970       LOG.info("reportForDuty to master=" + masterServerName + " with port="
1971         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
1972       long now = EnvironmentEdgeManager.currentTimeMillis();
1973       int port = rpcServices.isa.getPort();
1974       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
1975       request.setPort(port);
1976       request.setServerStartCode(this.startcode);
1977       request.setServerCurrentTime(now);
1978       result = this.rssStub.regionServerStartup(null, request.build());
1979     } catch (ServiceException se) {
1980       IOException ioe = ProtobufUtil.getRemoteException(se);
1981       if (ioe instanceof ClockOutOfSyncException) {
1982         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
1983         // Re-throw IOE will cause RS to abort
1984         throw ioe;
1985       } else if (ioe instanceof ServerNotRunningYetException) {
1986         LOG.debug("Master is not running yet");
1987       } else {
1988         LOG.warn("error telling master we are up", se);
1989       }
1990     }
1991     return result;
1992   }
1993 
1994   @Override
1995   public long getLastSequenceId(byte[] region) {
1996     Long lastFlushedSequenceId = -1L;
1997     try {
1998       GetLastFlushedSequenceIdRequest req = RequestConverter
1999           .buildGetLastFlushedSequenceIdRequest(region);
2000       lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
2001           .getLastFlushedSequenceId();
2002     } catch (ServiceException e) {
2003       lastFlushedSequenceId = -1L;
2004       LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
2005     }
2006     return lastFlushedSequenceId;
2007   }
2008 
2009   /**
2010    * Closes all regions.  Called on our way out.
2011    * Assumes that it is not possible for new regions to be added to onlineRegions
2012    * while this method runs.
2013    */
2014   protected void closeAllRegions(final boolean abort) {
2015     closeUserRegions(abort);
2016     closeMetaTableRegions(abort);
2017   }
2018 
2019   /**
2020    * Close meta region if we carry it
2021    * @param abort Whether we're running an abort.
2022    */
2023   void closeMetaTableRegions(final boolean abort) {
2024     HRegion meta = null;
2025     this.lock.writeLock().lock();
2026     try {
2027       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2028         HRegionInfo hri = e.getValue().getRegionInfo();
2029         if (hri.isMetaRegion()) {
2030           meta = e.getValue();
2031         }
2032         if (meta != null) break;
2033       }
2034     } finally {
2035       this.lock.writeLock().unlock();
2036     }
2037     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2038   }
2039 
2040   /**
2041    * Schedule closes on all user regions.
2042    * Should be safe to call multiple times because it won't close regions
2043    * that are already closed or that are closing.
2044    * @param abort Whether we're running an abort.
2045    */
2046   void closeUserRegions(final boolean abort) {
2047     this.lock.writeLock().lock();
2048     try {
2049       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2050         HRegion r = e.getValue();
2051         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2052           // Don't update zk with this close transition; pass false.
2053           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2054         }
2055       }
2056     } finally {
2057       this.lock.writeLock().unlock();
2058     }
2059   }
2060 
2061   /** @return the info server */
2062   public InfoServer getInfoServer() {
2063     return infoServer;
2064   }
2065 
2066   /**
2067    * @return true if a stop has been requested.
2068    */
2069   @Override
2070   public boolean isStopped() {
2071     return this.stopped;
2072   }
2073 
2074   @Override
2075   public boolean isStopping() {
2076     return this.stopping;
2077   }
2078 
2079   @Override
2080   public Map<String, HRegion> getRecoveringRegions() {
2081     return this.recoveringRegions;
2082   }
2083 
2084   /**
2085    *
2086    * @return the configuration
2087    */
2088   @Override
2089   public Configuration getConfiguration() {
2090     return conf;
2091   }
2092 
2093   /** @return the write lock for the server */
2094   ReentrantReadWriteLock.WriteLock getWriteLock() {
2095     return lock.writeLock();
2096   }
2097 
2098   public int getNumberOfOnlineRegions() {
2099     return this.onlineRegions.size();
2100   }
2101 
2102   boolean isOnlineRegionsEmpty() {
2103     return this.onlineRegions.isEmpty();
2104   }
2105 
2106   /**
2107    * For tests, web ui and metrics.
2108    * This method will only work if HRegionServer is in the same JVM as the client;
2109    * HRegion cannot be serialized to cross an rpc.
2110    */
2111   public Collection<HRegion> getOnlineRegionsLocalContext() {
2112     Collection<HRegion> regions = this.onlineRegions.values();
2113     return Collections.unmodifiableCollection(regions);
2114   }
2115 
2116   @Override
2117   public void addToOnlineRegions(HRegion region) {
2118     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2119   }
2120 
2121   /**
2122    * @return A new Map of online regions sorted by region size with the first entry being the
2123    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2124    * may NOT return all regions.
2125    */
2126   SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2127     // we'll sort the regions in reverse
2128     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2129         new Comparator<Long>() {
2130           @Override
2131           public int compare(Long a, Long b) {
2132             return -1 * a.compareTo(b);
2133           }
2134         });
2135     // Copy over all regions. Regions are sorted by size with biggest first.
2136     for (HRegion region : this.onlineRegions.values()) {
2137       sortedRegions.put(region.memstoreSize.get(), region);
2138     }
2139     return sortedRegions;
2140   }
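  /*
   * Because the map above is keyed on memstore size, two regions with identical
   * sizes collide and only the last one survives, which is why the javadoc warns
   * the result may not contain every region; a tiny sketch with hypothetical
   * regions regionA and regionB:
   *
   *   SortedMap<Long, HRegion> m = new TreeMap<Long, HRegion>(Collections.<Long>reverseOrder());
   *   m.put(1024L, regionA);
   *   m.put(1024L, regionB);   // overwrites regionA; m.size() == 1
   */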
2141 
2142   /**
2143    * @return time stamp in millis of when this region server was started
2144    */
2145   public long getStartcode() {
2146     return this.startcode;
2147   }
2148 
2149   /** @return reference to FlushRequester */
2150   @Override
2151   public FlushRequester getFlushRequester() {
2152     return this.cacheFlusher;
2153   }
2154 
2155   /**
2156    * Get the top N most loaded regions this server is serving so we can tell the
2157    * master which regions it can reallocate if we're overloaded. TODO: actually
2158    * calculate which regions are most loaded. (Right now, we're just grabbing
2159    * the first N regions being served regardless of load.)
2160    */
2161   protected HRegionInfo[] getMostLoadedRegions() {
2162     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2163     for (HRegion r : onlineRegions.values()) {
2164       if (!r.isAvailable()) {
2165         continue;
2166       }
2167       if (regions.size() < numRegionsToReport) {
2168         regions.add(r.getRegionInfo());
2169       } else {
2170         break;
2171       }
2172     }
2173     return regions.toArray(new HRegionInfo[regions.size()]);
2174   }
2175 
2176   @Override
2177   public Leases getLeases() {
2178     return leases;
2179   }
2180 
2181   /**
2182    * @return Return the rootDir.
2183    */
2184   protected Path getRootDir() {
2185     return rootDir;
2186   }
2187 
2188   /**
2189    * @return Return the fs.
2190    */
2191   @Override
2192   public FileSystem getFileSystem() {
2193     return fs;
2194   }
2195 
2196   @Override
2197   public String toString() {
2198     return getServerName().toString();
2199   }
2200 
2201   /**
2202    * Interval at which threads should run
2203    *
2204    * @return the interval
2205    */
2206   public int getThreadWakeFrequency() {
2207     return threadWakeFrequency;
2208   }
2209 
2210   @Override
2211   public ZooKeeperWatcher getZooKeeper() {
2212     return zooKeeper;
2213   }
2214 
2215   @Override
2216   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2217     return csm;
2218   }
2219 
2220   @Override
2221   public ServerName getServerName() {
2222     return serverName;
2223   }
2224 
2225   @Override
2226   public CompactionRequestor getCompactionRequester() {
2227     return this.compactSplitThread;
2228   }
2229 
2230   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2231     return this.rsHost;
2232   }
2233 
2234   @Override
2235   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2236     return this.regionsInTransitionInRS;
2237   }
2238 
2239   @Override
2240   public ExecutorService getExecutorService() {
2241     return service;
2242   }
2243 
2244   //
2245   // Main program and support routines
2246   //
2247 
2248   /**
2249    * Load the replication service objects, if any
2250    */
2251   static private void createNewReplicationInstance(Configuration conf,
2252     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2253 
2254     // If replication is not enabled, then return immediately.
2255     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2256         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2257       return;
2258     }
2259 
2260     // read in the name of the source replication class from the config file.
2261     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2262                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2263 
2264     // read in the name of the sink replication class from the config file.
2265     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2266                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2267 
2268     // If both the sink and the source class names are the same, then instantiate
2269     // only one object.
2270     if (sourceClassname.equals(sinkClassname)) {
2271       server.replicationSourceHandler = (ReplicationSourceService)
2272                                          newReplicationInstance(sourceClassname,
2273                                          conf, server, fs, logDir, oldLogDir);
2274       server.replicationSinkHandler = (ReplicationSinkService)
2275                                          server.replicationSourceHandler;
2276     } else {
2277       server.replicationSourceHandler = (ReplicationSourceService)
2278                                          newReplicationInstance(sourceClassname,
2279                                          conf, server, fs, logDir, oldLogDir);
2280       server.replicationSinkHandler = (ReplicationSinkService)
2281                                          newReplicationInstance(sinkClassname,
2282                                          conf, server, fs, logDir, oldLogDir);
2283     }
2284   }
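  /*
   * The replication wiring above is driven entirely by configuration; a hedged
   * example that enables replication and plugs in a custom sink class, where
   * com.example.MyReplicationSink is hypothetical:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
   *   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "com.example.MyReplicationSink");
   */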
2285 
2286   static private ReplicationService newReplicationInstance(String classname,
2287     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2288     Path oldLogDir) throws IOException{
2289 
2290     Class<?> clazz = null;
2291     try {
2292       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2293       clazz = Class.forName(classname, true, classLoader);
2294     } catch (java.lang.ClassNotFoundException nfe) {
2295       throw new IOException("Could not find class for " + classname);
2296     }
2297 
2298     // create an instance of the replication object.
2299     ReplicationService service = (ReplicationService)
2300                               ReflectionUtils.newInstance(clazz, conf);
2301     service.initialize(server, fs, logDir, oldLogDir);
2302     return service;
2303   }
2304 
2305   /**
2306    * Utility for constructing an instance of the passed HRegionServer class.
2307    *
2308    * @param regionServerClass
2309    * @param conf2
2310    * @return HRegionServer instance.
2311    */
2312   public static HRegionServer constructRegionServer(
2313       Class<? extends HRegionServer> regionServerClass,
2314       final Configuration conf2, CoordinatedStateManager cp) {
2315     try {
2316       Constructor<? extends HRegionServer> c = regionServerClass
2317           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2318       return c.newInstance(conf2, cp);
2319     } catch (Exception e) {
2320       throw new RuntimeException("Failed construction of " + "Regionserver: "
2321           + regionServerClass.toString(), e);
2322     }
2323   }
2324 
2325   /**
2326    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2327    */
2328   public static void main(String[] args) throws Exception {
2329     VersionInfo.logVersion();
2330     Configuration conf = HBaseConfiguration.create();
2331     @SuppressWarnings("unchecked")
2332     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2333         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2334 
2335     new HRegionServerCommandLine(regionServerClass).doMain(args);
2336   }
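  /*
   * main() resolves the concrete class from HConstants.REGION_SERVER_IMPL, so a
   * deployment can substitute its own subclass; a hedged sketch, where
   * MyRegionServer is a hypothetical subclass of HRegionServer:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, HRegionServer.class);
   *   // HRegionServerCommandLine will then construct MyRegionServer on startup.
   */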
2337 
2338   /**
2339    * Gets the online regions of the specified table.
2340    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2341    * Only returns <em>online</em> regions.  If a region of this table has been
2342    * closed during a disable, etc., it will not be included in the returned list.
2343    * So, the returned list may not necessarily be ALL regions in this table, just
2344    * all the ONLINE regions in the table.
2345    * @param tableName
2346    * @return Online regions from <code>tableName</code>
2347    */
2348   @Override
2349   public List<HRegion> getOnlineRegions(TableName tableName) {
2350      List<HRegion> tableRegions = new ArrayList<HRegion>();
2351      synchronized (this.onlineRegions) {
2352        for (HRegion region: this.onlineRegions.values()) {
2353          HRegionInfo regionInfo = region.getRegionInfo();
2354          if(regionInfo.getTable().equals(tableName)) {
2355            tableRegions.add(region);
2356          }
2357        }
2358      }
2359      return tableRegions;
2360    }
2361 
2362   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2363   public String[] getRegionServerCoprocessors() {
2364     TreeSet<String> coprocessors = new TreeSet<String>(
2365         this.hlog.getCoprocessorHost().getCoprocessors());
2366     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2367     for (HRegion region: regions) {
2368       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2369     }
2370     return coprocessors.toArray(new String[coprocessors.size()]);
2371   }
2372 
2373   /**
2374    * Try to close the region, logs a warning on failure but continues.
2375    * @param region Region to close
2376    */
2377   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2378     try {
2379       CloseRegionCoordination.CloseRegionDetails details =
2380         csm.getCloseRegionCoordination().getDetaultDetails();
2381       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2382         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2383             " - ignoring and continuing");
2384       }
2385     } catch (IOException e) {
2386       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2387           " - ignoring and continuing", e);
2388     }
2389   }
2390 
2391   /**
2392    * Asynchronously close a region; can be called from the master or internally by the regionserver
2393    * when stopping. If called from the master, the region will update the znode status.
2394    *
2395    * <p>
2396    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2397    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2398    * </p>
2399    *
2400    * <p>
2401    * If a close was in progress, this new request will be ignored, and an exception thrown.
2402    * </p>
2403    *
2404    * @param encodedName Region to close
2405    * @param abort True if we are aborting
2406    * @param crd Details of the coordinated close-region task
2407    * @return True if closed a region.
2408    * @throws NotServingRegionException if the region is not online
2409    * @throws RegionAlreadyInTransitionException if the region is already closing
2410    */
2411   protected boolean closeRegion(String encodedName, final boolean abort,
2412       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2413       throws NotServingRegionException, RegionAlreadyInTransitionException {
2414     // Check for permissions to close.
2415     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2416     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2417       try {
2418         actualRegion.getCoprocessorHost().preClose(false);
2419       } catch (IOException exp) {
2420         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2421         return false;
2422       }
2423     }
2424 
2425     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2426         Boolean.FALSE);
2427 
2428     if (Boolean.TRUE.equals(previous)) {
2429       LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2430           "trying to OPEN. Cancelling OPENING.");
2431       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2432         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2433         // We're going to try to do a standard close then.
2434         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2435             " Doing a standard close now");
2436         return closeRegion(encodedName, abort, crd, sn);
2437       }
2438       // Let's get the region from the online region list again
2439       actualRegion = this.getFromOnlineRegions(encodedName);
2440       if (actualRegion == null) { // If already online, we still need to close it.
2441         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2442         // The master deletes the znode when it receives this exception.
2443         throw new NotServingRegionException("The region " + encodedName +
2444           " was opening but not yet served. Opening is cancelled.");
2445       }
2446     } else if (Boolean.FALSE.equals(previous)) {
2447       LOG.info("Received CLOSE for the region: " + encodedName +
2448         " ,which we are already trying to CLOSE, but not completed yet");
2449       // The master will retry till the region is closed. We need to do this since
2450       // the region could fail to close somehow. If we mark the region closed in master
2451       // while it is not, there could be data loss.
2452       // If the region stuck in closing for a while, and master runs out of retries,
2453       // master will move the region to failed_to_close. Later on, if the region
2454       // is indeed closed, master can properly re-assign it.
2455       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2456         " was already closing. New CLOSE request is ignored.");
2457     }
2458 
2459     if (actualRegion == null) {
2460       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2461       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2462       // The master deletes the znode when it receives this exception.
2463       throw new NotServingRegionException("The region " + encodedName +
2464           " is not online, and is not opening.");
2465     }
2466 
2467     CloseRegionHandler crh;
2468     final HRegionInfo hri = actualRegion.getRegionInfo();
2469     if (hri.isMetaRegion()) {
2470       crh = new CloseMetaHandler(this, this, hri, abort,
2471         csm.getCloseRegionCoordination(), crd);
2472     } else {
2473       crh = new CloseRegionHandler(this, this, hri, abort,
2474         csm.getCloseRegionCoordination(), crd, sn);
2475     }
2476     this.service.submit(crh);
2477     return true;
2478   }
2479 
2480   /**
2481    * @param regionName
2482    * @return HRegion for the passed binary <code>regionName</code> or null if the
2483    *         named region is not a member of the online regions.
2484    */
2485   public HRegion getOnlineRegion(final byte[] regionName) {
2486     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2487     return this.onlineRegions.get(encodedRegionName);
2488   }
2489 
2490   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2491     return this.regionFavoredNodesMap.get(encodedRegionName);
2492   }
2493 
2494   @Override
2495   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2496     return this.onlineRegions.get(encodedRegionName);
2497   }
2498 
2499 
2500   @Override
2501   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2502     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2503 
2504     if (destination != null) {
2505       HLog wal = getWAL();
2506       long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2507       if (closeSeqNum == HConstants.NO_SEQNUM) {
2508         // No edits in WAL for this region; get the sequence number when the region was opened.
2509         closeSeqNum = r.getOpenSeqNum();
2510         if (closeSeqNum == HConstants.NO_SEQNUM) {
2511           closeSeqNum = 0;
2512         }
2513       }
2514       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2515     }
2516     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2517     return toReturn != null;
2518   }
2519 
2520   /**
2521    * Protected utility method for safely obtaining an HRegion handle.
2522    *
2523    * @param regionName
2524    *          Name of online {@link HRegion} to return
2525    * @return {@link HRegion} for <code>regionName</code>
2526    * @throws NotServingRegionException
2527    */
2528   protected HRegion getRegion(final byte[] regionName)
2529       throws NotServingRegionException {
2530     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2531     return getRegionByEncodedName(regionName, encodedRegionName);
2532   }
2533 
2534   protected HRegion getRegionByEncodedName(String encodedRegionName)
2535       throws NotServingRegionException {
2536     return getRegionByEncodedName(null, encodedRegionName);
2537   }
2538 
2539   protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2540     throws NotServingRegionException {
2541     HRegion region = this.onlineRegions.get(encodedRegionName);
2542     if (region == null) {
2543       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2544       if (moveInfo != null) {
2545         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2546       }
2547       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2548       String regionNameStr = regionName == null?
2549         encodedRegionName: Bytes.toStringBinary(regionName);
2550       if (isOpening != null && isOpening.booleanValue()) {
2551         throw new RegionOpeningException("Region " + regionNameStr +
2552           " is opening on " + this.serverName);
2553       }
2554       throw new NotServingRegionException("Region " + regionNameStr +
2555         " is not online on " + this.serverName);
2556     }
2557     return region;
2558   }
2559 
2560   /*
2561    * Clean up after a Throwable caught while invoking a method. Converts <code>t</code> to
2562    * an IOE if it isn't one already.
2563    *
2564    * @param t Throwable
2565    *
2566    * @param msg Message to log in error. Can be null.
2567    *
2568    * @return Throwable converted to an IOE; methods can only let out IOEs.
2569    */
2570   private Throwable cleanup(final Throwable t, final String msg) {
2571     // Don't log as error if NSRE; NSRE is 'normal' operation.
2572     if (t instanceof NotServingRegionException) {
2573       LOG.debug("NotServingRegionException; " + t.getMessage());
2574       return t;
2575     }
2576     if (msg == null) {
2577       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2578     } else {
2579       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2580     }
2581     if (!rpcServices.checkOOME(t)) {
2582       checkFileSystem();
2583     }
2584     return t;
2585   }
2586 
2587   /*
2588    * @param t
2589    *
2590    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2591    *
2592    * @return Make <code>t</code> an IOE if it isn't already.
2593    */
2594   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2595     return (t instanceof IOException ? (IOException) t : msg == null
2596         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2597   }
2598 
2599   /**
2600    * Checks to see if the file system is still accessible. If not, sets
2601    * abortRequested and stopRequested
2602    *
2603    * @return false if file system is not available
2604    */
2605   public boolean checkFileSystem() {
2606     if (this.fsOk && this.fs != null) {
2607       try {
2608         FSUtils.checkFileSystemAvailable(this.fs);
2609       } catch (IOException e) {
2610         abort("File System not available", e);
2611         this.fsOk = false;
2612       }
2613     }
2614     return this.fsOk;
2615   }
2616 
2617   @Override
2618   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2619       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2620     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2621     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2622     // it is a map of region name to InetSocketAddress[]
2623     for (int i = 0; i < favoredNodes.size(); i++) {
2624       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2625           favoredNodes.get(i).getPort());
2626     }
2627     regionFavoredNodesMap.put(encodedRegionName, addr);
2628   }
2629 
2630   /**
2631    * Return the favored nodes for a region given its encoded name. Look at the
2632    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
2633    * @param encodedRegionName
2634    * @return array of favored locations
2635    */
2636   @Override
2637   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2638     return regionFavoredNodesMap.get(encodedRegionName);
2639   }
2640 
2641   @Override
2642   public ServerNonceManager getNonceManager() {
2643     return this.nonceManager;
2644   }
2645 
2646   private static class MovedRegionInfo {
2647     private final ServerName serverName;
2648     private final long seqNum;
2649     private final long ts;
2650 
2651     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2652       this.serverName = serverName;
2653       this.seqNum = closeSeqNum;
2654       ts = EnvironmentEdgeManager.currentTimeMillis();
2655      }
2656 
2657     public ServerName getServerName() {
2658       return serverName;
2659     }
2660 
2661     public long getSeqNum() {
2662       return seqNum;
2663     }
2664 
2665     public long getMoveTime() {
2666       return ts;
2667     }
2668   }
2669 
2670   // This map contains all the regions that we closed for a move.
2671   // We record the time of the move because we don't want to keep stale information.
2672   protected Map<String, MovedRegionInfo> movedRegions =
2673       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
2674 
2675   // We need a timeout. Otherwise there is a risk of giving out wrong information, which would double
2676   // the number of network calls instead of reducing them.
2677   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
2678 
2679   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
2680     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
2681       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
2682       return;
2683     }
2684     LOG.info("Adding moved region record: "
2685       + encodedName + " to " + destination + " as of " + closeSeqNum);
2686     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
2687   }
2688 
2689   void removeFromMovedRegions(String encodedName) {
2690     movedRegions.remove(encodedName);
2691   }
2692 
2693   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
2694     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
2695 
2696     long now = EnvironmentEdgeManager.currentTimeMillis();
2697     if (dest != null) {
2698       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
2699         return dest;
2700       } else {
2701         movedRegions.remove(encodedRegionName);
2702       }
2703     }
2704 
2705     return null;
2706   }
2707 
2708   /**
2709    * Remove the expired entries from the moved regions list.
2710    */
2711   protected void cleanMovedRegions() {
2712     final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
2713     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
2714 
2715     while (it.hasNext()){
2716       Map.Entry<String, MovedRegionInfo> e = it.next();
2717       if (e.getValue().getMoveTime() < cutOff) {
2718         it.remove();
2719       }
2720     }
2721   }
2722 
2723   /**
2724    * Creates a Chore thread to clean the moved region cache.
2725    */
2726   protected static class MovedRegionsCleaner extends Chore implements Stoppable {
2727     private final HRegionServer regionServer;
2728     private final Stoppable stoppable;
2729 
2730     private MovedRegionsCleaner(
2731       HRegionServer regionServer, Stoppable stoppable){
2732       super("MovedRegionsCleaner for region server " + regionServer, TIMEOUT_REGION_MOVED, stoppable);
2733       this.regionServer = regionServer;
2734       this.stoppable = stoppable;
2735     }
2736 
2737     static MovedRegionsCleaner createAndStart(HRegionServer rs){
2738       Stoppable stoppable = new Stoppable() {
2739         private volatile boolean isStopped = false;
2740         @Override public void stop(String why) { isStopped = true; }
2741         @Override public boolean isStopped() { return isStopped; }
2742       };
2743 
2744       return new MovedRegionsCleaner(rs, stoppable);
2745     }
2746 
2747     @Override
2748     protected void chore() {
2749       regionServer.cleanMovedRegions();
2750     }
2751 
2752     @Override
2753     public void stop(String why) {
2754       stoppable.stop(why);
2755     }
2756 
2757     @Override
2758     public boolean isStopped() {
2759       return stoppable.isStopped();
2760     }
2761   }
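
  // A minimal wiring sketch, assuming the cleaner is owned by the server itself; this hypothetical
  // method is not part of the original class. createAndStart() only constructs the chore; actually
  // starting and scheduling its thread is done by the server's service-thread machinery, which is
  // omitted here.
  private void movedRegionsCleanerSketch() {
    MovedRegionsCleaner movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
    // ... the chore thread would be started along with the other service threads here ...
    movedRegionsCleaner.stop("Region server stopping"); // stops the cleaner during shutdown
  }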
2762 
2763   private String getMyEphemeralNodePath() {
2764     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
2765   }
2766 
2767   private boolean isHealthCheckerConfigured() {
2768     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2769     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2770   }
2771 
2772   /**
2773    * @return the underlying {@link CompactSplitThread} for this server
2774    */
2775   public CompactSplitThread getCompactSplitThread() {
2776     return this.compactSplitThread;
2777   }
2778 
2779   /**
2780    * A helper function to store the last flushed sequence id of a recovering region under the
2781    * previously failed RS. The id is used to skip WAL edits that have already been flushed. Since
2782    * a flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
2783    * @throws KeeperException
2784    * @throws IOException
2785    */
2786   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
2787       IOException {
2788     if (!r.isRecovering()) {
2789       // return immediately for non-recovering regions
2790       return;
2791     }
2792 
2793     HRegionInfo region = r.getRegionInfo();
2794     ZooKeeperWatcher zkw = getZooKeeper();
2795     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
2796     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
2797     long minSeqIdForLogReplay = -1;
2798     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
2799       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
2800         minSeqIdForLogReplay = storeSeqIdForReplay;
2801       }
2802     }
2803     long lastRecordedFlushedSequenceId = -1;
2804     String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
2805       region.getEncodedName());
2806     // recovering-region level
2807     byte[] data;
2808     try {
2809       data = ZKUtil.getData(zkw, nodePath);
2810     } catch (InterruptedException e) {
2811       throw new InterruptedIOException();
2812     }
2813     if (data != null) {
2814       lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
2815     }
2816     if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
2817       ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
2818     }
2819     if (previousRSName != null) {
2820       // one level deeper for the failed RS
2821       nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
2822       ZKUtil.setData(zkw, nodePath,
2823         ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
2824       LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
2825           + previousRSName);
2826     } else {
2827       LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
2828     }
2829   }
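
  // Layout of the recovering-regions znodes maintained by the method above (the parent path is
  // illustrative; the real one comes from zooKeeper.recoveringRegionsZNode):
  //   /hbase/recovering-regions/<encodedRegionName>             -> min flushed seq id across stores
  //   /hbase/recovering-regions/<encodedRegionName>/<failedRS>  -> per-store flushed sequence ids
  //
  // A minimal sketch, not part of the original class, of the "minimum flushed sequence id across
  // stores" computation used above; with java.util.Collections it is equivalent to the explicit loop.
  private static long minStoreSeqIdForLogReplay(Map<byte[], Long> maxSeqIdInStores) {
    if (maxSeqIdInStores.isEmpty()) {
      return -1; // same sentinel the method above uses when there are no store entries
    }
    return Collections.min(maxSeqIdInStores.values());
  }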
2830 
2831   /**
2832    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
2833    * @param encodedRegionName the encoded name of the recovering region
2834    * @throws KeeperException
2835    */
2836   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
2837     String result = null;
2838     long maxZxid = 0;
2839     ZooKeeperWatcher zkw = this.getZooKeeper();
2840     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
2841     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
2842     if (failedServers == null || failedServers.isEmpty()) {
2843       return result;
2844     }
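    // The czxid of a znode is the zxid of the transaction that created it, so the child with the
    // largest czxid is the most recently created entry, i.e. the RS that failed last.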
2845     for (String failedServer : failedServers) {
2846       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
2847       Stat stat = new Stat();
2848       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
2849       if (maxZxid < stat.getCzxid()) {
2850         maxZxid = stat.getCzxid();
2851         result = failedServer;
2852       }
2853     }
2854     return result;
2855   }
2856 
2857   /**
2858    * @return The cache config instance used by the regionserver.
2859    */
2860   public CacheConfig getCacheConfig() {
2861     return this.cacheConfig;
2862   }
2863 }