
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetSocketAddress;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Map.Entry;
39  import java.util.Set;
40  import java.util.SortedMap;
41  import java.util.TreeMap;
42  import java.util.TreeSet;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentMap;
45  import java.util.concurrent.ConcurrentSkipListMap;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.locks.ReentrantReadWriteLock;
48  
49  import javax.management.ObjectName;
50  import javax.servlet.http.HttpServlet;
51  
52  import org.apache.commons.lang.math.RandomUtils;
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  import org.apache.hadoop.classification.InterfaceAudience;
56  import org.apache.hadoop.conf.Configuration;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.hbase.Chore;
60  import org.apache.hadoop.hbase.ClockOutOfSyncException;
61  import org.apache.hadoop.hbase.CoordinatedStateManager;
62  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
63  import org.apache.hadoop.hbase.HBaseConfiguration;
64  import org.apache.hadoop.hbase.HConstants;
65  import org.apache.hadoop.hbase.HRegionInfo;
66  import org.apache.hadoop.hbase.HealthCheckChore;
67  import org.apache.hadoop.hbase.MetaTableAccessor;
68  import org.apache.hadoop.hbase.NotServingRegionException;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.Stoppable;
71  import org.apache.hadoop.hbase.TableDescriptors;
72  import org.apache.hadoop.hbase.TableName;
73  import org.apache.hadoop.hbase.YouAreDeadException;
74  import org.apache.hadoop.hbase.ZNodeClearer;
75  import org.apache.hadoop.hbase.client.ConnectionUtils;
76  import org.apache.hadoop.hbase.client.HConnection;
77  import org.apache.hadoop.hbase.client.HConnectionManager;
78  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
79  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
80  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
81  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
82  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
83  import org.apache.hadoop.hbase.executor.ExecutorService;
84  import org.apache.hadoop.hbase.executor.ExecutorType;
85  import org.apache.hadoop.hbase.fs.HFileSystem;
86  import org.apache.hadoop.hbase.http.InfoServer;
87  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
88  import org.apache.hadoop.hbase.ipc.RpcClient;
89  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
90  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
91  import org.apache.hadoop.hbase.master.HMaster;
92  import org.apache.hadoop.hbase.master.RegionState.State;
93  import org.apache.hadoop.hbase.master.TableLockManager;
94  import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
95  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
96  import org.apache.hadoop.hbase.protobuf.RequestConverter;
97  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
98  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
99  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
100 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
101 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
102 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
103 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
104 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
109 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
110 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
111 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
114 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
115 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
116 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
117 import org.apache.hadoop.hbase.regionserver.wal.HLog;
118 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
119 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
120 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
121 import org.apache.hadoop.hbase.security.UserProvider;
122 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
123 import org.apache.hadoop.hbase.util.ByteStringer;
124 import org.apache.hadoop.hbase.util.Bytes;
125 import org.apache.hadoop.hbase.util.CompressionTest;
126 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
127 import org.apache.hadoop.hbase.util.FSTableDescriptors;
128 import org.apache.hadoop.hbase.util.FSUtils;
129 import org.apache.hadoop.hbase.util.HasThread;
130 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
131 import org.apache.hadoop.hbase.util.Sleeper;
132 import org.apache.hadoop.hbase.util.Threads;
133 import org.apache.hadoop.hbase.util.VersionInfo;
134 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
135 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
136 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
137 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
138 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
139 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
140 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
141 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
142 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
143 import org.apache.hadoop.ipc.RemoteException;
144 import org.apache.hadoop.metrics.util.MBeanUtil;
145 import org.apache.hadoop.util.ReflectionUtils;
146 import org.apache.hadoop.util.StringUtils;
147 import org.apache.zookeeper.KeeperException;
148 import org.apache.zookeeper.KeeperException.NoNodeException;
149 import org.apache.zookeeper.data.Stat;
150 
151 import com.google.common.annotations.VisibleForTesting;
152 import com.google.common.base.Preconditions;
153 import com.google.protobuf.BlockingRpcChannel;
154 import com.google.protobuf.ServiceException;
155 
156 /**
157  * HRegionServer makes a set of HRegions available to clients. It checks in with
158  * the HMaster. There are many HRegionServers in a single HBase deployment.
159  */
160 @InterfaceAudience.Private
161 @SuppressWarnings("deprecation")
162 public class HRegionServer extends HasThread implements
163     RegionServerServices, LastSequenceId {
164 
165   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
166 
167   /**
168    * For testing only!  Set to true to skip notifying the master of region assignments.
169    */
170   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
171   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
172 
173   /*
174    * Strings to be used in forming the exception message for
175    * RegionsAlreadyInTransitionException.
176    */
177   protected static final String OPEN = "OPEN";
178   protected static final String CLOSE = "CLOSE";
179 
180   //RegionName vs current action in progress
181   //true - if open region action in progress
182   //false - if close region action in progress
183   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
184     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
185 
186   // Cache flushing
187   protected MemStoreFlusher cacheFlusher;
188 
189   protected HeapMemoryManager hMemManager;
190 
191   /*
192    * Short-circuit (i.e. bypassing the RPC layer) HConnection to this server,
193    * used internally for miscellaneous needs. Initialized at server startup
194    * and closed when the server shuts down. Clients must never close it explicitly.
195    */
196   protected HConnection shortCircuitConnection;
197 
198   /*
199    * Long-lived meta table locator, which is created when the server starts and stopped
200    * when the server shuts down. References to this locator shall be used to perform the
201    * corresponding operations in EventHandlers. The primary reason for this design is to
202    * make it mockable for tests.
203    */
204   protected MetaTableLocator metaTableLocator;
205 
206   // Watch if a region is out of recovering state from ZooKeeper
207   @SuppressWarnings("unused")
208   private RecoveringRegionWatcher recoveringRegionWatcher;
209 
210   /**
211    * Go here to get table descriptors.
212    */
213   protected TableDescriptors tableDescriptors;
214 
215   // Replication services. If no replication, this handler will be null.
216   protected ReplicationSourceService replicationSourceHandler;
217   protected ReplicationSinkService replicationSinkHandler;
218 
219   // Compactions
220   public CompactSplitThread compactSplitThread;
221 
222   /**
223    * Map of regions currently being served by this region server. Key is the
224    * encoded region name.  All access should be synchronized.
225    */
226   protected final Map<String, HRegion> onlineRegions =
227     new ConcurrentHashMap<String, HRegion>();
228 
229   /**
230    * Map of encoded region names to the DataNode locations they should be hosted on.
231    * We store the value as InetSocketAddress since this is used only in the HDFS
232    * API (create() that takes favored nodes as hints for placing file blocks).
233    * We could have used ServerName here as the value class, but we'd need to
234    * convert it to InetSocketAddress at some point before the HDFS API call, and
235    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
236    * and here we really mean DataNode locations.
237    */
238   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
239       new ConcurrentHashMap<String, InetSocketAddress[]>();
240 
241   /**
242    * Set of regions currently in recovering state, which means they can accept writes (edits from
243    * a previously failed region server) but not reads. A recovering region is also an online region.
244    */
245   protected final Map<String, HRegion> recoveringRegions = Collections
246       .synchronizedMap(new HashMap<String, HRegion>());
247 
248   // Leases
249   protected Leases leases;
250 
251   // Instance of the hbase executor service.
252   protected ExecutorService service;
253 
254   // If false, the file system has become unavailable
255   protected volatile boolean fsOk;
256   protected HFileSystem fs;
257 
258   // Set when a report to the master comes back with a message asking us to
259   // shut down. Also set by a call to stop() when debugging or running unit tests
260   // of HRegionServer in isolation.
261   private volatile boolean stopped = false;
262 
263   // Go down hard. Used if file system becomes unavailable and also in
264   // debugging and unit tests.
265   private volatile boolean abortRequested;
266 
267   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
268 
269   // A state before we go into stopped state.  At this stage we're closing user
270   // space regions.
271   private boolean stopping = false;
272 
273   private volatile boolean killed = false;
274 
275   protected final Configuration conf;
276 
277   private Path rootDir;
278 
279   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
280 
281   final int numRetries;
282   protected final int threadWakeFrequency;
283   protected final int msgInterval;
284 
285   protected final int numRegionsToReport;
286 
287   // Stub to do region server status calls against the master.
288   private volatile RegionServerStatusService.BlockingInterface rssStub;
289   // RPC client. Used to make the stub above that does region server status checking.
290   RpcClient rpcClient;
291 
292   private UncaughtExceptionHandler uncaughtExceptionHandler;
293 
294   // Info server. Default access so can be used by unit tests. REGIONSERVER
295   // is the name of the webapp and the attribute name used when stuffing this
296   // instance into the web context.
297   protected InfoServer infoServer;
298   private JvmPauseMonitor pauseMonitor;
299 
300   /** region server process name */
301   public static final String REGIONSERVER = "regionserver";
302 
303   MetricsRegionServer metricsRegionServer;
304   private SpanReceiverHost spanReceiverHost;
305 
306   /*
307    * Check for compactions requests.
308    */
309   Chore compactionChecker;
310 
311   /*
312    * Check for flushes
313    */
314   Chore periodicFlusher;
315 
316   // HLog and HLog roller. log is protected rather than private to avoid
317   // eclipse warning when accessed by inner classes
318   protected volatile HLog hlog;
319   // The meta updates are written to a different hlog. If this
320   // regionserver holds meta regions, then this field will be non-null.
321   protected volatile HLog hlogForMeta;
322 
323   LogRoller hlogRoller;
324   LogRoller metaHLogRoller;
325 
326   // flag set after we're done setting up server threads
327   protected AtomicBoolean online;
328 
329   // zookeeper connection and watcher
330   protected ZooKeeperWatcher zooKeeper;
331 
332   // master address tracker
333   private MasterAddressTracker masterAddressTracker;
334 
335   // Cluster Status Tracker
336   protected ClusterStatusTracker clusterStatusTracker;
337 
338   // Log Splitting Worker
339   private SplitLogWorker splitLogWorker;
340 
341   // A sleeper that sleeps for msgInterval.
342   protected final Sleeper sleeper;
343 
344   private final int operationTimeout;
345 
346   private final RegionServerAccounting regionServerAccounting;
347 
348   // Cache configuration and block cache reference
349   final CacheConfig cacheConfig;
350 
351   /** The health check chore. */
352   private HealthCheckChore healthCheckChore;
353 
354   /** The nonce manager chore. */
355   private Chore nonceManagerChore;
356 
357   /**
358    * The server name the Master sees us as.  It's made from the hostname the
359    * master passes us, the port, and the server startcode. Gets set after registration
360    * against the Master.
361    */
362   protected ServerName serverName;
363 
364   /**
365    * This servers startcode.
366    */
367   protected final long startcode;
368 
369   /**
370    * Unique identifier for the cluster we are a part of.
371    */
372   private String clusterId;
373 
374   /**
375    * MX Bean for RegionServerInfo
376    */
377   private ObjectName mxBean = null;
378 
379   /**
380    * Chore to clean periodically the moved region list
381    */
382   private MovedRegionsCleaner movedRegionsCleaner;
383 
384   // chore for refreshing store files for secondary regions
385   private StorefileRefresherChore storefileRefresher;
386 
387   private RegionServerCoprocessorHost rsHost;
388 
389   private RegionServerProcedureManagerHost rspmHost;
390 
391   // Table level lock manager for locking for region operations
392   protected TableLockManager tableLockManager;
393 
394   /**
395    * Nonce manager. Nonces are used to make operations like increment and append idempotent
396    * in the case where client doesn't receive the response from a successful operation and
397    * retries. We track the successful ops for some time via a nonce sent by client and handle
398    * duplicate operations (currently, by failing them; in future we might use MVCC to return
399    * result). Nonces are also recovered from the WAL during recovery; however, the caveats (from
400    * HBASE-3787) are:
401    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
402    *   of past records. If we don't read the records, we don't read and recover the nonces.
403    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
404    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
405    * We could have a separate, additional "Nonce WAL". It will just contain a bunch of numbers and
406    * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
407    * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
408    * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
409    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
410    * latest nonce in it expired. It can also be recovered during move.
411    */
412   final ServerNonceManager nonceManager;
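  // Nonces are enabled by default; they can be switched off via
  // HConstants.HBASE_RS_NONCES_ENABLED, in which case the constructor below
  // leaves this field null.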
413 
414   private UserProvider userProvider;
415 
416   protected final RSRpcServices rpcServices;
417 
418   protected BaseCoordinatedStateManager csm;
419 
420   /**
421    * Starts an HRegionServer at the default location.
422    * @param conf
423    * @throws IOException
424    * @throws InterruptedException
425    */
426   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
427     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
428   }
429 
430   /**
431    * Starts an HRegionServer at the default location.
432    * @param conf
433    * @param csm implementation of CoordinatedStateManager to be used
434    * @throws IOException
435    */
436   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
437       throws IOException {
438     this.fsOk = true;
439     this.conf = conf;
440     checkCodecs(this.conf);
441     this.online = new AtomicBoolean(false);
442     this.userProvider = UserProvider.instantiate(conf);
443     FSUtils.setupShortCircuitRead(this.conf);
444 
445     // Config'ed params
446     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
447         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
448     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
449     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
450 
451     this.sleeper = new Sleeper(this.msgInterval, this);
452 
453     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
454     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
455 
456     this.numRegionsToReport = conf.getInt(
457       "hbase.regionserver.numregionstoreport", 10);
458 
459     this.operationTimeout = conf.getInt(
460       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
461       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
462 
463     this.abortRequested = false;
464     this.stopped = false;
465 
466     rpcServices = createRpcServices();
467     this.startcode = System.currentTimeMillis();
468     String hostName = rpcServices.isa.getHostName();
469     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
470 
471     // login the zookeeper client principal (if using security)
472     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
473       "hbase.zookeeper.client.kerberos.principal", hostName);
474     // login the server principal (if using secure Hadoop)
475     login(userProvider, hostName);
476 
477     regionServerAccounting = new RegionServerAccounting();
478     cacheConfig = new CacheConfig(conf);
479     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
480       @Override
481       public void uncaughtException(Thread t, Throwable e) {
482         abort("Uncaught exception in service thread " + t.getName(), e);
483       }
484     };
485 
486     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
487     // underlying hadoop hdfs accessors will go against the wrong filesystem
488     // (unless all is set to defaults).
489     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
490     // Get the fs instance used by this RS.  Do we use checksum verification in hbase? If hbase
491     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
492     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
493     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
494     this.rootDir = FSUtils.getRootDir(this.conf);
495     this.tableDescriptors = new FSTableDescriptors(
496       this.fs, this.rootDir, !canUpdateTableDescriptor());
497 
498     service = new ExecutorService(getServerName().toShortString());
499     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
500 
501     // Some unit tests don't need a cluster, so no zookeeper at all
502     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
503       // Open connection to zookeeper and set primary watcher
504       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
505         rpcServices.isa.getPort(), this, canCreateBaseZNode());
506 
507       this.csm = (BaseCoordinatedStateManager) csm;
508       this.csm.initialize(this);
509       this.csm.start();
510 
511       tableLockManager = TableLockManager.createTableLockManager(
512         conf, zooKeeper, serverName);
513 
514       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
515       masterAddressTracker.start();
516 
517       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
518       clusterStatusTracker.start();
519     }
520 
521     rpcServices.start();
522     putUpWebUI();
523   }
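  /*
   * A minimal hbase-site.xml sketch covering two of the tunables read in the
   * constructor above; the values shown are the defaults the code falls back to
   * when the keys are absent.
   *
   *   <property><name>hbase.regionserver.msginterval</name><value>3000</value></property>
   *   <property><name>hbase.regionserver.numregionstoreport</name><value>10</value></property>
   */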
524 
525   protected void login(UserProvider user, String host) throws IOException {
526     user.login("hbase.regionserver.keytab.file",
527       "hbase.regionserver.kerberos.principal", host);
528   }
529 
530   protected String getProcessName() {
531     return REGIONSERVER;
532   }
533 
534   protected boolean canCreateBaseZNode() {
535     return false;
536   }
537 
538   protected boolean canUpdateTableDescriptor() {
539     return false;
540   }
541 
542   protected RSRpcServices createRpcServices() throws IOException {
543     return new RSRpcServices(this);
544   }
545 
546   protected void configureInfoServer() {
547     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
548     infoServer.setAttribute(REGIONSERVER, this);
549   }
550 
551   protected Class<? extends HttpServlet> getDumpServlet() {
552     return RSDumpServlet.class;
553   }
554 
555   protected void doMetrics() {
556   }
557 
558   /**
559    * Create wrapped short-circuit connection to this server.
560    * In its own method so it can be intercepted and mocked in tests.
561    * @throws IOException
562    */
563   protected HConnection createShortCircuitConnection() throws IOException {
564     return ConnectionUtils.createShortCircuitHConnection(
565       HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices);
566   }
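  // The connection returned above wraps an ordinary HConnection but, for calls aimed
  // at this server, hands them straight to the local rpcServices instance (passed in
  // twice, once per protocol interface), bypassing the RPC layer as noted on the
  // shortCircuitConnection field.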
567 
568   /**
569    * Run test on configured codecs to make sure supporting libs are in place.
570    * @param c
571    * @throws IOException
572    */
573   private static void checkCodecs(final Configuration c) throws IOException {
574     // check to see if the codec list is available:
575     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
576     if (codecs == null) return;
577     for (String codec : codecs) {
578       if (!CompressionTest.testCompression(codec)) {
579         throw new IOException("Compression codec " + codec +
580           " not supported, aborting RS construction");
581       }
582     }
583   }
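  /*
   * A hedged configuration sketch for checkCodecs(): listing codecs under the
   * "hbase.regionserver.codecs" key (read above) makes region server construction
   * fail fast when a supporting native library is missing. Codec names here are
   * illustrative.
   *
   *   <property>
   *     <name>hbase.regionserver.codecs</name>
   *     <value>snappy,lz4</value>
   *   </property>
   */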
584 
585   public String getClusterId() {
586     return this.clusterId;
587   }
588 
589   /**
590    * All initialization needed before we register with the Master.
591    *
592    * @throws IOException
593    * @throws InterruptedException
594    */
595   private void preRegistrationInitialization(){
596     try {
597       initializeZooKeeper();
598       initializeThreads();
599     } catch (Throwable t) {
600       // Call stop on error or the process will stick around forever since the server
601       // puts up non-daemon threads.
602       this.rpcServices.stop();
603       abort("Initialization of RS failed.  Hence aborting RS.", t);
604     }
605   }
606 
607   /**
608    * Bring up the connection to the zk ensemble, then wait until a master is available for this
609    * cluster, and after that wait until the cluster 'up' flag has been set.
610    * This is the order in which the master does things.
611    * Finally, open the long-lived server short-circuit connection.
612    * @throws IOException
613    * @throws InterruptedException
614    */
615   private void initializeZooKeeper() throws IOException, InterruptedException {
616     // Create the master address tracker, register with zk, and start it.  Then
617     // block until a master is available.  No point in starting up if no master
618     // running.
619     this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
620     this.masterAddressTracker.start();
621     blockAndCheckIfStopped(this.masterAddressTracker);
622 
623     // Wait on cluster being up.  Master will set this flag up in zookeeper
624     // when ready.
625     blockAndCheckIfStopped(this.clusterStatusTracker);
626 
627     // Retrieve the clusterId.
628     // Since the cluster status is now up,
629     // the ID should have already been set by the HMaster.
630     try {
631       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
632       if (clusterId == null) {
633         this.abort("Cluster ID has not been set");
634       }
635       LOG.info("ClusterId : "+clusterId);
636     } catch (KeeperException e) {
637       this.abort("Failed to retrieve Cluster ID",e);
638     }
639 
640     synchronized (this) {
641       if (shortCircuitConnection == null) {
642         shortCircuitConnection = createShortCircuitConnection();
643         metaTableLocator = new MetaTableLocator();
644       }
645     }
646 
647     // watch for snapshots and other procedures
648     try {
649       rspmHost = new RegionServerProcedureManagerHost();
650       rspmHost.loadProcedures(conf);
651       rspmHost.initialize(this);
652     } catch (KeeperException e) {
653       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
654     }
655     // register watcher for recovering regions
656     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
657   }
658 
659   /**
660    * Utility method to wait indefinitely on a znode's availability while checking
661    * if the region server is shut down.
662    * @param tracker znode tracker to use
663    * @throws IOException any IO exception, plus if the RS is stopped
664    * @throws InterruptedException
665    */
666   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
667       throws IOException, InterruptedException {
668     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
669       if (this.stopped) {
670         throw new IOException("Received the shutdown message while waiting.");
671       }
672     }
673   }
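  // The loop above wakes every msgInterval milliseconds, so a pending stop of this
  // region server is noticed even while the awaited znode is still absent.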
674 
675   /**
676    * @return False if cluster shutdown in progress
677    */
678   private boolean isClusterUp() {
679     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
680   }
681 
682   private void initializeThreads() throws IOException {
683     // Cache flushing thread.
684     this.cacheFlusher = new MemStoreFlusher(conf, this);
685 
686     // Compaction thread
687     this.compactSplitThread = new CompactSplitThread(this);
688 
689     // Background thread to check for compactions; needed if region has not gotten updates
690     // in a while. It will take care of not checking too frequently on a store-by-store basis.
691     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
692     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
693     // Health checker thread.
694     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
695       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
696     if (isHealthCheckerConfigured()) {
697       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
698     }
699 
700     this.leases = new Leases(this.threadWakeFrequency);
701 
702     // Create the thread to clean the moved regions list
703     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
704 
705     if (this.nonceManager != null) {
706       // Create the chore that cleans up nonces.
707       nonceManagerChore = this.nonceManager.createCleanupChore(this);
708     }
709 
710     // Setup RPC client for master communication
711     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
712       rpcServices.isa.getAddress(), 0));
713     this.pauseMonitor = new JvmPauseMonitor(conf);
714     pauseMonitor.start();
715 
716     int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
717       StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
718     if (storefileRefreshPeriod > 0) {
719       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
720     }
721   }
722 
723   /**
724    * The HRegionServer sticks in this loop until closed.
725    */
726   @Override
727   public void run() {
728     try {
729       // Do pre-registration initializations; zookeeper, lease threads, etc.
730       preRegistrationInitialization();
731     } catch (Throwable e) {
732       abort("Fatal exception during initialization", e);
733     }
734 
735     try {
736       if (!isStopped() && !isAborted()) {
737         ShutdownHook.install(conf, fs, this, Thread.currentThread());
738         // Set our ephemeral znode up in zookeeper now we have a name.
739         createMyEphemeralNode();
740         // Initialize the RegionServerCoprocessorHost now that our ephemeral
741         // node was created, in case any coprocessors want to use ZooKeeper
742         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
743       }
744 
745       // Try and register with the Master; tell it we are here.  Break if
746       // server is stopped or the cluster-up flag is down or hdfs went wacky.
747       while (keepLooping()) {
748         RegionServerStartupResponse w = reportForDuty();
749         if (w == null) {
750           LOG.warn("reportForDuty failed; sleeping and then retrying.");
751           this.sleeper.sleep();
752         } else {
753           handleReportForDutyResponse(w);
754           break;
755         }
756       }
757 
758       if (!isStopped() && isHealthy()){
759         // start the snapshot handler and other procedure handlers,
760         // since the server is ready to run
761         rspmHost.start();
762       }
763 
764       // We registered with the Master.  Go into run mode.
765       long lastMsg = System.currentTimeMillis();
766       long oldRequestCount = -1;
767       // The main run loop.
768       while (!isStopped() && isHealthy()) {
769         if (!isClusterUp()) {
770           if (isOnlineRegionsEmpty()) {
771             stop("Exiting; cluster shutdown set and not carrying any regions");
772           } else if (!this.stopping) {
773             this.stopping = true;
774             LOG.info("Closing user regions");
775             closeUserRegions(this.abortRequested);
776           } else if (this.stopping) {
777             boolean allUserRegionsOffline = areAllUserRegionsOffline();
778             if (allUserRegionsOffline) {
779               // Set stopped if no more write requests to meta tables
780               // since last time we went around the loop.  Any open
781               // meta regions will be closed on our way out.
782               if (oldRequestCount == getWriteRequestCount()) {
783                 stop("Stopped; only catalog regions remaining online");
784                 break;
785               }
786               oldRequestCount = getWriteRequestCount();
787             } else {
788               // Make sure all regions have been closed -- some regions may
789               // have not got it because we were splitting at the time of
790               // the call to closeUserRegions.
791               closeUserRegions(this.abortRequested);
792             }
793             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
794           }
795         }
796         long now = System.currentTimeMillis();
797         if ((now - lastMsg) >= msgInterval) {
798           tryRegionServerReport(lastMsg, now);
799           lastMsg = System.currentTimeMillis();
800           doMetrics();
801         }
802         if (!isStopped() && !isAborted()) {
803           this.sleeper.sleep();
804         }
805       } // while
806     } catch (Throwable t) {
807       if (!rpcServices.checkOOME(t)) {
808         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
809         abort(prefix + t.getMessage(), t);
810       }
811     }
812     // Run shutdown.
813     if (mxBean != null) {
814       MBeanUtil.unregisterMBean(mxBean);
815       mxBean = null;
816     }
817     if (this.leases != null) this.leases.closeAfterLeasesExpire();
818     if (this.splitLogWorker != null) {
819       splitLogWorker.stop();
820     }
821     if (this.infoServer != null) {
822       LOG.info("Stopping infoServer");
823       try {
824         this.infoServer.stop();
825       } catch (Exception e) {
826         LOG.error("Failed to stop infoServer", e);
827       }
828     }
829     // Send cache a shutdown.
830     if (cacheConfig.isBlockCacheEnabled()) {
831       cacheConfig.getBlockCache().shutdown();
832     }
833 
834     if (movedRegionsCleaner != null) {
835       movedRegionsCleaner.stop("Region Server stopping");
836     }
837 
838     // Send interrupts to wake up threads if sleeping so they notice shutdown.
839     // TODO: Should we check they are alive? If an OOME occurred they could have exited already
840     if(this.hMemManager != null) this.hMemManager.stop();
841     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
842     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
843     if (this.compactionChecker != null)
844       this.compactionChecker.interrupt();
845     if (this.healthCheckChore != null) {
846       this.healthCheckChore.interrupt();
847     }
848     if (this.nonceManagerChore != null) {
849       this.nonceManagerChore.interrupt();
850     }
851     if (this.storefileRefresher != null) {
852       this.storefileRefresher.interrupt();
853     }
854 
855     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
856     if (rspmHost != null) {
857       rspmHost.stop(this.abortRequested || this.killed);
858     }
859 
860     if (this.killed) {
861       // Just skip out w/o closing regions.  Used when testing.
862     } else if (abortRequested) {
863       if (this.fsOk) {
864         closeUserRegions(abortRequested); // Don't leave any open file handles
865       }
866       LOG.info("aborting server " + this.serverName);
867     } else {
868       closeUserRegions(abortRequested);
869       LOG.info("stopping server " + this.serverName);
870     }
871 
872     // so callers waiting for meta without timeout can stop
873     if (this.metaTableLocator != null) this.metaTableLocator.stop();
874     if (this.shortCircuitConnection != null && !shortCircuitConnection.isClosed()) {
875       try {
876         this.shortCircuitConnection.close();
877       } catch (IOException e) {
878         // Although the {@link Closeable} interface throws an {@link
879         // IOException}, in reality, the implementation would never do that.
880         LOG.error("Attempt to close server's short circuit HConnection failed.", e);
881       }
882     }
883 
884     // Closing the compactSplit thread before closing meta regions
885     if (!this.killed && containsMetaTableRegions()) {
886       if (!abortRequested || this.fsOk) {
887         if (this.compactSplitThread != null) {
888           this.compactSplitThread.join();
889           this.compactSplitThread = null;
890         }
891         closeMetaTableRegions(abortRequested);
892       }
893     }
894 
895     if (!this.killed && this.fsOk) {
896       waitOnAllRegionsToClose(abortRequested);
897       LOG.info("stopping server " + this.serverName +
898         "; all regions closed.");
899     }
900 
901     // The fsOk flag may be changed when closing regions throws an exception.
902     if (this.fsOk) {
903       closeWAL(!abortRequested);
904     }
905 
906     // Make sure the proxy is down.
907     if (this.rssStub != null) {
908       this.rssStub = null;
909     }
910     if (this.rpcClient != null) {
911       this.rpcClient.stop();
912     }
913     if (this.leases != null) {
914       this.leases.close();
915     }
916     if (this.pauseMonitor != null) {
917       this.pauseMonitor.stop();
918     }
919 
920     if (!killed) {
921       stopServiceThreads();
922     }
923 
924     if (this.rpcServices != null) {
925       this.rpcServices.stop();
926     }
927 
928     try {
929       deleteMyEphemeralNode();
930     } catch (KeeperException e) {
931       LOG.warn("Failed deleting my ephemeral node", e);
932     }
933     // We may have failed to delete the znode at the previous step, but
934     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
935     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
936 
937     if (this.zooKeeper != null) {
938       this.zooKeeper.close();
939     }
940     LOG.info("stopping server " + this.serverName +
941       "; zookeeper connection closed.");
942 
943     LOG.info(Thread.currentThread().getName() + " exiting");
944   }
945 
946   private boolean containsMetaTableRegions() {
947     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
948   }
949 
950   private boolean areAllUserRegionsOffline() {
951     if (getNumberOfOnlineRegions() > 2) return false;
952     boolean allUserRegionsOffline = true;
953     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
954       if (!e.getValue().getRegionInfo().isMetaTable()) {
955         allUserRegionsOffline = false;
956         break;
957       }
958     }
959     return allUserRegionsOffline;
960   }
961 
962   /**
963    * @return Current write count for all online regions.
964    */
965   private long getWriteRequestCount() {
966     long writeCount = 0;
967     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
968       writeCount += e.getValue().getWriteRequestsCount();
969     }
970     return writeCount;
971   }
972 
973   @VisibleForTesting
974   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
975   throws IOException {
976     RegionServerStatusService.BlockingInterface rss = rssStub;
977     if (rss == null) {
978       // the current server could be stopping.
979       return;
980     }
981     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
982     try {
983       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
984       ServerName sn = ServerName.parseVersionedServerName(
985         this.serverName.getVersionedBytes());
986       request.setServer(ProtobufUtil.toServerName(sn));
987       request.setLoad(sl);
988       rss.regionServerReport(null, request.build());
989     } catch (ServiceException se) {
990       IOException ioe = ProtobufUtil.getRemoteException(se);
991       if (ioe instanceof YouAreDeadException) {
992         // This will be caught and handled as a fatal error in run()
993         throw ioe;
994       }
995       if (rssStub == rss) {
996         rssStub = null;
997       }
998       // Couldn't connect to the master, get location from zk and reconnect
999       // Method blocks until new master is found or we are stopped
1000       createRegionServerStatusStub();
1001     }
1002   }
1003 
1004   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
1005     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1006     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1007     // the wrapper to compute those numbers in one place.
1008     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1009     // Instead they should be stored in an HBase table so that external visibility into HBase is
1010     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1011     // history.
1012     MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
1013     Collection<HRegion> regions = getOnlineRegionsLocalContext();
1014     MemoryUsage memory =
1015       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1016 
1017     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1018       ClusterStatusProtos.ServerLoad.newBuilder();
1019     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1020     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1021     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1022     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1023     Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
1024     for (String coprocessor : coprocessors) {
1025       serverLoad.addCoprocessors(
1026         Coprocessor.newBuilder().setName(coprocessor).build());
1027     }
1028     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1029     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1030     for (HRegion region : regions) {
1031       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1032     }
1033     serverLoad.setReportStartTime(reportStartTime);
1034     serverLoad.setReportEndTime(reportEndTime);
1035     if (this.infoServer != null) {
1036       serverLoad.setInfoServerPort(this.infoServer.getPort());
1037     } else {
1038       serverLoad.setInfoServerPort(-1);
1039     }
1040     return serverLoad.build();
1041   }
1042 
1043   String getOnlineRegionsAsPrintableString() {
1044     StringBuilder sb = new StringBuilder();
1045     for (HRegion r: this.onlineRegions.values()) {
1046       if (sb.length() > 0) sb.append(", ");
1047       sb.append(r.getRegionInfo().getEncodedName());
1048     }
1049     return sb.toString();
1050   }
1051 
1052   /**
1053    * Wait on regions close.
1054    */
1055   private void waitOnAllRegionsToClose(final boolean abort) {
1056     // Wait till all regions are closed before going out.
1057     int lastCount = -1;
1058     long previousLogTime = 0;
1059     Set<String> closedRegions = new HashSet<String>();
1060     boolean interrupted = false;
1061     try {
1062       while (!isOnlineRegionsEmpty()) {
1063         int count = getNumberOfOnlineRegions();
1064         // Only print a message if the count of regions has changed.
1065         if (count != lastCount) {
1066           // Log every second at most
1067           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1068             previousLogTime = System.currentTimeMillis();
1069             lastCount = count;
1070             LOG.info("Waiting on " + count + " regions to close");
1071             // Only print out regions still closing if there is a small number, else we will
1072             // swamp the log.
1073             if (count < 10 && LOG.isDebugEnabled()) {
1074               LOG.debug(this.onlineRegions);
1075             }
1076           }
1077         }
1078         // Ensure all user regions have been sent a close. Use this to
1079         // protect against the case where an open comes in after we start the
1080         // iterator of onlineRegions to close all user regions.
1081         for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1082           HRegionInfo hri = e.getValue().getRegionInfo();
1083           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1084               && !closedRegions.contains(hri.getEncodedName())) {
1085             closedRegions.add(hri.getEncodedName());
1086             // Don't update zk with this close transition; pass false.
1087             closeRegionIgnoreErrors(hri, abort);
1088           }
1089         }
1090         // No regions in RIT, we could stop waiting now.
1091         if (this.regionsInTransitionInRS.isEmpty()) {
1092           if (!isOnlineRegionsEmpty()) {
1093             LOG.info("We were exiting though online regions are not empty," +
1094                 " because some regions failed closing");
1095           }
1096           break;
1097         }
1098         try {
1099           Thread.sleep(200);
1100         } catch (InterruptedException e) {
1101           interrupted = true;
1102           LOG.warn("Interrupted while sleeping");
1103         }
1104       }
1105     } finally {
1106       if (interrupted) {
1107         Thread.currentThread().interrupt();
1108       }
1109     }
1110   }
1111 
1112   private void closeWAL(final boolean delete) {
1113     if (this.hlogForMeta != null) {
1114       // All hlogs (meta and non-meta) are in the same directory. Don't call
1115       // closeAndDelete here since that would delete all hlogs not just the
1116       // meta ones. We will just 'close' the hlog for meta here, and leave
1117       // the directory cleanup to the follow-on closeAndDelete call.
1118       try {
1119         this.hlogForMeta.close();
1120       } catch (Throwable e) {
1121         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1122         LOG.error("Metalog close and delete failed", e);
1123       }
1124     }
1125     if (this.hlog != null) {
1126       try {
1127         if (delete) {
1128           hlog.closeAndDelete();
1129         } else {
1130           hlog.close();
1131         }
1132       } catch (Throwable e) {
1133         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1134         LOG.error("Close and delete failed", e);
1135       }
1136     }
1137   }
1138 
1139   /*
1140    * Run init. Sets up hlog and starts up all server threads.
1141    *
1142    * @param c Extra configuration.
1143    */
1144   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1145   throws IOException {
1146     try {
1147       for (NameStringPair e : c.getMapEntriesList()) {
1148         String key = e.getName();
1149         // The hostname the master sees us as.
1150         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1151           String hostnameFromMasterPOV = e.getValue();
1152           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1153             rpcServices.isa.getPort(), this.startcode);
1154           if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1155             LOG.info("Master passed us a different hostname to use; was=" +
1156               rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
1157           }
1158           continue;
1159         }
1160         String value = e.getValue();
1161         if (LOG.isDebugEnabled()) {
1162           LOG.info("Config from master: " + key + "=" + value);
1163         }
1164         this.conf.set(key, value);
1165       }
1166 
1167       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1168       // config param for task trackers, but we can piggyback off of it.
1169       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1170         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1171           this.serverName.toString());
1172       }
1173 
1174       // Save it in a file; this will allow us to see if we crash.
1175       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1176 
1177       this.hlog = setupWALAndReplication();
1178       // Init in here rather than in constructor after thread name has been set
1179       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1180 
1181       startServiceThreads();
1182       startHeapMemoryManager();
1183       LOG.info("Serving as " + this.serverName +
1184         ", RpcServer on " + rpcServices.isa +
1185         ", sessionid=0x" +
1186         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1187       synchronized (online) {
1188         online.set(true);
1189         online.notifyAll();
1190       }
1191     } catch (Throwable e) {
1192       stop("Failed initialization");
1193       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1194           "Region server startup failed");
1195     } finally {
1196       sleeper.skipSleepCycle();
1197     }
1198   }
1199 
1200   private void startHeapMemoryManager() {
1201     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1202     if (this.hMemManager != null) {
1203       this.hMemManager.start();
1204     }
1205   }
1206 
1207   private void createMyEphemeralNode() throws KeeperException, IOException {
1208     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1209     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1210     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1211     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1212       getMyEphemeralNodePath(), data);
1213   }
1214 
1215   private void deleteMyEphemeralNode() throws KeeperException {
1216     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1217   }
1218 
1219   @Override
1220   public RegionServerAccounting getRegionServerAccounting() {
1221     return regionServerAccounting;
1222   }
1223 
1224   @Override
1225   public TableLockManager getTableLockManager() {
1226     return tableLockManager;
1227   }
1228 
1229   /*
1230    * @param r Region to get RegionLoad for.
1231    * @param regionLoadBldr the RegionLoad.Builder, can be null
1232    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1233    * @return RegionLoad instance.
1234    *
1235    * @throws IOException
1236    */
1237   private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1238       RegionSpecifier.Builder regionSpecifier) {
1239     byte[] name = r.getRegionName();
1240     int stores = 0;
1241     int storefiles = 0;
1242     int storeUncompressedSizeMB = 0;
1243     int storefileSizeMB = 0;
1244     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1245     int storefileIndexSizeMB = 0;
1246     int rootIndexSizeKB = 0;
1247     int totalStaticIndexSizeKB = 0;
1248     int totalStaticBloomSizeKB = 0;
1249     long totalCompactingKVs = 0;
1250     long currentCompactedKVs = 0;
1251     synchronized (r.stores) {
1252       stores += r.stores.size();
1253       for (Store store : r.stores.values()) {
1254         storefiles += store.getStorefilesCount();
1255         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1256             / 1024 / 1024);
1257         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1258         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1259         CompactionProgress progress = store.getCompactionProgress();
1260         if (progress != null) {
1261           totalCompactingKVs += progress.totalCompactingKVs;
1262           currentCompactedKVs += progress.currentCompactedKVs;
1263         }
1264 
1265         rootIndexSizeKB +=
1266             (int) (store.getStorefilesIndexSize() / 1024);
1267 
1268         totalStaticIndexSizeKB +=
1269           (int) (store.getTotalStaticIndexSize() / 1024);
1270 
1271         totalStaticBloomSizeKB +=
1272           (int) (store.getTotalStaticBloomSize() / 1024);
1273       }
1274     }
1275     if (regionLoadBldr == null) {
1276       regionLoadBldr = RegionLoad.newBuilder();
1277     }
1278     if (regionSpecifier == null) {
1279       regionSpecifier = RegionSpecifier.newBuilder();
1280     }
1281     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1282     regionSpecifier.setValue(ByteStringer.wrap(name));
1283     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1284       .setStores(stores)
1285       .setStorefiles(storefiles)
1286       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1287       .setStorefileSizeMB(storefileSizeMB)
1288       .setMemstoreSizeMB(memstoreSizeMB)
1289       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1290       .setRootIndexSizeKB(rootIndexSizeKB)
1291       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1292       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1293       .setReadRequestsCount(r.readRequestsCount.get())
1294       .setWriteRequestsCount(r.writeRequestsCount.get())
1295       .setTotalCompactingKVs(totalCompactingKVs)
1296       .setCurrentCompactedKVs(currentCompactedKVs)
1297       .setCompleteSequenceId(r.lastFlushSeqId);
1298 
1299     return regionLoadBldr.build();
1300   }
1301 
1302   /**
1303    * @param encodedRegionName
1304    * @return An instance of RegionLoad.
1305    */
1306   public RegionLoad createRegionLoad(final String encodedRegionName) {
1307     HRegion r = null;
1308     r = this.onlineRegions.get(encodedRegionName);
1309     return r != null ? createRegionLoad(r, null, null) : null;
1310   }
1311 
1312   /*
1313    * Inner class that runs on a long period checking if regions need compaction.
1314    */
1315   private static class CompactionChecker extends Chore {
1316     private final HRegionServer instance;
1317     private final int majorCompactPriority;
1318     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1319     private long iteration = 0;
1320 
1321     CompactionChecker(final HRegionServer h, final int sleepTime,
1322         final Stoppable stopper) {
1323       super("CompactionChecker", sleepTime, h);
1324       this.instance = h;
1325       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1326 
1327       /* MajorCompactPriority is configurable.
1328        * If not set, the compaction will use default priority.
1329        */
1330       this.majorCompactPriority = this.instance.conf.
1331         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1332         DEFAULT_PRIORITY);
1333     }
1334 
1335     @Override
1336     protected void chore() {
1337       for (HRegion r : this.instance.onlineRegions.values()) {
1338         if (r == null)
1339           continue;
1340         for (Store s : r.getStores().values()) {
1341           try {
1342             long multiplier = s.getCompactionCheckMultiplier();
1343             assert multiplier > 0;
1344             if (iteration % multiplier != 0) continue;
1345             if (s.needsCompaction()) {
1346               // Queue a compaction. Will recognize if major is needed.
1347               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1348                   + " requests compaction");
1349             } else if (s.isMajorCompaction()) {
1350               if (majorCompactPriority == DEFAULT_PRIORITY
1351                   || majorCompactPriority > r.getCompactPriority()) {
1352                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1353                     + " requests major compaction; use default priority", null);
1354               } else {
1355                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1356                     + " requests major compaction; use configured priority",
1357                   this.majorCompactPriority, null);
1358               }
1359             }
1360           } catch (IOException e) {
1361             LOG.warn("Failed major compaction check on " + r, e);
1362           }
1363         }
1364       }
1365       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1366     }
1367   }
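       // How the per-store throttle in CompactionChecker.chore() works: the chore fires
       // once per sleepTime and a store is only examined when iteration % multiplier == 0.
       // As an assumed example, with a 10 second chore period and a store whose
       // getCompactionCheckMultiplier() returns 12, that store's needsCompaction() /
       // isMajorCompaction() checks run roughly every 120 seconds; a multiplier of 1
       // means the store is checked on every iteration.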
1368 
1369   class PeriodicMemstoreFlusher extends Chore {
1370     final HRegionServer server;
1371     final static int RANGE_OF_DELAY = 20000; //millisec
1372     final static int MIN_DELAY_TIME = 3000; //millisec
1373     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1374       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1375       this.server = server;
1376     }
1377 
1378     @Override
1379     protected void chore() {
1380       for (HRegion r : this.server.onlineRegions.values()) {
1381         if (r == null)
1382           continue;
1383         if (r.shouldFlush()) {
1384           FlushRequester requester = server.getFlushRequester();
1385           if (requester != null) {
1386             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1387             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1388                 " after a delay of " + randomDelay);
1389             //Throttle the flushes by putting a delay. If we don't throttle, and there
1390             //is a balanced write-load on the regions in a table, we might end up
1391             //overwhelming the filesystem with too many flushes at once.
1392             requester.requestDelayedFlush(r, randomDelay);
1393           }
1394         }
1395       }
1396     }
1397   }
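       // The delay chosen in PeriodicMemstoreFlusher.chore() above falls in
       // [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY), i.e. between 3 and roughly
       // 23 seconds, so regions that become flushable at the same time have their
       // flushes spread out instead of hitting the filesystem all at once.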
1398 
1399   /**
1400    * Report the status of the server. A server is online once all the startup is
1401    * completed (setting up filesystem, starting service threads, etc.). This
1402    * method is designed mostly to be useful in tests.
1403    *
1404    * @return true if online, false if not.
1405    */
1406   public boolean isOnline() {
1407     return online.get();
1408   }
1409 
1410   /**
1411    * Set up the WAL and replication if enabled.
1412    * Replication setup is done here because it needs to be hooked up to the WAL.
1413    * @return A WAL instance.
1414    * @throws IOException
1415    */
1416   private HLog setupWALAndReplication() throws IOException {
1417     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1418     final String logName
1419       = HLogUtil.getHLogDirectoryName(this.serverName.toString());
1420 
1421     Path logdir = new Path(rootDir, logName);
1422     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1423     if (this.fs.exists(logdir)) {
1424       throw new RegionServerRunningException("Region server has already " +
1425         "created directory at " + this.serverName.toString());
1426     }
1427 
1428     // Instantiate replication manager if replication enabled.  Pass it the
1429     // log directories.
1430     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1431 
1432     return instantiateHLog(rootDir, logName);
1433   }
1434 
1435   private HLog getMetaWAL() throws IOException {
1436     if (this.hlogForMeta != null) return this.hlogForMeta;
1437     final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
1438     Path logdir = new Path(rootDir, logName);
1439     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1440     this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
1441       this.conf, getMetaWALActionListeners(), this.serverName.toString());
1442     return this.hlogForMeta;
1443   }
1444 
1445   /**
1446    * Called by {@link #setupWALAndReplication()} to create the WAL instance.
1447    * @param rootdir
1448    * @param logName
1449    * @return WAL instance.
1450    * @throws IOException
1451    */
1452   protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1453     return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1454       getWALActionListeners(), this.serverName.toString());
1455   }
1456 
1457   /**
1458    * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance.
1459    * Add any {@link WALActionsListener}s you want inserted before WAL startup.
1460    * @return List of WALActionsListener that will be passed in to
1461    * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
1462    */
1463   protected List<WALActionsListener> getWALActionListeners() {
1464     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1465     // Log roller.
1466     this.hlogRoller = new LogRoller(this, this);
1467     listeners.add(this.hlogRoller);
1468     if (this.replicationSourceHandler != null &&
1469         this.replicationSourceHandler.getWALActionsListener() != null) {
1470       // Replication handler is an implementation of WALActionsListener.
1471       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1472     }
1473     return listeners;
1474   }
1475 
1476   protected List<WALActionsListener> getMetaWALActionListeners() {
1477     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1478     // Use a temporary log roller so that metaHLogRoller is already running by the
1479     // time it becomes non-null
1480     MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1481     String n = Thread.currentThread().getName();
1482     Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1483         n + "-MetaLogRoller", uncaughtExceptionHandler);
1484     this.metaHLogRoller = tmpLogRoller;
1485     tmpLogRoller = null;
1486     listeners.add(this.metaHLogRoller);
1487     return listeners;
1488   }
1489 
1490   protected LogRoller getLogRoller() {
1491     return hlogRoller;
1492   }
1493 
1494   public MetricsRegionServer getRegionServerMetrics() {
1495     return this.metricsRegionServer;
1496   }
1497 
1498   /**
1499    * @return Master address tracker instance.
1500    */
1501   public MasterAddressTracker getMasterAddressTracker() {
1502     return this.masterAddressTracker;
1503   }
1504 
1505   /*
1506    * Start maintenance Threads, Server, Worker and lease checker threads.
1507    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1508    * get an unhandled exception. We cannot set the handler on all threads.
1509    * Server's internal Listener thread is off limits. For Server, if there is an
1510    * OOME, it waits a while then retries. Meantime, a flush or a compaction that
1511    * tries to run should trigger the same critical condition and the shutdown will
1512    * run. On its way out, this server will shut down Server. Leases are somewhere
1513    * in between: although it inherits from Chore, it keeps its own internal stop
1514    * mechanism, so it needs to be stopped by this hosting server. Worker logs the
1515    * exception and exits.
1516    */
1517   private void startServiceThreads() throws IOException {
1518     // Start executor services
1519     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1520       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1521     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1522       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1523     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1524       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1525     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1526       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1527     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1528       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1529         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1530     }
1531     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1532        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1533 
1534     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
1535         uncaughtExceptionHandler);
1536     this.cacheFlusher.start(uncaughtExceptionHandler);
1537     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
1538       ".compactionChecker", uncaughtExceptionHandler);
1539     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
1540         ".periodicFlusher", uncaughtExceptionHandler);
1541     if (this.healthCheckChore != null) {
1542       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
1543             uncaughtExceptionHandler);
1544     }
1545     if (this.nonceManagerChore != null) {
1546       Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
1547             uncaughtExceptionHandler);
1548     }
1549     if (this.storefileRefresher != null) {
1550       Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher",
1551             uncaughtExceptionHandler);
1552     }
1553 
1554     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1555     // an unhandled exception, it will just exit.
1556     this.leases.setName(getName() + ".leaseChecker");
1557     this.leases.start();
1558 
1559     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1560         this.replicationSourceHandler != null) {
1561       this.replicationSourceHandler.startReplicationService();
1562     } else {
1563       if (this.replicationSourceHandler != null) {
1564         this.replicationSourceHandler.startReplicationService();
1565       }
1566       if (this.replicationSinkHandler != null) {
1567         this.replicationSinkHandler.startReplicationService();
1568       }
1569     }
1570 
1571     // Start Server.  This service is like leases in that it internally runs
1572     // a thread.
1573     rpcServices.rpcServer.start();
1574 
1575     // Create the log splitting worker and start it
1576     // Use a smaller retry count so we fail fast; otherwise the SplitLogWorker could be
1577     // blocked for quite a while inside the HConnection layer, and the worker would not be
1578     // available for other tasks even after the current task is preempted when a split task times out.
1579     Configuration sinkConf = HBaseConfiguration.create(conf);
1580     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1581       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1582     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1583       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1584     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1585     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
1586     splitLogWorker.start();
1587   }
1588 
1589   /**
1590    * Puts up the webui.
1591    * @return Returns final port -- maybe different from what we started with.
1592    * @throws IOException
1593    */
1594   private int putUpWebUI() throws IOException {
1595     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1596       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1597     // -1 is for disabling info server
1598     if (port < 0) return port;
1599     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1600     // check if auto port bind enabled
1601     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1602         false);
1603     while (true) {
1604       try {
1605         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1606         infoServer.addServlet("dump", "/dump", getDumpServlet());
1607         configureInfoServer();
1608         this.infoServer.start();
1609         break;
1610       } catch (BindException e) {
1611         if (!auto) {
1612           // auto bind disabled; rethrow the BindException
1613           LOG.error("Failed binding http info server to port: " + port);
1614           throw e;
1615         }
1616         // auto bind enabled, try to use another port
1617         LOG.info("Failed binding http info server to port: " + port);
1618         port++;
1619       }
1620     }
1621     port = this.infoServer.getPort();
1622     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1623     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1624       HConstants.DEFAULT_MASTER_INFOPORT);
1625     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1626     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1627     return port;
1628   }
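       // Configuration sketch (assumed example values; the keys are the constants read
       // above): to let the info server walk forward from a busy port rather than fail,
       //
       //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 16030);        // starting port
       //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
       //
       // With auto-bind left at its default of false, the BindException is rethrown.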
1629 
1630   /*
1631    * Verify that server is healthy
1632    */
1633   private boolean isHealthy() {
1634     if (!fsOk) {
1635       // File system problem
1636       return false;
1637     }
1638     // Verify that all threads are alive
1639     if (!(leases.isAlive()
1640         && cacheFlusher.isAlive() && hlogRoller.isAlive()
1641         && this.compactionChecker.isAlive()
1642         && this.periodicFlusher.isAlive())) {
1643       stop("One or more threads are no longer alive -- stop");
1644       return false;
1645     }
1646     if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1647       stop("Meta HLog roller thread is no longer alive -- stop");
1648       return false;
1649     }
1650     return true;
1651   }
1652 
1653   public HLog getWAL() {
1654     try {
1655       return getWAL(null);
1656     } catch (IOException e) {
1657       LOG.warn("getWAL threw exception " + e);
1658       return null;
1659     }
1660   }
1661 
1662   @Override
1663   public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1664     //TODO: at some point this should delegate to the HLogFactory
1665     //currently, we don't care about the region as much as we care about the
1666     //table (hence checking the table name below)
1667     //_ROOT_ and hbase:meta regions have separate WAL.
1668     if (regionInfo != null && regionInfo.isMetaTable()) {
1669       return getMetaWAL();
1670     }
1671     return this.hlog;
1672   }
1673 
1674   @Override
1675   public HConnection getShortCircuitConnection() {
1676     return this.shortCircuitConnection;
1677   }
1678 
1679   @Override
1680   public MetaTableLocator getMetaTableLocator() {
1681     return this.metaTableLocator;
1682   }
1683 
1684   @Override
1685   public void stop(final String msg) {
1686     if (!this.stopped) {
1687       try {
1688         if (this.rsHost != null) {
1689           this.rsHost.preStop(msg);
1690         }
1691         this.stopped = true;
1692         LOG.info("STOPPED: " + msg);
1693         // Wakes run() if it is sleeping
1694         sleeper.skipSleepCycle();
1695       } catch (IOException exp) {
1696         LOG.warn("The region server did not stop", exp);
1697       }
1698     }
1699   }
1700 
1701   public void waitForServerOnline(){
1702     while (!isOnline() && !isStopped()){
1703        sleeper.sleep();
1704     }
1705   }
1706 
1707   @Override
1708   public void postOpenDeployTasks(final HRegion r)
1709   throws KeeperException, IOException {
1710     rpcServices.checkOpen();
1711     LOG.info("Post open deploy tasks for " + r.getRegionNameAsString());
1712     // Do checks to see if we need to compact (references or too many files)
1713     for (Store s : r.getStores().values()) {
1714       if (s.hasReferences() || s.needsCompaction()) {
1715        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1716       }
1717     }
1718     long openSeqNum = r.getOpenSeqNum();
1719     if (openSeqNum == HConstants.NO_SEQNUM) {
1720       // If we opened a region, we should have read some sequence number from it.
1721       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1722       openSeqNum = 0;
1723     }
1724 
1725     // Update flushed sequence id of a recovering region in ZK
1726     updateRecoveringRegionLastFlushedSequenceId(r);
1727 
1728     // Notify master
1729     if (!reportRegionStateTransition(
1730         TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1731       throw new IOException("Failed to report opened region to master: "
1732         + r.getRegionNameAsString());
1733     }
1734 
1735     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
1736   }
1737 
1738   @Override
1739   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1740     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1741   }
1742 
1743   @Override
1744   public boolean reportRegionStateTransition(
1745       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1746     if (TEST_SKIP_REPORTING_TRANSITION) {
1747       // This is for testing only in case there is no master
1748       // to handle the region transition report at all.
1749       if (code == TransitionCode.OPENED) {
1750         Preconditions.checkArgument(hris != null && hris.length == 1);
1751         if (hris[0].isMetaRegion()) {
1752           try {
1753             MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, State.OPEN);
1754           } catch (KeeperException e) {
1755             LOG.info("Failed to update meta location", e);
1756             return false;
1757           }
1758         } else {
1759           try {
1760             MetaTableAccessor.updateRegionLocation(shortCircuitConnection,
1761               hris[0], serverName, openSeqNum);
1762           } catch (IOException e) {
1763             LOG.info("Failed to update meta", e);
1764             return false;
1765           }
1766         }
1767       }
1768       return true;
1769     }
1770 
1771     ReportRegionStateTransitionRequest.Builder builder =
1772       ReportRegionStateTransitionRequest.newBuilder();
1773     builder.setServer(ProtobufUtil.toServerName(serverName));
1774     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1775     transition.setTransitionCode(code);
1776     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1777       transition.setOpenSeqNum(openSeqNum);
1778     }
1779     for (HRegionInfo hri: hris) {
1780       transition.addRegionInfo(HRegionInfo.convert(hri));
1781     }
1782     ReportRegionStateTransitionRequest request = builder.build();
1783     while (keepLooping()) {
1784       RegionServerStatusService.BlockingInterface rss = rssStub;
1785       try {
1786         if (rss == null) {
1787           createRegionServerStatusStub();
1788           continue;
1789         }
1790         ReportRegionStateTransitionResponse response =
1791           rss.reportRegionStateTransition(null, request);
1792         if (response.hasErrorMessage()) {
1793           LOG.info("Failed to transition " + hris[0]
1794             + " to " + code + ": " + response.getErrorMessage());
1795           return false;
1796         }
1797         return true;
1798       } catch (ServiceException se) {
1799         IOException ioe = ProtobufUtil.getRemoteException(se);
1800         LOG.info("Failed to report region transition, will retry", ioe);
1801         if (rssStub == rss) {
1802           rssStub = null;
1803         }
1804       }
1805     }
1806     return false;
1807   }
1808 
1809   @Override
1810   public RpcServerInterface getRpcServer() {
1811     return rpcServices.rpcServer;
1812   }
1813 
1814   @VisibleForTesting
1815   public RSRpcServices getRSRpcServices() {
1816     return rpcServices;
1817   }
1818 
1819   /**
1820    * Cause the server to exit without closing the regions it is serving or the log
1821    * it is using, and without notifying the master. Used for unit testing and on
1822    * catastrophic events such as HDFS being yanked out from under HBase, or an OOME.
1823    *
1824    * @param reason
1825    *          the reason we are aborting
1826    * @param cause
1827    *          the exception that caused the abort, or null
1828    */
1829   @Override
1830   public void abort(String reason, Throwable cause) {
1831     String msg = "ABORTING region server " + this + ": " + reason;
1832     if (cause != null) {
1833       LOG.fatal(msg, cause);
1834     } else {
1835       LOG.fatal(msg);
1836     }
1837     this.abortRequested = true;
1838     // HBASE-4014: show list of coprocessors that were loaded to help debug
1839     // regionserver crashes. Note that we're implicitly using
1840     // java.util.HashSet's toString() method to print the coprocessor names.
1841     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1842         CoprocessorHost.getLoadedCoprocessors());
1843     // Do our best to report our abort to the master, but this may not work
1844     try {
1845       if (cause != null) {
1846         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1847       }
1848       // Report to the master but only if we have already registered with the master.
1849       if (rssStub != null && this.serverName != null) {
1850         ReportRSFatalErrorRequest.Builder builder =
1851           ReportRSFatalErrorRequest.newBuilder();
1852         ServerName sn =
1853           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
1854         builder.setServer(ProtobufUtil.toServerName(sn));
1855         builder.setErrorMessage(msg);
1856         rssStub.reportRSFatalError(null, builder.build());
1857       }
1858     } catch (Throwable t) {
1859       LOG.warn("Unable to report fatal error to master", t);
1860     }
1861     stop(reason);
1862   }
1863 
1864   /**
1865    * @see HRegionServer#abort(String, Throwable)
1866    */
1867   public void abort(String reason) {
1868     abort(reason, null);
1869   }
1870 
1871   @Override
1872   public boolean isAborted() {
1873     return this.abortRequested;
1874   }
1875 
1876   /*
1877    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
1878    * logs, but it does close the socket in case we want to bring up a server on the
1879    * old hostname+port immediately.
1880    */
1881   protected void kill() {
1882     this.killed = true;
1883     abort("Simulated kill");
1884   }
1885 
1886   /**
1887    * Wait on all threads to finish. Presumption is that all closes and stops
1888    * have already been called.
1889    */
1890   protected void stopServiceThreads() {
1891     if (this.nonceManagerChore != null) {
1892       Threads.shutdown(this.nonceManagerChore.getThread());
1893     }
1894     if (this.compactionChecker != null) {
1895       Threads.shutdown(this.compactionChecker.getThread());
1896     }
1897     if (this.periodicFlusher != null) {
1898       Threads.shutdown(this.periodicFlusher.getThread());
1899     }
1900     if (this.cacheFlusher != null) {
1901       this.cacheFlusher.join();
1902     }
1903     if (this.healthCheckChore != null) {
1904       Threads.shutdown(this.healthCheckChore.getThread());
1905     }
1906     if (this.spanReceiverHost != null) {
1907       this.spanReceiverHost.closeReceivers();
1908     }
1909     if (this.hlogRoller != null) {
1910       Threads.shutdown(this.hlogRoller.getThread());
1911     }
1912     if (this.metaHLogRoller != null) {
1913       Threads.shutdown(this.metaHLogRoller.getThread());
1914     }
1915     if (this.compactSplitThread != null) {
1916       this.compactSplitThread.join();
1917     }
1918     if (this.service != null) this.service.shutdown();
1919     if (this.replicationSourceHandler != null &&
1920         this.replicationSourceHandler == this.replicationSinkHandler) {
1921       this.replicationSourceHandler.stopReplicationService();
1922     } else {
1923       if (this.replicationSourceHandler != null) {
1924         this.replicationSourceHandler.stopReplicationService();
1925       }
1926       if (this.replicationSinkHandler != null) {
1927         this.replicationSinkHandler.stopReplicationService();
1928       }
1929     }
1930     if (this.storefileRefresher != null) {
1931       Threads.shutdown(this.storefileRefresher.getThread());
1932     }
1933   }
1934 
1935   /**
1936    * @return Return the object that implements the replication
1937    * source service.
1938    */
1939   ReplicationSourceService getReplicationSourceService() {
1940     return replicationSourceHandler;
1941   }
1942 
1943   /**
1944    * @return Return the object that implements the replication
1945    * sink service.
1946    */
1947   ReplicationSinkService getReplicationSinkService() {
1948     return replicationSinkHandler;
1949   }
1950 
1951   /**
1952    * Get the current master from ZooKeeper and open the RPC connection to it.
1953    *
1954    * Method will block until a master is available. You can break from this
1955    * block by requesting the server stop.
1956    *
1957    * @return master + port, or null if server has been stopped
1958    */
1959   private synchronized ServerName createRegionServerStatusStub() {
1960     if (rssStub != null) {
1961       return masterAddressTracker.getMasterAddress();
1962     }
1963     ServerName sn = null;
1964     long previousLogTime = 0;
1965     RegionServerStatusService.BlockingInterface master = null;
1966     boolean refresh = false; // for the first time, use cached data
1967     RegionServerStatusService.BlockingInterface intf = null;
1968     boolean interrupted = false;
1969     try {
1970       while (keepLooping() && master == null) {
1971         sn = this.masterAddressTracker.getMasterAddress(refresh);
1972         if (sn == null) {
1973           if (!keepLooping()) {
1974             // give up with no connection.
1975             LOG.debug("No master found and cluster is stopped; bailing out");
1976             return null;
1977           }
1978           LOG.debug("No master found; retry");
1979           previousLogTime = System.currentTimeMillis();
1980           refresh = true; // let's try pull it from ZK directly
1981           sleeper.sleep();
1982           continue;
1983         }
1984 
1985         // If we are on the active master, use the shortcut
1986         if (this instanceof HMaster && sn.equals(getServerName())) {
1987           intf = ((HMaster)this).getMasterRpcServices();
1988           break;
1989         }
1990         try {
1991           BlockingRpcChannel channel =
1992             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
1993           intf = RegionServerStatusService.newBlockingStub(channel);
1994           break;
1995         } catch (IOException e) {
1996           e = e instanceof RemoteException ?
1997             ((RemoteException)e).unwrapRemoteException() : e;
1998           if (e instanceof ServerNotRunningYetException) {
1999             if (System.currentTimeMillis() > (previousLogTime+1000)){
2000               LOG.info("Master isn't available yet, retrying");
2001               previousLogTime = System.currentTimeMillis();
2002             }
2003           } else {
2004             if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2005               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2006               previousLogTime = System.currentTimeMillis();
2007             }
2008           }
2009           try {
2010             Thread.sleep(200);
2011           } catch (InterruptedException ex) {
2012             interrupted = true;
2013             LOG.warn("Interrupted while sleeping");
2014           }
2015         }
2016       }
2017     } finally {
2018       if (interrupted) {
2019         Thread.currentThread().interrupt();
2020       }
2021     }
2022     rssStub = intf;
2023     return sn;
2024   }
2025 
2026   /**
2027    * @return True if we should break loop because cluster is going down or
2028    * this server has been stopped or hdfs has gone bad.
2029    */
2030   private boolean keepLooping() {
2031     return !this.stopped && isClusterUp();
2032   }
2033 
2034   /*
2035    * Let the master know we're here. Run initialization using parameters passed
2036    * to us by the master.
2037    * @return A Map of key/value configurations we got from the Master, else
2038    * null if we failed to register.
2039    * @throws IOException
2040    */
2041   private RegionServerStartupResponse reportForDuty() throws IOException {
2042     ServerName masterServerName = createRegionServerStatusStub();
2043     if (masterServerName == null) return null;
2044     RegionServerStartupResponse result = null;
2045     try {
2046       rpcServices.requestCount.set(0);
2047       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2048         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2049       long now = EnvironmentEdgeManager.currentTime();
2050       int port = rpcServices.isa.getPort();
2051       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2052       request.setPort(port);
2053       request.setServerStartCode(this.startcode);
2054       request.setServerCurrentTime(now);
2055       result = this.rssStub.regionServerStartup(null, request.build());
2056     } catch (ServiceException se) {
2057       IOException ioe = ProtobufUtil.getRemoteException(se);
2058       if (ioe instanceof ClockOutOfSyncException) {
2059         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2060         // Re-throwing the IOE will cause the RS to abort
2061         throw ioe;
2062       } else if (ioe instanceof ServerNotRunningYetException) {
2063         LOG.debug("Master is not running yet");
2064       } else {
2065         LOG.warn("error telling master we are up", se);
2066       }
2067     }
2068     return result;
2069   }
2070 
2071   @Override
2072   public long getLastSequenceId(byte[] region) {
2073     Long lastFlushedSequenceId = -1L;
2074     try {
2075       GetLastFlushedSequenceIdRequest req = RequestConverter
2076           .buildGetLastFlushedSequenceIdRequest(region);
2077       lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
2078           .getLastFlushedSequenceId();
2079     } catch (ServiceException e) {
2080       lastFlushedSequenceId = -1L;
2081       LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
2082     }
2083     return lastFlushedSequenceId;
2084   }
2085 
2086   /**
2087    * Closes all regions.  Called on our way out.
2088    * Assumes that it's not possible for new regions to be added to onlineRegions
2089    * while this method runs.
2090    */
2091   protected void closeAllRegions(final boolean abort) {
2092     closeUserRegions(abort);
2093     closeMetaTableRegions(abort);
2094   }
2095 
2096   /**
2097    * Close meta region if we carry it
2098    * @param abort Whether we're running an abort.
2099    */
2100   void closeMetaTableRegions(final boolean abort) {
2101     HRegion meta = null;
2102     this.lock.writeLock().lock();
2103     try {
2104       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2105         HRegionInfo hri = e.getValue().getRegionInfo();
2106         if (hri.isMetaRegion()) {
2107           meta = e.getValue();
2108         }
2109         if (meta != null) break;
2110       }
2111     } finally {
2112       this.lock.writeLock().unlock();
2113     }
2114     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2115   }
2116 
2117   /**
2118    * Schedule closes on all user regions.
2119    * Should be safe to call multiple times because it won't close regions
2120    * that are already closed or that are closing.
2121    * @param abort Whether we're running an abort.
2122    */
2123   void closeUserRegions(final boolean abort) {
2124     this.lock.writeLock().lock();
2125     try {
2126       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2127         HRegion r = e.getValue();
2128         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2129           // Don't update zk with this close transition; pass false.
2130           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2131         }
2132       }
2133     } finally {
2134       this.lock.writeLock().unlock();
2135     }
2136   }
2137 
2138   /** @return the info server */
2139   public InfoServer getInfoServer() {
2140     return infoServer;
2141   }
2142 
2143   /**
2144    * @return true if a stop has been requested.
2145    */
2146   @Override
2147   public boolean isStopped() {
2148     return this.stopped;
2149   }
2150 
2151   @Override
2152   public boolean isStopping() {
2153     return this.stopping;
2154   }
2155 
2156   @Override
2157   public Map<String, HRegion> getRecoveringRegions() {
2158     return this.recoveringRegions;
2159   }
2160 
2161   /**
2162    *
2163    * @return the configuration
2164    */
2165   @Override
2166   public Configuration getConfiguration() {
2167     return conf;
2168   }
2169 
2170   /** @return the write lock for the server */
2171   ReentrantReadWriteLock.WriteLock getWriteLock() {
2172     return lock.writeLock();
2173   }
2174 
2175   public int getNumberOfOnlineRegions() {
2176     return this.onlineRegions.size();
2177   }
2178 
2179   boolean isOnlineRegionsEmpty() {
2180     return this.onlineRegions.isEmpty();
2181   }
2182 
2183   /**
2184    * For tests, web ui and metrics.
2185    * This method will only work if HRegionServer is in the same JVM as the client;
2186    * HRegion cannot be serialized to cross an RPC.
2187    */
2188   public Collection<HRegion> getOnlineRegionsLocalContext() {
2189     Collection<HRegion> regions = this.onlineRegions.values();
2190     return Collections.unmodifiableCollection(regions);
2191   }
2192 
2193   @Override
2194   public void addToOnlineRegions(HRegion region) {
2195     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2196   }
2197 
2198   /**
2199    * @return A new Map of online regions sorted by region size with the first entry being the
2200    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2201    * may NOT return all regions.
2202    */
2203   SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2204     // we'll sort the regions in reverse
2205     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2206         new Comparator<Long>() {
2207           @Override
2208           public int compare(Long a, Long b) {
2209             return -1 * a.compareTo(b);
2210           }
2211         });
2212     // Copy over all regions. Regions are sorted by size with biggest first.
2213     for (HRegion region : this.onlineRegions.values()) {
2214       sortedRegions.put(region.memstoreSize.get(), region);
2215     }
2216     return sortedRegions;
2217   }
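       // Note on getCopyOfOnlineRegionsSortedBySize() above: the memstore size is the map
       // key, so two regions with identical memstore sizes collide and only the last one
       // put survives; that is why the javadoc warns the result may not contain every
       // online region (e.g. two regions each holding exactly 64 MB yield a single entry).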
2218 
2219   /**
2220    * @return time stamp in millis of when this region server was started
2221    */
2222   public long getStartcode() {
2223     return this.startcode;
2224   }
2225 
2226   /** @return reference to FlushRequester */
2227   @Override
2228   public FlushRequester getFlushRequester() {
2229     return this.cacheFlusher;
2230   }
2231 
2232   /**
2233    * Get the top N most loaded regions this server is serving so we can tell the
2234    * master which regions it can reallocate if we're overloaded. TODO: actually
2235    * calculate which regions are most loaded. (Right now, we're just grabbing
2236    * the first N regions being served regardless of load.)
2237    */
2238   protected HRegionInfo[] getMostLoadedRegions() {
2239     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2240     for (HRegion r : onlineRegions.values()) {
2241       if (!r.isAvailable()) {
2242         continue;
2243       }
2244       if (regions.size() < numRegionsToReport) {
2245         regions.add(r.getRegionInfo());
2246       } else {
2247         break;
2248       }
2249     }
2250     return regions.toArray(new HRegionInfo[regions.size()]);
2251   }
2252 
2253   @Override
2254   public Leases getLeases() {
2255     return leases;
2256   }
2257 
2258   /**
2259    * @return Return the rootDir.
2260    */
2261   protected Path getRootDir() {
2262     return rootDir;
2263   }
2264 
2265   /**
2266    * @return Return the fs.
2267    */
2268   @Override
2269   public FileSystem getFileSystem() {
2270     return fs;
2271   }
2272 
2273   @Override
2274   public String toString() {
2275     return getServerName().toString();
2276   }
2277 
2278   /**
2279    * Interval at which threads should run
2280    *
2281    * @return the interval
2282    */
2283   public int getThreadWakeFrequency() {
2284     return threadWakeFrequency;
2285   }
2286 
2287   @Override
2288   public ZooKeeperWatcher getZooKeeper() {
2289     return zooKeeper;
2290   }
2291 
2292   @Override
2293   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2294     return csm;
2295   }
2296 
2297   @Override
2298   public ServerName getServerName() {
2299     return serverName;
2300   }
2301 
2302   @Override
2303   public CompactionRequestor getCompactionRequester() {
2304     return this.compactSplitThread;
2305   }
2306 
2307   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2308     return this.rsHost;
2309   }
2310 
2311   @Override
2312   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2313     return this.regionsInTransitionInRS;
2314   }
2315 
2316   @Override
2317   public ExecutorService getExecutorService() {
2318     return service;
2319   }
2320 
2321   //
2322   // Main program and support routines
2323   //
2324 
2325   /**
2326    * Load the replication service objects, if any
2327    */
2328   static private void createNewReplicationInstance(Configuration conf,
2329     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2330 
2331     // If replication is not enabled, then return immediately.
2332     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2333         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2334       return;
2335     }
2336 
2337     // read in the name of the source replication class from the config file.
2338     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2339                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2340 
2341     // read in the name of the sink replication class from the config file.
2342     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2343                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2344 
2345     // If both the sink and the source class names are the same, then instantiate
2346     // only one object.
2347     if (sourceClassname.equals(sinkClassname)) {
2348       server.replicationSourceHandler = (ReplicationSourceService)
2349                                          newReplicationInstance(sourceClassname,
2350                                          conf, server, fs, logDir, oldLogDir);
2351       server.replicationSinkHandler = (ReplicationSinkService)
2352                                          server.replicationSourceHandler;
2353     } else {
2354       server.replicationSourceHandler = (ReplicationSourceService)
2355                                          newReplicationInstance(sourceClassname,
2356                                          conf, server, fs, logDir, oldLogDir);
2357       server.replicationSinkHandler = (ReplicationSinkService)
2358                                          newReplicationInstance(sinkClassname,
2359                                          conf, server, fs, logDir, oldLogDir);
2360     }
2361   }
2362 
2363   static private ReplicationService newReplicationInstance(String classname,
2364     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2365     Path oldLogDir) throws IOException{
2366 
2367     Class<?> clazz = null;
2368     try {
2369       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2370       clazz = Class.forName(classname, true, classLoader);
2371     } catch (java.lang.ClassNotFoundException nfe) {
2372       throw new IOException("Could not find class for " + classname);
2373     }
2374 
2375     // create an instance of the replication object.
2376     ReplicationService service = (ReplicationService)
2377                               ReflectionUtils.newInstance(clazz, conf);
2378     service.initialize(server, fs, logDir, oldLogDir);
2379     return service;
2380   }
2381 
2382   /**
2383    * Utility for constructing an instance of the passed HRegionServer class.
2384    *
2385    * @param regionServerClass Class of the HRegionServer implementation to construct.
2386    * @param conf2 Configuration to pass to the constructor.
2387    * @return HRegionServer instance.
2388    */
2389   public static HRegionServer constructRegionServer(
2390       Class<? extends HRegionServer> regionServerClass,
2391       final Configuration conf2, CoordinatedStateManager cp) {
2392     try {
2393       Constructor<? extends HRegionServer> c = regionServerClass
2394           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2395       return c.newInstance(conf2, cp);
2396     } catch (Exception e) {
2397       throw new RuntimeException("Failed construction of Regionserver: "
2398           + regionServerClass.toString(), e);
2399     }
2400   }
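       // Usage sketch (MyRegionServer is a hypothetical subclass, not part of HBase):
       // main() below resolves HConstants.REGION_SERVER_IMPL from the configuration and
       // hands the class here, e.g.
       //
       //   HRegionServer rs = constructRegionServer(MyRegionServer.class, conf, csm);
       //
       // Whatever class is passed must expose a (Configuration, CoordinatedStateManager)
       // constructor, since it is looked up reflectively above.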
2401 
2402   /**
2403    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2404    */
2405   public static void main(String[] args) throws Exception {
2406     VersionInfo.logVersion();
2407     Configuration conf = HBaseConfiguration.create();
2408     @SuppressWarnings("unchecked")
2409     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2410         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2411 
2412     new HRegionServerCommandLine(regionServerClass).doMain(args);
2413   }
2414 
2415   /**
2416    * Gets the online regions of the specified table.
2417    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2418    * Only returns <em>online</em> regions.  If a region on this table has been
2419    * closed during a disable, etc., it will not be included in the returned list.
2420    * So, the returned list may not necessarily be ALL regions in this table, only
2421    * all the ONLINE regions in the table.
2422    * @param tableName Table to look up online regions for.
2423    * @return Online regions from <code>tableName</code>
2424    */
2425   @Override
2426   public List<HRegion> getOnlineRegions(TableName tableName) {
2427      List<HRegion> tableRegions = new ArrayList<HRegion>();
2428      synchronized (this.onlineRegions) {
2429        for (HRegion region: this.onlineRegions.values()) {
2430          HRegionInfo regionInfo = region.getRegionInfo();
2431          if(regionInfo.getTable().equals(tableName)) {
2432            tableRegions.add(region);
2433          }
2434        }
2435      }
2436      return tableRegions;
2437    }
2438 
2439   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2440   public String[] getRegionServerCoprocessors() {
2441     TreeSet<String> coprocessors = new TreeSet<String>(
2442         this.hlog.getCoprocessorHost().getCoprocessors());
2443     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2444     for (HRegion region: regions) {
2445       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2446     }
2447     return coprocessors.toArray(new String[coprocessors.size()]);
2448   }
2449 
2450   /**
2451    * Try to close the region, logs a warning on failure but continues.
2452    * @param region Region to close
2453    */
2454   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2455     try {
2456       if (!closeRegion(region.getEncodedName(), abort, null)) {
2457         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2458             " - ignoring and continuing");
2459       }
2460     } catch (IOException e) {
2461       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2462           " - ignoring and continuing", e);
2463     }
2464   }
2465 
2466   /**
2467    * Close a region asynchronously; can be called from the master or internally by the regionserver
2468    * when stopping. If called from the master, the region will update the znode status.
2469    *
2470    * <p>
2471    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2472    * coprocessors are not called in this case. A NotServingRegionException is thrown.
2473    * </p>
2474    *
2475    * <p>
2476    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2477    * </p>
2478    *
2479    * @param encodedName Region to close
2480    * @param abort True if we are aborting
2481    * @return True if closed a region.
2482    * @throws NotServingRegionException if the region is not online
2483    */
2484   protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
2485       throws NotServingRegionException {
2486     //Check for permissions to close.
2487     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2488     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2489       try {
2490         actualRegion.getCoprocessorHost().preClose(false);
2491       } catch (IOException exp) {
2492         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2493         return false;
2494       }
2495     }
2496 
2497     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2498         Boolean.FALSE);
2499 
2500     if (Boolean.TRUE.equals(previous)) {
2501       LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2502           "trying to OPEN. Cancelling OPENING.");
2503       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2504         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2505         // We're going to try to do a standard close then.
2506         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2507             " Doing a standard close now");
2508         return closeRegion(encodedName, abort, sn);
2509       }
2510       // Let's get the region from the online region list again
2511       actualRegion = this.getFromOnlineRegions(encodedName);
2512       if (actualRegion == null) { // If already online, we still need to close it.
2513         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2514         // The master deletes the znode when it receives this exception.
2515         throw new NotServingRegionException("The region " + encodedName +
2516           " was opening but not yet served. Opening is cancelled.");
2517       }
2518     } else if (Boolean.FALSE.equals(previous)) {
2519       LOG.info("Received CLOSE for the region: " + encodedName +
2520         ", which we are already trying to CLOSE, but not completed yet");
2521       return true;
2522     }
2523 
2524     if (actualRegion == null) {
2525       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2526       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2527       // The master deletes the znode when it receives this exception.
2528       throw new NotServingRegionException("The region " + encodedName +
2529           " is not online, and is not opening.");
2530     }
2531 
2532     CloseRegionHandler crh;
2533     final HRegionInfo hri = actualRegion.getRegionInfo();
2534     if (hri.isMetaRegion()) {
2535       crh = new CloseMetaHandler(this, this, hri, abort);
2536     } else {
2537       crh = new CloseRegionHandler(this, this, hri, abort, sn);
2538     }
2539     this.service.submit(crh);
2540     return true;
2541   }
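       // Summary of the regionsInTransitionInRS protocol used in closeRegion() above:
       // Boolean.TRUE under an encoded name means an OPEN is in progress, Boolean.FALSE
       // means a CLOSE is in progress, and no entry means the region is neither opening
       // nor closing. A pending open is cancelled by swapping TRUE -> FALSE, and an
       // existing FALSE is treated as "close already underway".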
2542 
2543   /**
2544    * @param regionName Binary name of the region to look up.
2545    * @return HRegion for the passed binary <code>regionName</code> or null if
2546    *         named region is not a member of the online regions.
2547    */
2548   public HRegion getOnlineRegion(final byte[] regionName) {
2549     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2550     return this.onlineRegions.get(encodedRegionName);
2551   }
2552 
2553   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2554     return this.regionFavoredNodesMap.get(encodedRegionName);
2555   }
2556 
2557   @Override
2558   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2559     return this.onlineRegions.get(encodedRegionName);
2560   }
2561 
2562 
2563   @Override
2564   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2565     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2566 
2567     if (destination != null) {
2568       HLog wal = getWAL();
2569       long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2570       if (closeSeqNum == HConstants.NO_SEQNUM) {
2571         // No edits in WAL for this region; get the sequence number when the region was opened.
2572         closeSeqNum = r.getOpenSeqNum();
2573         if (closeSeqNum == HConstants.NO_SEQNUM) {
2574           closeSeqNum = 0;
2575         }
2576       }
2577       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2578     }
2579     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2580     return toReturn != null;
2581   }
2582 
2583   /**
2584    * Protected utility method for safely obtaining an HRegion handle.
2585    *
2586    * @param regionName
2587    *          Name of online {@link HRegion} to return
2588    * @return {@link HRegion} for <code>regionName</code>
2589    * @throws NotServingRegionException
2590    */
2591   protected HRegion getRegion(final byte[] regionName)
2592       throws NotServingRegionException {
2593     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2594     return getRegionByEncodedName(regionName, encodedRegionName);
2595   }
2596 
2597   public HRegion getRegionByEncodedName(String encodedRegionName)
2598       throws NotServingRegionException {
2599     return getRegionByEncodedName(null, encodedRegionName);
2600   }
2601 
2602   protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2603     throws NotServingRegionException {
2604     HRegion region = this.onlineRegions.get(encodedRegionName);
2605     if (region == null) {
2606       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2607       if (moveInfo != null) {
2608         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2609       }
2610       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2611       String regionNameStr = regionName == null?
2612         encodedRegionName: Bytes.toStringBinary(regionName);
2613       if (isOpening != null && isOpening.booleanValue()) {
2614         throw new RegionOpeningException("Region " + regionNameStr +
2615           " is opening on " + this.serverName);
2616       }
2617       throw new NotServingRegionException("Region " + regionNameStr +
2618         " is not online on " + this.serverName);
2619     }
2620     return region;
2621   }
2622 
2623   /*
2624    * Clean up after a Throwable caught while invoking a method. Converts <code>t</code> to
2625    * IOE if it isn't already.
2626    *
2627    * @param t Throwable
2628    *
2629    * @param msg Message to log in error. Can be null.
2630    *
2631    * @return Throwable converted to an IOE; methods can only let out IOEs.
2632    */
2633   private Throwable cleanup(final Throwable t, final String msg) {
2634     // Don't log as error if NSRE; NSRE is 'normal' operation.
2635     if (t instanceof NotServingRegionException) {
2636       LOG.debug("NotServingRegionException; " + t.getMessage());
2637       return t;
2638     }
2639     Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
2640     if (msg == null) {
2641       LOG.error("", e);
2642     } else {
2643       LOG.error(msg, e);
2644     }
2645     if (!rpcServices.checkOOME(t)) {
2646       checkFileSystem();
2647     }
2648     return t;
2649   }
2650 
2651   /*
2652    * @param t
2653    *
2654    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2655    *
2656    * @return Make <code>t</code> an IOE if it isn't already.
2657    */
2658   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2659     return (t instanceof IOException ? (IOException) t : msg == null
2660         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2661   }
2662 
2663   /**
2664    * Checks to see if the file system is still accessible. If not, sets
2665    * abortRequested and stopRequested.
2666    *
2667    * @return false if file system is not available
2668    */
2669   public boolean checkFileSystem() {
2670     if (this.fsOk && this.fs != null) {
2671       try {
2672         FSUtils.checkFileSystemAvailable(this.fs);
2673       } catch (IOException e) {
2674         abort("File System not available", e);
2675         this.fsOk = false;
2676       }
2677     }
2678     return this.fsOk;
2679   }
2680 
2681   @Override
2682   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2683       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2684     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2685     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2686     // it is a map of region name to InetSocketAddress[]
2687     for (int i = 0; i < favoredNodes.size(); i++) {
2688       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2689           favoredNodes.get(i).getPort());
2690     }
2691     regionFavoredNodesMap.put(encodedRegionName, addr);
2692   }
2693 
2694   /**
2695    * Return the favored nodes for a region given its encoded name. Look at the
2696    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
2697    * @param encodedRegionName
2698    * @return array of favored locations
2699    */
2700   @Override
2701   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2702     return regionFavoredNodesMap.get(encodedRegionName);
2703   }
2704 
2705   @Override
2706   public ServerNonceManager getNonceManager() {
2707     return this.nonceManager;
2708   }
2709 
2710   private static class MovedRegionInfo {
2711     private final ServerName serverName;
2712     private final long seqNum;
2713     private final long ts;
2714 
2715     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2716       this.serverName = serverName;
2717       this.seqNum = closeSeqNum;
2718       ts = EnvironmentEdgeManager.currentTime();
2719      }
2720 
2721     public ServerName getServerName() {
2722       return serverName;
2723     }
2724 
2725     public long getSeqNum() {
2726       return seqNum;
2727     }
2728 
2729     public long getMoveTime() {
2730       return ts;
2731     }
2732   }
2733 
2734   // This map contains all the regions that we closed for a move.
2735   //  We record the time of the move so that overly old entries can be dropped.
2736   protected Map<String, MovedRegionInfo> movedRegions =
2737       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
2738 
2739   // We need a timeout; otherwise we risk handing out stale information, which would double
2740   //  the number of network calls instead of reducing them.
2741   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
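       // i.e. 2 * 60 * 1000 ms = 120,000 ms = 2 minutes: long enough for clients to pick up the new
       // location, short enough to bound how stale a cached destination can get.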
2742 
2743   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
2744     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
2745       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
2746       return;
2747     }
2748     LOG.info("Adding moved region record: "
2749       + encodedName + " to " + destination + " as of " + closeSeqNum);
2750     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
2751   }
2752 
2753   void removeFromMovedRegions(String encodedName) {
2754     movedRegions.remove(encodedName);
2755   }
2756 
2757   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
2758     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
2759 
2760     long now = EnvironmentEdgeManager.currentTime();
2761     if (dest != null) {
2762       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
2763         return dest;
2764       } else {
2765         movedRegions.remove(encodedRegionName);
2766       }
2767     }
2768 
2769     return null;
2770   }
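       // Illustrative sketch (hypothetical call site): when a request arrives for a region that was
       // recently moved off this server, the cached destination lets us redirect the client without
       // forcing a fresh meta lookup, for example:
       //
       //   MovedRegionInfo moved = getMovedRegion(encodedRegionName);
       //   if (moved != null) {
       //     throw new RegionMovedException(moved.getServerName(), moved.getSeqNum());
       //   }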
2771 
2772   /**
2773    * Remove the expired entries from the moved regions list.
2774    */
2775   protected void cleanMovedRegions() {
2776     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
2777     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
2778 
2779     while (it.hasNext()){
2780       Map.Entry<String, MovedRegionInfo> e = it.next();
2781       if (e.getValue().getMoveTime() < cutOff) {
2782         it.remove();
2783       }
2784     }
2785   }
2786 
2787   /**
2788    * Creates a Chore thread to clean the moved region cache.
2789    */
2790   protected static class MovedRegionsCleaner extends Chore implements Stoppable {
2791     private HRegionServer regionServer;
2792     Stoppable stoppable;
2793 
2794     private MovedRegionsCleaner(
2795       HRegionServer regionServer, Stoppable stoppable){
2796       super("MovedRegionsCleaner for region server " + regionServer, TIMEOUT_REGION_MOVED, stoppable);
2797       this.regionServer = regionServer;
2798       this.stoppable = stoppable;
2799     }
2800 
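         // Design note: createAndStart supplies its own private Stoppable rather than the region
         // server itself, so stopping this chore via stop(String) does not stop the server.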
2801     static MovedRegionsCleaner createAndStart(HRegionServer rs){
2802       Stoppable stoppable = new Stoppable() {
2803         private volatile boolean isStopped = false;
2804         @Override public void stop(String why) { isStopped = true;}
2805         @Override public boolean isStopped() {return isStopped;}
2806       };
2807 
2808       return new MovedRegionsCleaner(rs, stoppable);
2809     }
2810 
2811     @Override
2812     protected void chore() {
2813       regionServer.cleanMovedRegions();
2814     }
2815 
2816     @Override
2817     public void stop(String why) {
2818       stoppable.stop(why);
2819     }
2820 
2821     @Override
2822     public boolean isStopped() {
2823       return stoppable.isStopped();
2824     }
2825   }
2826 
2827   private String getMyEphemeralNodePath() {
2828     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
2829   }
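       // For example, with the default zookeeper.znode.parent of "/hbase" this resolves to a path of
       // the form /hbase/rs/<hostname>,<port>,<startcode>, i.e. this server's ephemeral znode.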
2830 
2831   private boolean isHealthCheckerConfigured() {
2832     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2833     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2834   }
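       // The health check chore is enabled by setting HConstants.HEALTH_SCRIPT_LOC
       // ("hbase.node.health.script.location") to the path of an executable health-check script;
       // a minimal sketch, with an illustrative script path:
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.set(HConstants.HEALTH_SCRIPT_LOC, "/etc/hbase/healthcheck.sh");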
2835 
2836   /**
2837    * @return the underlying {@link CompactSplitThread} for this server
2838    */
2839   public CompactSplitThread getCompactSplitThread() {
2840     return this.compactSplitThread;
2841   }
2842 
2843   /**
2844    * A helper function to store the last flushed sequence Id with the previous failed RS for a
2845    * recovering region. The Id is used to skip WAL edits that have already been flushed. Since the
2846    * flushed sequence id is only valid per RS, we associate the Id with the corresponding failed RS.
2847    * @throws KeeperException
2848    * @throws IOException
2849    */
2850   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
2851       IOException {
2852     if (!r.isRecovering()) {
2853       // return immediately for non-recovering regions
2854       return;
2855     }
2856 
2857     HRegionInfo region = r.getRegionInfo();
2858     ZooKeeperWatcher zkw = getZooKeeper();
2859     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
2860     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
2861     long minSeqIdForLogReplay = -1;
2862     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
2863       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
2864         minSeqIdForLogReplay = storeSeqIdForReplay;
2865       }
2866     }
2867 
2868     try {
2869       long lastRecordedFlushedSequenceId = -1;
2870       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
2871         region.getEncodedName());
2872       // recovering-region level
2873       byte[] data;
2874       try {
2875         data = ZKUtil.getData(zkw, nodePath);
2876       } catch (InterruptedException e) {
2877         throw new InterruptedIOException();
2878       }
2879       if (data != null) {
2880         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
2881       }
2882       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
2883         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
2884       }
2885       if (previousRSName != null) {
2886         // one level deeper for the failed RS
2887         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
2888         ZKUtil.setData(zkw, nodePath,
2889           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
2890         LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
2891             + previousRSName);
2892       } else {
2893         LOG.warn("Can't find failed region server for recovering region " +
2894           region.getEncodedName());
2895       }
2896     } catch (NoNodeException ignore) {
2897       LOG.debug("Region " + region.getEncodedName() +
2898         " must have completed recovery because its recovery znode has been removed", ignore);
2899     }
2900   }
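       // Resulting znode layout (paths assume the default zookeeper.znode.parent of "/hbase"):
       //   /hbase/recovering-regions/<encodedRegionName>                  min flushed seq id for the region
       //   /hbase/recovering-regions/<encodedRegionName>/<failedRSName>   region and per-store seq ids for that failed RS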
2901 
2902   /**
2903    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName, or null if none.
2904    * @param encodedRegionName the encoded name of the recovering region
2905    * @throws KeeperException
2906    */
2907   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
2908     String result = null;
2909     long maxZxid = 0;
2910     ZooKeeperWatcher zkw = this.getZooKeeper();
2911     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
2912     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
2913     if (failedServers == null || failedServers.isEmpty()) {
2914       return result;
2915     }
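         // czxid is the zxid of the ZooKeeper transaction that created a znode, so the child with the
         // largest czxid is the most recently created one, i.e. the most recently failed region server.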
2916     for (String failedServer : failedServers) {
2917       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
2918       Stat stat = new Stat();
2919       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
2920       if (maxZxid < stat.getCzxid()) {
2921         maxZxid = stat.getCzxid();
2922         result = failedServer;
2923       }
2924     }
2925     return result;
2926   }
2927 
2928   /**
2929    * @return The cache config instance used by the regionserver.
2930    */
2931   public CacheConfig getCacheConfig() {
2932     return this.cacheConfig;
2933   }
2934 }