1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.StringWriter;
24  import java.lang.Thread.UncaughtExceptionHandler;
25  import java.lang.annotation.Retention;
26  import java.lang.annotation.RetentionPolicy;
27  import java.lang.management.ManagementFactory;
28  import java.lang.management.MemoryUsage;
29  import java.lang.reflect.Constructor;
30  import java.lang.reflect.Method;
31  import java.net.BindException;
32  import java.net.InetSocketAddress;
33  import java.util.ArrayList;
34  import java.util.Collection;
35  import java.util.Collections;
36  import java.util.Comparator;
37  import java.util.HashMap;
38  import java.util.HashSet;
39  import java.util.LinkedList;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.Map.Entry;
43  import java.util.Random;
44  import java.util.Set;
45  import java.util.SortedMap;
46  import java.util.TreeMap;
47  import java.util.TreeSet;
48  import java.util.concurrent.ConcurrentHashMap;
49  import java.util.concurrent.ConcurrentSkipListMap;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  import java.util.concurrent.atomic.AtomicInteger;
52  import java.util.concurrent.locks.ReentrantReadWriteLock;
53  
54  import javax.management.ObjectName;
55  
56  import org.apache.commons.lang.mutable.MutableDouble;
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  import org.apache.hadoop.conf.Configuration;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.hbase.CallSequenceOutOfOrderException;
63  import org.apache.hadoop.hbase.Chore;
64  import org.apache.hadoop.hbase.ClockOutOfSyncException;
65  import org.apache.hadoop.hbase.DoNotRetryIOException;
66  import org.apache.hadoop.hbase.HBaseConfiguration;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
69  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
70  import org.apache.hadoop.hbase.HRegionInfo;
71  import org.apache.hadoop.hbase.HServerAddress;
72  import org.apache.hadoop.hbase.HServerInfo;
73  import org.apache.hadoop.hbase.HServerLoad;
74  import org.apache.hadoop.hbase.HTableDescriptor;
75  import org.apache.hadoop.hbase.HealthCheckChore;
76  import org.apache.hadoop.hbase.KeyValue;
77  import org.apache.hadoop.hbase.MasterAddressTracker;
78  import org.apache.hadoop.hbase.NotServingRegionException;
79  import org.apache.hadoop.hbase.RemoteExceptionHandler;
80  import org.apache.hadoop.hbase.ServerName;
81  import org.apache.hadoop.hbase.Stoppable;
82  import org.apache.hadoop.hbase.TableDescriptors;
83  import org.apache.hadoop.hbase.UnknownRowLockException;
84  import org.apache.hadoop.hbase.UnknownScannerException;
85  import org.apache.hadoop.hbase.YouAreDeadException;
86  import org.apache.hadoop.hbase.catalog.CatalogTracker;
87  import org.apache.hadoop.hbase.catalog.MetaEditor;
88  import org.apache.hadoop.hbase.catalog.MetaReader;
89  import org.apache.hadoop.hbase.catalog.RootLocationEditor;
90  import org.apache.hadoop.hbase.client.Action;
91  import org.apache.hadoop.hbase.client.Append;
92  import org.apache.hadoop.hbase.client.Delete;
93  import org.apache.hadoop.hbase.client.Get;
94  import org.apache.hadoop.hbase.client.HConnectionManager;
95  import org.apache.hadoop.hbase.client.Increment;
96  import org.apache.hadoop.hbase.client.MultiAction;
97  import org.apache.hadoop.hbase.client.MultiResponse;
98  import org.apache.hadoop.hbase.client.Mutation;
99  import org.apache.hadoop.hbase.client.Put;
100 import org.apache.hadoop.hbase.client.Result;
101 import org.apache.hadoop.hbase.client.Row;
102 import org.apache.hadoop.hbase.client.RowLock;
103 import org.apache.hadoop.hbase.client.RowMutations;
104 import org.apache.hadoop.hbase.client.Scan;
105 import org.apache.hadoop.hbase.client.UserProvider;
106 import org.apache.hadoop.hbase.client.coprocessor.Exec;
107 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
108 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
109 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
110 import org.apache.hadoop.hbase.executor.ExecutorService;
111 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
112 import org.apache.hadoop.hbase.filter.BinaryComparator;
113 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
114 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
115 import org.apache.hadoop.hbase.fs.HFileSystem;
116 import org.apache.hadoop.hbase.io.hfile.BlockCache;
117 import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
118 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
119 import org.apache.hadoop.hbase.io.hfile.CacheStats;
120 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
121 import org.apache.hadoop.hbase.ipc.HBaseRPC;
122 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
123 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
124 import org.apache.hadoop.hbase.ipc.HBaseServer;
125 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
126 import org.apache.hadoop.hbase.ipc.HRegionInterface;
127 import org.apache.hadoop.hbase.ipc.Invocation;
128 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
129 import org.apache.hadoop.hbase.ipc.RpcEngine;
130 import org.apache.hadoop.hbase.ipc.RpcServer;
131 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
132 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
133 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
134 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
135 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
136 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
137 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
138 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
139 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
140 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
141 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
142 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
143 import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
144 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
145 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
146 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
147 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
148 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
149 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
150 import org.apache.hadoop.hbase.regionserver.wal.HLog;
151 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
152 import org.apache.hadoop.hbase.util.Bytes;
153 import org.apache.hadoop.hbase.util.CompressionTest;
154 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
155 import org.apache.hadoop.hbase.util.FSTableDescriptors;
156 import org.apache.hadoop.hbase.util.FSUtils;
157 import org.apache.hadoop.hbase.util.InfoServer;
158 import org.apache.hadoop.hbase.util.Pair;
159 import org.apache.hadoop.hbase.util.Sleeper;
160 import org.apache.hadoop.hbase.util.Strings;
161 import org.apache.hadoop.hbase.util.Threads;
162 import org.apache.hadoop.hbase.util.VersionInfo;
163 import org.apache.hadoop.hbase.zookeeper.ClusterId;
164 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
165 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
166 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
167 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
168 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
169 import org.apache.hadoop.io.MapWritable;
170 import org.apache.hadoop.io.Writable;
171 import org.apache.hadoop.ipc.RemoteException;
172 import org.apache.hadoop.metrics.util.MBeanUtil;
173 import org.apache.hadoop.net.DNS;
174 import org.apache.hadoop.util.ReflectionUtils;
175 import org.apache.hadoop.util.StringUtils;
176 import org.apache.zookeeper.KeeperException;
177 import org.codehaus.jackson.map.ObjectMapper;
178 
179 import com.google.common.base.Function;
180 import com.google.common.collect.Lists;
181 
182 /**
183  * HRegionServer makes a set of HRegions available to clients. It checks in with
184  * the HMaster. There are many HRegionServers in a single HBase deployment.
185  */
186 public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
187     Runnable, RegionServerServices {
188 
189   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
190 
191   // Set when a report to the master comes back with a message asking us to
192   // shutdown. Also set by call to stop when debugging or running unit tests
193   // of HRegionServer in isolation.
194   protected volatile boolean stopped = false;
195 
196   // A state before we go into stopped state.  At this stage we're closing user
197   // space regions.
198   private boolean stopping = false;
199 
200   // Go down hard. Used if file system becomes unavailable and also in
201   // debugging and unit tests.
202   protected volatile boolean abortRequested;
203 
204   private volatile boolean killed = false;
205 
206   // If false, the file system has become unavailable
207   protected volatile boolean fsOk;
208 
209   protected final Configuration conf;
210 
211   protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
212   private HFileSystem fs;
213   private boolean useHBaseChecksum; // verify hbase checksums?
214   private Path rootDir;
215   private final Random rand;
216 
217   // RegionName vs. the action currently in progress on it:
218   //   true  - an open-region action is in progress
219   //   false - a close-region action is in progress
220   private final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
221       new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
222 
223   /**
224    * Map of regions currently being served by this region server. Key is the
225    * encoded region name.  All access should be synchronized.
226    */
227   protected final Map<String, HRegion> onlineRegions =
228     new ConcurrentHashMap<String, HRegion>();
229 
230   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
231 
232   final int numRetries;
233   protected final int threadWakeFrequency;
234   private final int msgInterval;
235 
236   protected final int numRegionsToReport;
237 
238   private final long maxScannerResultSize;
239 
240   // Remote HMaster
241   private HMasterRegionInterface hbaseMaster;
242 
243   // RPC Engine for master connection
244   private RpcEngine rpcEngine;
245 
246   // Server to handle client requests. Default access so can be accessed by
247   // unit tests.
248   RpcServer rpcServer;
249 
250   // Server to handle client requests.
251   private HBaseServer server;  
252 
253   private final InetSocketAddress isa;
254   private UncaughtExceptionHandler uncaughtExceptionHandler;
255 
256   // Leases
257   private Leases leases;
258 
259   // Request counter.
260   // Do we need this?  Can't we just sum region counters?  St.Ack 20110412
261   private AtomicInteger requestCount = new AtomicInteger();
262 
263   // Info server. Default access so it can be used by unit tests. REGIONSERVER
264   // is the name of the webapp and the attribute name used when stuffing this
265   // instance into the web context.
266   InfoServer infoServer;
267 
268   /** region server process name */
269   public static final String REGIONSERVER = "regionserver";
270   
271   /** region server configuration name */
272   public static final String REGIONSERVER_CONF = "regionserver_conf";
273 
274   /*
275    * Space is reserved in HRS constructor and then released when aborting to
276    * recover from an OOME. See HBASE-706. TODO: Make this percentage of the heap
277    * or a minimum.
278    */
279   private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
280 
281   private RegionServerMetrics metrics;
282 
283   private RegionServerDynamicMetrics dynamicMetrics;
284 
285   // Compactions
286   public CompactSplitThread compactSplitThread;
287 
288   // Cache flushing
289   MemStoreFlusher cacheFlusher;
290 
291   /*
292    * Check for compaction requests.
293    */
294   Chore compactionChecker;
295 
296   /*
297    * Check for flushes
298    */
299   Chore periodicFlusher;
300 
301   // HLog and HLog roller. log is protected rather than private to avoid
302   // eclipse warning when accessed by inner classes
303   protected volatile HLog hlog;
304   // The meta updates are written to a different hlog. If this
305   // regionserver holds meta regions, then this field will be non-null.
306   protected volatile HLog hlogForMeta;
307 
308   LogRoller hlogRoller;
309   LogRoller metaHLogRoller;
310 
311   private final boolean separateHLogForMeta;
312 
313   // flag set after we're done setting up server threads (used for testing)
314   protected volatile boolean isOnline;
315 
316   final Map<String, RegionScannerHolder> scanners =
317     new ConcurrentHashMap<String, RegionScannerHolder>();
318 
319   // zookeeper connection and watcher
320   private ZooKeeperWatcher zooKeeper;
321 
322   // master address manager and watcher
323   private MasterAddressTracker masterAddressManager;
324 
325   // catalog tracker
326   private CatalogTracker catalogTracker;
327 
328   // Cluster Status Tracker
329   private ClusterStatusTracker clusterStatusTracker;
330 
331   // Log Splitting Worker
332   private SplitLogWorker splitLogWorker;
333 
334   // A sleeper that sleeps for msgInterval.
335   private final Sleeper sleeper;
336 
337   private final int rpcTimeout;
338 
339   // Instance of the hbase executor service.
340   private ExecutorService service;
341 
342   // Replication services. If no replication, this handler will be null.
343   private ReplicationSourceService replicationSourceHandler;
344   private ReplicationSinkService replicationSinkHandler;
345 
346   private final RegionServerAccounting regionServerAccounting;
347 
348   // Cache configuration and block cache reference
349   private final CacheConfig cacheConfig;
350 
351   // reference to the Thrift Server.
352   volatile private HRegionThriftServer thriftServer;
353 
354   /**
355    * The server name the Master sees us as.  It's made from the hostname the
356    * master passes us, the port, and the server startcode. Gets set after
357    * registration against the Master.  The hostname can differ from the hostname
358    * in {@link #isa} but usually doesn't if both servers resolve.
359    */
360   private ServerName serverNameFromMasterPOV;
361 
362   // region server static info like info port
363   private RegionServerInfo.Builder rsInfo;
364 
365   /**
366    * This server's startcode.
367    */
368   private final long startcode;
369 
370   /**
371    * Go here to get table descriptors.
372    */
373   private TableDescriptors tableDescriptors;
374 
375   /*
376    * Strings to be used in forming the exception message for
377    * RegionsAlreadyInTransitionException.
378    */
379   private static final String OPEN = "OPEN";
380   private static final String CLOSE = "CLOSE";
381 
382   /**
383    * MX Bean for RegionServerInfo
384    */
385   private ObjectName mxBean = null;
386 
387   /**
388    * ClusterId
389    */
390   private ClusterId clusterId = null;
391 
392   private RegionServerCoprocessorHost rsHost;
393 
394   /** The health check chore. */
395   private HealthCheckChore healthCheckChore;
396 
397   /**
398    * Starts an HRegionServer at the default location
399    *
400    * @param conf
401    * @throws IOException
402    * @throws InterruptedException
403    */
404   public HRegionServer(Configuration conf)
405   throws IOException, InterruptedException {
406     this.fsOk = true;
407     this.conf = conf;
408     // Set how many times to retry talking to another server over HConnection.
409     HConnectionManager.setServerSideHConnectionRetries(this.conf, LOG);
410     this.isOnline = false;
411     checkCodecs(this.conf);
412 
413     // Do we use checksum verification in HBase? If HBase checksum verification
414     // is enabled, then we automatically switch off HDFS checksum verification.
415     this.useHBaseChecksum = conf.getBoolean(
416       HConstants.HBASE_CHECKSUM_VERIFICATION, false);
417 
418     // Config'ed params
419     this.separateHLogForMeta = conf.getBoolean(HLog.SEPARATE_HLOG_FOR_META, false);
420     this.numRetries = conf.getInt("hbase.client.retries.number", 10);
421     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
422       10 * 1000);
423     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
424 
425     this.sleeper = new Sleeper(this.msgInterval, this);
426 
427     this.maxScannerResultSize = conf.getLong(
428       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
429       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
430 
431     this.numRegionsToReport = conf.getInt(
432       "hbase.regionserver.numregionstoreport", 10);
433 
434     this.rpcTimeout = conf.getInt(
435       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
436       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
437 
438     this.abortRequested = false;
439     this.stopped = false;
440 
441     // Server to handle client requests.
442     String hostname = conf.get("hbase.regionserver.ipc.address",
443       Strings.domainNamePointerToHostName(DNS.getDefaultHost(
444         conf.get("hbase.regionserver.dns.interface", "default"),
445         conf.get("hbase.regionserver.dns.nameserver", "default"))));
446     int port = conf.getInt(HConstants.REGIONSERVER_PORT,
447       HConstants.DEFAULT_REGIONSERVER_PORT);
448     // Creating the InetSocketAddress forces a hostname resolve.
449     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
450     if (initialIsa.getAddress() == null) {
451       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
452     }
453 
454     this.rand = new Random(initialIsa.hashCode());
455     this.rpcServer = HBaseRPC.getServer(this,
456       new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
457         OnlineRegions.class},
458         initialIsa.getHostName(), // BindAddress is IP we got for this server.
459         initialIsa.getPort(),
460         conf.getInt("hbase.regionserver.handler.count", 10),
461         conf.getInt("hbase.regionserver.metahandler.count", 10),
462         conf.getBoolean("hbase.rpc.verbose", false),
463         conf, HConstants.QOS_THRESHOLD);
464     if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;
465     // Set our address.
466     this.isa = this.rpcServer.getListenerAddress();
467 
468     this.rpcServer.setErrorHandler(this);
469     this.rpcServer.setQosFunction(new QosFunction());
470     this.startcode = System.currentTimeMillis();
471 
472     conf.set("hbase.regionserver.rpc.client.socket.bind.address", this.isa.getHostName());
473 
474     // login the zookeeper client principal (if using security)
475     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
476       "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
477 
478     // login the server principal (if using secure Hadoop)
479     UserProvider provider = UserProvider.instantiate(conf);
480     provider.login("hbase.regionserver.keytab.file",
481       "hbase.regionserver.kerberos.principal", this.isa.getHostName());
482     regionServerAccounting = new RegionServerAccounting();
483     cacheConfig = new CacheConfig(conf);
484     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
485       public void uncaughtException(Thread t, Throwable e) {
486         abort("Uncaught exception in service thread " + t.getName(), e);
487       }
488     };
489     this.rsInfo = RegionServerInfo.newBuilder();
490     // Put up the web UI.  It may come up on a port other than the configured
491     // one if that port is occupied. Adjust serverInfo if this is the case.
492     this.rsInfo.setInfoPort(putUpWebUI());
493   }
494 
495   /** Handle all the snapshot requests to this server */
496   RegionServerSnapshotManager snapshotManager;
497 
498   /**
499    * Run a test on the configured codecs to make sure the supporting libs are in place.
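       * The codec list comes from the {@code hbase.regionserver.codecs} property,
       * e.g. a comma-separated list such as {@code snappy,lzo} (illustrative values
       * only); startup fails fast if any listed codec cannot be loaded.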
500    * @param c
501    * @throws IOException
502    */
503   private static void checkCodecs(final Configuration c) throws IOException {
504     // check to see if the codec list is available:
505     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
506     if (codecs == null) return;
507     for (String codec : codecs) {
508       if (!CompressionTest.testCompression(codec)) {
509         throw new IOException("Compression codec " + codec +
510           " not supported, aborting RS construction");
511       }
512     }
513   }
514 
515 
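      /**
       * Marks an RPC handler method with the priority {@link QosFunction} should
       * assign to its invocations; for example (illustrative only), a meta-serving
       * method could be tagged {@code @QosPriority(priority = HConstants.HIGH_QOS)}.
       * Methods without the annotation fall through to the heuristics in
       * {@link QosFunction#apply(Writable)}.
       */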
516   @Retention(RetentionPolicy.RUNTIME)
517   private @interface QosPriority {
518     int priority() default 0;
519   }
520 
521   /**
522    * Utility used to ensure a higher quality of service for priority RPCs, e.g.
523    * RPCs to .META. and -ROOT-.
524    */
525   class QosFunction implements Function<Writable,Integer> {
526     private final Map<String, Integer> annotatedQos;
527 
528     public QosFunction() {
529       Map<String, Integer> qosMap = new HashMap<String, Integer>();
530       for (Method m : HRegionServer.class.getMethods()) {
531         QosPriority p = m.getAnnotation(QosPriority.class);
532         if (p != null) {
533           qosMap.put(m.getName(), p.priority());
534         }
535       }
536 
537       annotatedQos = qosMap;
538     }
539 
540     public boolean isMetaTable(byte[] regionName) {
541       HRegion region;
542       try {
543         region = getRegion(regionName);
544       } catch (NotServingRegionException ignored) {
545         return false;
546       }
547       return region.getRegionInfo().isMetaTable();
548     }
549 
550     @Override
551     public Integer apply(Writable from) {
552       if (!(from instanceof Invocation)) return HConstants.NORMAL_QOS;
553 
554       Invocation inv = (Invocation) from;
555       String methodName = inv.getMethodName();
556 
557       Integer priorityByAnnotation = annotatedQos.get(methodName);
558       if (priorityByAnnotation != null) {
559         return priorityByAnnotation;
560       }
561 
562       // scanner methods...
563       if (methodName.equals("next") || methodName.equals("close")) {
564         // translate!
565         Long scannerId;
566         try {
567           scannerId = (Long) inv.getParameters()[0];
568         } catch (ClassCastException ignored) {
569           // LOG.debug("Low priority: " + from);
570           return HConstants.NORMAL_QOS;
571         }
572         String scannerIdString = Long.toString(scannerId);
573         RegionScannerHolder holder = scanners.get(scannerIdString);
574         if (holder != null && holder.getScanner().getRegionInfo().isMetaRegion()) {
575           // LOG.debug("High priority scanner request: " + scannerId);
576           return HConstants.HIGH_QOS;
577         }
578       } else if (inv.getParameterClasses().length == 0) {
579        // Just let it through.  This is getOnlineRegions, etc.
580       } else if (inv.getParameterClasses()[0] == byte[].class) {
581         // First arg is a byte array, so assume this is a region name:
582         if (isMetaTable((byte[]) inv.getParameters()[0])) {
583           // LOG.debug("High priority with method: " + methodName +
584           // " and region: "
585           // + Bytes.toString((byte[]) inv.getParameters()[0]));
586           return HConstants.HIGH_QOS;
587         }
588       } else if (inv.getParameterClasses()[0] == MultiAction.class) {
589         MultiAction<?> ma = (MultiAction<?>) inv.getParameters()[0];
590         Set<byte[]> regions = ma.getRegions();
591         // This is ugly, but if any single one of the actions touches a meta
592         // region, the whole request gets pinged high priority. This is a
593         // dangerous hack because people can get their multi action tagged
594         // high QOS just by tossing in a Get(.META.) when this regionserver
595         // also hosts META/-ROOT-.
598         for (byte[] region : regions) {
599           if (isMetaTable(region)) {
600             // LOG.debug("High priority multi with region: " +
601             // Bytes.toString(region));
602             return HConstants.HIGH_QOS; // short circuit for the win.
603           }
604         }
605       }
606       // LOG.debug("Low priority: " + from.toString());
607       return HConstants.NORMAL_QOS;
608     }
609   }
610 
611   /**
612    * All initialization needed before we register with the Master.
613    *
614    * @throws IOException
615    * @throws InterruptedException
616    */
617   private void preRegistrationInitialization(){
618     try {
619       initializeZooKeeper();
620 
621       clusterId = new ClusterId(zooKeeper, this);
622       if(clusterId.hasId()) {
623         conf.set(HConstants.CLUSTER_ID, clusterId.getId());
624       }
625 
626       initializeThreads();
627       int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
628       for (int i = 0; i < nbBlocks; i++) {
629         reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
630       }
631 
632       this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
633     } catch (Throwable t) {
634       // Call stop on error, or the process will stick around forever since the
635       // server puts up non-daemon threads.
636       this.rpcServer.stop();
637       abort("Initialization of RS failed.  Hence aborting RS.", t);
638     }
639   }
640 
641   /**
642    * Bring up the connection to the zk ensemble, then wait until a master is
643    * available for this cluster, and after that wait until the cluster 'up' flag
644    * has been set. This is the order in which the master does things.
645    * Finally, put up a catalog tracker.
646    * @throws IOException
647    * @throws InterruptedException
648    */
649   private void initializeZooKeeper() throws IOException, InterruptedException {
650     // Open connection to zookeeper and set primary watcher
651     this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
652       this.isa.getPort(), this);
653 
654     // Create the master address manager, register with zk, and start it.  Then
655     // block until a master is available.  No point in starting up if no master
656     // running.
657     this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
658     this.masterAddressManager.start();
659     blockAndCheckIfStopped(this.masterAddressManager);
660 
661     // Wait on cluster being up.  Master will set this flag up in zookeeper
662     // when ready.
663     this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
664     this.clusterStatusTracker.start();
665     blockAndCheckIfStopped(this.clusterStatusTracker);
666 
667     // Create the catalog tracker and start it;
668     this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
669     catalogTracker.start();
670 
671     // watch for snapshots
672     try {
673       this.snapshotManager = new RegionServerSnapshotManager(this);
674     } catch (KeeperException e) {
675       this.abort("Failed to reach zk cluster when creating snapshot handler.");
676     }
677   }
678 
679   /**
680    * Utility method to wait indefinitely on a znode's availability while checking
681    * whether the region server is shut down.
682    * @param tracker znode tracker to use
683    * @throws IOException any IO exception, plus if the RS is stopped
684    * @throws InterruptedException
685    */
686   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
687       throws IOException, InterruptedException {
688     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
689       if (this.stopped) {
690         throw new IOException("Received the shutdown message while waiting.");
691       }
692     }
693   }
694 
695   /**
696    * @return False if cluster shutdown in progress
697    */
698   private boolean isClusterUp() {
699     return this.clusterStatusTracker.isClusterUp();
700   }
701 
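      // Sets up the background workers and chores: the memstore flusher, the
      // compaction thread and its compaction-check chore, the periodic memstore
      // flush chore, the optional health-check chore, the lease manager, and the
      // embedded Thrift server when it is enabled in the configuration.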
702   private void initializeThreads() throws IOException {
703     // Cache flushing thread.
704     this.cacheFlusher = new MemStoreFlusher(conf, this);
705 
706     // Compaction thread
707     this.compactSplitThread = new CompactSplitThread(this);
708 
709     // Background thread to check for compactions; needed if region
710     // has not gotten updates in a while. Make it run at a lesser frequency.
711     int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
712       ".multiplier", 1000);
713     this.compactionChecker = new CompactionChecker(this,
714       this.threadWakeFrequency * multiplier, this);
715 
716     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
717 
718     // Health checker thread.
719     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
720       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
721     if (isHealthCheckerConfigured()) {
722       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
723     }
724 
725     this.leases = new Leases((int) conf.getLong(
726         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
727         HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
728         this.threadWakeFrequency);
729 
730     // Create the thread for the ThriftServer.
731     if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
732       thriftServer = new HRegionThriftServer(this, conf);
733       thriftServer.start();
734       LOG.info("Started Thrift API from Region Server.");
735     }
736   }
737 
738   /**
739    * The HRegionServer sticks in this loop until closed.
740    */
741   @SuppressWarnings("deprecation")
742   public void run() {
743     try {
744       // Do pre-registration initializations; zookeeper, lease threads, etc.
745       preRegistrationInitialization();
746     } catch (Throwable e) {
747       abort("Fatal exception during initialization", e);
748     }
749 
750     try {
751       // Try and register with the Master; tell it we are here.  Break if the
752       // server is stopped, the cluster 'up' flag is down, or HDFS goes wacky.
753       while (keepLooping()) {
754         MapWritable w = reportForDuty();
755         if (w == null) {
756           LOG.warn("reportForDuty failed; sleeping and then retrying.");
757           this.sleeper.sleep();
758         } else {
759           handleReportForDutyResponse(w);
760           break;
761         }
762       }
763       registerMBean();
764 
765       // start the snapshot handler, since the server is ready to run
766       this.snapshotManager.start();
767 
768       // We registered with the Master.  Go into run mode.
769       long lastMsg = 0;
770       long oldRequestCount = -1;
771       // The main run loop.
772       while (!this.stopped && isHealthy()) {
773         if (!isClusterUp()) {
774           if (isOnlineRegionsEmpty()) {
775             stop("Exiting; cluster shutdown set and not carrying any regions");
776           } else if (!this.stopping) {
777             this.stopping = true;
778             LOG.info("Closing user regions");
779             closeUserRegions(this.abortRequested);
780           } else if (this.stopping) {
781             boolean allUserRegionsOffline = areAllUserRegionsOffline();
782             if (allUserRegionsOffline) {
783               // Set stopped if there have been no more write requests to meta
784               // tables since the last time we went around the loop.  Any open
785               // meta regions will be closed on our way out.
786               if (oldRequestCount == getWriteRequestCount()) {
787                 stop("Stopped; only catalog regions remaining online");
788                 break;
789               }
790               oldRequestCount = getWriteRequestCount();
791             } else {
792               // Make sure all regions have been closed -- some regions may
793               // not have gotten the close because we were splitting at the time
794               // of the call to closeUserRegions.
795               closeUserRegions(this.abortRequested);
796             }
797             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
798           }
799         }
800         long now = System.currentTimeMillis();
801         if ((now - lastMsg) >= msgInterval) {
802           doMetrics();
803           tryRegionServerReport();
804           lastMsg = System.currentTimeMillis();
805         }
806         if (!this.stopped) this.sleeper.sleep();
807       } // while
808     } catch (Throwable t) {
809       if (!checkOOME(t)) {
810         abort("Unhandled exception: " + t.getMessage(), t);
811       }
812     }
813     // Run shutdown.
814     if (mxBean != null) {
815       MBeanUtil.unregisterMBean(mxBean);
816       mxBean = null;
817     }
818     if (this.thriftServer != null) this.thriftServer.shutdown();
819     this.leases.closeAfterLeasesExpire();
820     this.rpcServer.stop();
821     if (this.splitLogWorker != null) {
822       splitLogWorker.stop();
823     }
824     if (this.infoServer != null) {
825       LOG.info("Stopping infoServer");
826       try {
827         this.infoServer.stop();
828       } catch (Exception e) {
829         e.printStackTrace();
830       }
831     }
832     // Send cache a shutdown.
833     if (cacheConfig.isBlockCacheEnabled()) {
834       cacheConfig.getBlockCache().shutdown();
835     }
836 
837     // Send interrupts to wake up threads if sleeping so they notice shutdown.
838     // TODO: Should we check they are alive? If we OOME'd, they could have exited already.
839     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
840     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
841     if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
842     if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
843     if (this.compactionChecker != null)
844       this.compactionChecker.interrupt();
845     if (this.healthCheckChore != null) {
846       this.healthCheckChore.interrupt();
847     }
848 
849     // Stop the snapshot handler, forcefully killing all running tasks
850     try {
851       if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
852     } catch (IOException e) {
853       LOG.warn("Failed to close snapshot handler cleanly", e);
854     }
855 
856     if (this.killed) {
857       // Just skip out w/o closing regions.  Used when testing.
858     } else if (abortRequested) {
859       if (this.fsOk) {
860         closeUserRegions(abortRequested); // Don't leave any open file handles
861       }
862       LOG.info("aborting server " + this.serverNameFromMasterPOV);
863     } else {
864       closeUserRegions(abortRequested);
865       closeAllScanners();
866       LOG.info("stopping server " + this.serverNameFromMasterPOV);
867     }
868     // Interrupt catalog tracker here in case any regions being opened out in
869     // handlers are stuck waiting on meta or root.
870     if (this.catalogTracker != null) this.catalogTracker.stop();
871 
872     // Close the compactSplit thread before closing meta regions.
873     if (!this.killed && containsMetaTableRegions()) {
874       if (!abortRequested || this.fsOk) {
875         if (this.compactSplitThread != null) {
876           this.compactSplitThread.join();
877           this.compactSplitThread = null;
878         }
879         closeMetaTableRegions(abortRequested);
880       }
881     }
882 
883     if (!this.killed && this.fsOk) {
884       waitOnAllRegionsToClose(abortRequested);
885       LOG.info("stopping server " + this.serverNameFromMasterPOV +
886         "; all regions closed.");
887     }
888 
889     // The fsOk flag may be changed when closing regions throws an exception.
890     if (!this.killed && this.fsOk) {
891       closeWAL(!abortRequested);
892     }
893 
894     // Make sure the proxy is down.
895     this.hbaseMaster = null;
896     this.rpcEngine.close();
897     this.leases.close();
898 
899     if (!killed) {
900       join();
901     }
902 
903     try {
904       deleteMyEphemeralNode();
905     } catch (KeeperException e) {
906       LOG.warn("Failed deleting my ephemeral node", e);
907     }
908     this.zooKeeper.close();
909     LOG.info("stopping server " + this.serverNameFromMasterPOV +
910       "; zookeeper connection closed.");
911 
912     LOG.info(Thread.currentThread().getName() + " exiting");
913   }
914 
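      // True if this server is currently carrying -ROOT- or .META.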
915   private boolean containsMetaTableRegions() {
916     return onlineRegions.containsKey(HRegionInfo.ROOT_REGIONINFO.getEncodedName())
917         || onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
918   }
919 
920   private boolean areAllUserRegionsOffline() {
921     if (getNumberOfOnlineRegions() > 2) return false;
922     boolean allUserRegionsOffline = true;
923     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
924       if (!e.getValue().getRegionInfo().isMetaTable()) {
925         allUserRegionsOffline = false;
926         break;
927       }
928     }
929     return allUserRegionsOffline;
930   }
931 
932   /**
933    * @return Current write count for all online regions.
934    */
935   private long getWriteRequestCount() {
936     long writeCount = 0;
937     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
938       writeCount += e.getValue().getWriteRequestsCount();
939     }
940     return writeCount;
941   }
942 
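      // Sends the periodic load report to the master.  A YouAreDeadException is
      // rethrown so run() treats it as fatal; on other IO errors we look the master
      // up again via getMaster() and report on the next cycle.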
943   void tryRegionServerReport()
944   throws IOException {
945     if (!keepLooping() && hbaseMaster == null) {
946       // the current server is stopping
947       return;
948     }
949     HServerLoad hsl = buildServerLoad();
950     // Why do we do this?
951     this.requestCount.set(0);
952     try {
953       this.hbaseMaster.regionServerReport(this.serverNameFromMasterPOV.getVersionedBytes(), hsl);
954     } catch (IOException ioe) {
955       if (ioe instanceof RemoteException) {
956         ioe = ((RemoteException)ioe).unwrapRemoteException();
957       }
958       if (ioe instanceof YouAreDeadException) {
959         // This will be caught and handled as a fatal error in run()
960         throw ioe;
961       }
962       // Couldn't connect to the master, get location from zk and reconnect
963       // Method blocks until new master is found or we are stopped
964       getMaster();
965     }
966   }
967 
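      // Builds the HServerLoad sent to the master: one RegionLoad per online region
      // plus the request counts and the current heap usage in MB.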
968   HServerLoad buildServerLoad() {
969     Collection<HRegion> regions = getOnlineRegionsLocalContext();
970     TreeMap<byte [], HServerLoad.RegionLoad> regionLoads =
971       new TreeMap<byte [], HServerLoad.RegionLoad>(Bytes.BYTES_COMPARATOR);
972     for (HRegion region: regions) {
973       regionLoads.put(region.getRegionName(), createRegionLoad(region));
974     }
975     MemoryUsage memory =
976       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
977     return new HServerLoad(requestCount.get(),(int)metrics.getRequests(),
978       (int)(memory.getUsed() / 1024 / 1024),
979       (int) (memory.getMax() / 1024 / 1024), regionLoads,
980       this.hlog.getCoprocessorHost().getCoprocessors());
981   }
982 
983   String getOnlineRegionsAsPrintableString() {
984     StringBuilder sb = new StringBuilder();
985     for (HRegion r: this.onlineRegions.values()) {
986       if (sb.length() > 0) sb.append(", ");
987       sb.append(r.getRegionInfo().getEncodedName());
988     }
989     return sb.toString();
990   }
991 
992   /**
993    * Wait on regions to close.
994    */
995   private void waitOnAllRegionsToClose(final boolean abort) {
996     // Wait till all regions are closed before going out.
997     int lastCount = -1;
998     long previousLogTime = 0;
999     Set<String> closedRegions = new HashSet<String>();
1000     while (!isOnlineRegionsEmpty()) {
1001       int count = getNumberOfOnlineRegions();
1002       // Only print a message if the count of regions has changed.
1003       if (count != lastCount) {
1004         // Log every second at most
1005         if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1006           previousLogTime = System.currentTimeMillis();
1007           lastCount = count;
1008           LOG.info("Waiting on " + count + " regions to close");
1009           // Only print out the regions still closing if there are a small
1010           // number of them; otherwise we would swamp the log.
1011           if (count < 10 && LOG.isDebugEnabled()) {
1012             LOG.debug(this.onlineRegions);
1013           }
1014         }
1015       }
1016       // Ensure all user regions have been sent a close. Use this to
1017       // protect against the case where an open comes in after we start the
1018       // iterator of onlineRegions to close all user regions.
1019       for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1020         HRegionInfo hri = e.getValue().getRegionInfo();
1021         if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1022             && !closedRegions.contains(hri.getEncodedName())) {
1023           closedRegions.add(hri.getEncodedName());
1024           // Don't update zk with this close transition; pass false.
1025           closeRegion(hri, abort, false);
1026         }
1027       }
1028       // No regions in RIT, we can stop waiting now.
1029       if (this.regionsInTransitionInRS.isEmpty()) {
1030         if (!isOnlineRegionsEmpty()) {
1031           LOG.info("We were exiting though online regions are not empty, because some regions failed closing");
1032         }
1033         break;
1034       }
1035       Threads.sleep(200);
1036     }
1037   }
1038 
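       // Closes the WAL(s).  When 'delete' is true (a clean, non-aborting shutdown)
       // the main hlog is closed and its files are deleted; the meta hlog, if any,
       // is only closed since it shares the same log directory.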
1039   private void closeWAL(final boolean delete) {
1040     if (this.hlogForMeta != null) {
1041       // All hlogs (meta and non-meta) are in the same directory. Don't call
1042       // closeAndDelete here since that would delete all hlogs not just the
1043       // meta ones. We will just 'close' the hlog for meta here, and leave
1044       // the directory cleanup to the follow-on closeAndDelete call.
1045       try { // Part of the patch from HBASE-7982 to do with exception handling
1046         this.hlogForMeta.close();
1047       } catch (Throwable e) {
1048         LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1049       }
1050     }
1051     if (this.hlog != null) {
1052       try {
1053         if (delete) {
1054           hlog.closeAndDelete();
1055         } else {
1056           hlog.close();
1057         }
1058       } catch (Throwable e) {
1059         LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1060       }
1061     }
1062   }
1063 
1064   private void closeAllScanners() {
1065     // Close any outstanding scanners. Means they'll get an UnknownScanner
1066     // exception next time they come in.
1067     for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
1068       try {
1069         e.getValue().getScanner().close();
1070       } catch (IOException ioe) {
1071         LOG.warn("Closing scanner " + e.getKey(), ioe);
1072       }
1073     }
1074   }
1075 
1076   /*
1077    * Run init. Sets up hlog and starts up all server threads.
1078    *
1079    * @param c Extra configuration.
1080    */
1081   protected void handleReportForDutyResponse(final MapWritable c)
1082   throws IOException {
1083     try {
1084       for (Map.Entry<Writable, Writable> e :c.entrySet()) {
1085         String key = e.getKey().toString();
1086         // The hostname the master sees us as.
1087         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1088           String hostnameFromMasterPOV = e.getValue().toString();
1089           this.serverNameFromMasterPOV = new ServerName(hostnameFromMasterPOV,
1090             this.isa.getPort(), this.startcode);
1091           LOG.info("Master passed us hostname to use. Was=" +
1092             this.isa.getHostName() + ", Now=" +
1093             this.serverNameFromMasterPOV.getHostname());
1094           continue;
1095         }
1096         String value = e.getValue().toString();
1097         if (LOG.isDebugEnabled()) {
1098           LOG.debug("Config from master: " + key + "=" + value);
1099         }
1100         this.conf.set(key, value);
1101       }
1102 
1103       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1104       // config param for task trackers, but we can piggyback off of it.
1105       if (this.conf.get("mapred.task.id") == null) {
1106         this.conf.set("mapred.task.id", "hb_rs_" +
1107           this.serverNameFromMasterPOV.toString());
1108       }
1109       // Set our ephemeral znode up in zookeeper now we have a name.
1110       createMyEphemeralNode();
1111 
1112       // Master sent us hbase.rootdir to use. Should be fully qualified
1113       // path with file system specification included. Set 'fs.defaultFS'
1114       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
1115       // accessors will be going against wrong filesystem (unless all is set
1116       // to defaults).
1117       this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
1118       // Get fs instance used by this RS
1119       this.fs = new HFileSystem(this.conf, this.useHBaseChecksum);
1120       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
1121       this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
1122       this.hlog = setupWALAndReplication();
1123       // Init in here rather than in constructor after thread name has been set
1124       this.metrics = new RegionServerMetrics();
1125       this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
1126       this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
1127       startServiceThreads();
1128       LOG.info("Serving as " + this.serverNameFromMasterPOV +
1129         ", RPC listening on " + this.isa +
1130         ", sessionid=0x" +
1131         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1132       isOnline = true;
1133     } catch (Throwable e) {
1134       LOG.warn("Exception in region server : ", e);
1135       this.isOnline = false;
1136       stop("Failed initialization");
1137       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1138           "Region server startup failed");
1139     } finally {
1140       sleeper.skipSleepCycle();
1141     }
1142   }
1143 
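       // ZooKeeper ephemeral node helpers: the region server advertises itself under
       // the rs znode using its server name, storing the PB-magic-prefixed,
       // serialized RegionServerInfo as the node data; the node is deleted again
       // during shutdown.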
1144   private String getMyEphemeralNodePath() {
1145     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
1146   }
1147 
1148   private void createMyEphemeralNode() throws KeeperException, IOException {
1149     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1150     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1151       getMyEphemeralNodePath(), data);
1152   }
1153 
1154   private void deleteMyEphemeralNode() throws KeeperException {
1155     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1156   }
1157 
1158   public RegionServerAccounting getRegionServerAccounting() {
1159     return regionServerAccounting;
1160   }
1161 
1162   /*
1163    * @param r Region to get RegionLoad for.
1164    *
1165    * @return RegionLoad instance.
1166    *
1167    * @throws IOException
1168    */
1169   private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
1170     byte[] name = r.getRegionName();
1171     int stores = 0;
1172     int storefiles = 0;
1173     int storeUncompressedSizeMB = 0;
1174     int storefileSizeMB = 0;
1175     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1176     int storefileIndexSizeMB = 0;
1177     int rootIndexSizeKB = 0;
1178     int totalStaticIndexSizeKB = 0;
1179     int totalStaticBloomSizeKB = 0;
1180     long totalCompactingKVs = 0;
1181     long currentCompactedKVs = 0;
1182     synchronized (r.stores) {
1183       stores += r.stores.size();
1184       for (Store store : r.stores.values()) {
1185         storefiles += store.getStorefilesCount();
1186         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1187             / 1024 / 1024);
1188         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1189         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1190         CompactionProgress progress = store.getCompactionProgress();
1191         if (progress != null) {
1192           totalCompactingKVs += progress.totalCompactingKVs;
1193           currentCompactedKVs += progress.currentCompactedKVs;
1194         }
1195 
1196         rootIndexSizeKB +=
1197             (int) (store.getStorefilesIndexSize() / 1024);
1198 
1199         totalStaticIndexSizeKB +=
1200           (int) (store.getTotalStaticIndexSize() / 1024);
1201 
1202         totalStaticBloomSizeKB +=
1203           (int) (store.getTotalStaticBloomSize() / 1024);
1204       }
1205     }
1206     return new HServerLoad.RegionLoad(name, stores, storefiles,
1207         storeUncompressedSizeMB,
1208         storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
1209         totalStaticIndexSizeKB, totalStaticBloomSizeKB,
1210         (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
1211         totalCompactingKVs, currentCompactedKVs);
1212   }
1213 
1214   /**
1215    * @param encodedRegionName
1216    * @return An instance of RegionLoad.
1217    */
1218   public HServerLoad.RegionLoad createRegionLoad(final String encodedRegionName) {
1219     HRegion r = null;
1220     r = this.onlineRegions.get(encodedRegionName);
1221     return r != null ? createRegionLoad(r) : null;
1222   }
1223 
1224   /*
1225    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
1226    * IOE if it isn't already.
1227    *
1228    * @param t Throwable
1229    *
1230    * @return Throwable converted to an IOE; methods can only let out IOEs.
1231    */
1232   private Throwable cleanup(final Throwable t) {
1233     return cleanup(t, null);
1234   }
1235 
1236   /*
1237    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
1238    * IOE if it isn't already.
1239    *
1240    * @param t Throwable
1241    *
1242    * @param msg Message to log in error. Can be null.
1243    *
1244    * @return Throwable converted to an IOE; methods can only let out IOEs.
1245    */
1246   private Throwable cleanup(final Throwable t, final String msg) {
1247     // Don't log as error if NSRE; NSRE is 'normal' operation.
1248     if (t instanceof NotServingRegionException) {
1249       LOG.debug("NotServingRegionException; " +  t.getMessage());
1250       return t;
1251     }
1252     if (msg == null) {
1253       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
1254     } else {
1255       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
1256     }
1257     if (!checkOOME(t)) {
1258       checkFileSystem();
1259     }
1260     return t;
1261   }
1262 
1263   /*
1264    * @param t
1265    *
1266    * @return Make <code>t</code> an IOE if it isn't already.
1267    */
1268   private IOException convertThrowableToIOE(final Throwable t) {
1269     return convertThrowableToIOE(t, null);
1270   }
1271 
1272   /*
1273    * @param t
1274    *
1275    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
1276    *
1277    * @return Make <code>t</code> an IOE if it isn't already.
1278    */
1279   private IOException convertThrowableToIOE(final Throwable t, final String msg) {
1280     return (t instanceof IOException ? (IOException) t : msg == null
1281         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
1282   }
1283 
1284   /*
1285    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1286    *
1287    * @param e
1288    *
1289    * @return True if we OOME'd and are aborting.
1290    */
1291   public boolean checkOOME(final Throwable e) {
1292     boolean stop = false;
1293     try {
1294       if (e instanceof OutOfMemoryError
1295           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1296           || (e.getMessage() != null && e.getMessage().contains(
1297               "java.lang.OutOfMemoryError"))) {
1298         stop = true;
1299         LOG.fatal(
1300           "Run out of memory; HRegionServer will abort itself immediately", e);
1301       }
1302     } finally {
1303       if (stop) {
1304         Runtime.getRuntime().halt(1);
1305       }
1306     }
1307     return stop;
1308   }
1309 
1310   /**
1311    * Checks to see if the file system is still accessible. If not, sets
1312    * abortRequested and stopRequested
1313    *
1314    * @return false if file system is not available
1315    */
1316   public boolean checkFileSystem() {
1317     if (this.fsOk && this.fs != null) {
1318       try {
1319         FSUtils.checkFileSystemAvailable(this.fs);
1320       } catch (IOException e) {
1321         abort("File System not available", e);
1322         this.fsOk = false;
1323       }
1324     }
1325     return this.fsOk;
1326   }
1327 
1328   /*
1329    * Inner class that runs on a long period checking if regions need compaction.
1330    */
1331   private static class CompactionChecker extends Chore {
1332     private final HRegionServer instance;
1333     private final int majorCompactPriority;
1334     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1335 
1336     CompactionChecker(final HRegionServer h, final int sleepTime,
1337         final Stoppable stopper) {
1338       super("CompactionChecker", sleepTime, h);
1339       this.instance = h;
1340       LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
1341 
1342       /* MajorCompactPriority is configurable.
1343        * If not set, the compaction will use default priority.
1344        */
1345       this.majorCompactPriority = this.instance.conf.
1346         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1347         DEFAULT_PRIORITY);
1348     }
1349 
1350     @Override
1351     protected void chore() {
1352       for (HRegion r : this.instance.onlineRegions.values()) {
1353         if (r == null)
1354           continue;
1355         for (Store s : r.getStores().values()) {
1356           try {
1357             if (s.needsCompaction()) {
1358               // Queue a compaction. Will recognize if major is needed.
1359               this.instance.compactSplitThread.requestCompaction(r, s, getName()
1360                   + " requests compaction", null);
1361             } else if (s.isMajorCompaction()) {
1362               if (majorCompactPriority == DEFAULT_PRIORITY
1363                   || majorCompactPriority > r.getCompactPriority()) {
1364                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1365                     + " requests major compaction; use default priority", null);
1366               } else {
1367                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1368                     + " requests major compaction; use configured priority",
1369                   this.majorCompactPriority, null);
1370               }
1371             }
1372           } catch (IOException e) {
1373             LOG.warn("Failed major compaction check on " + r, e);
1374           }
1375         }
1376       }
1377     }
1378   }
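
  // Configuration sketch (value illustrative): the priority used for major
  // compactions queued by CompactionChecker can be set in hbase-site.xml, e.g.
  //
  //   <property>
  //     <name>hbase.regionserver.compactionChecker.majorCompactPriority</name>
  //     <value>5</value>
  //   </property>
  //
  // When the property is left unset, DEFAULT_PRIORITY (Integer.MAX_VALUE) is
  // used and major compaction requests are queued with the default priority.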
1379 
1380   class PeriodicMemstoreFlusher extends Chore {
1381     final HRegionServer server;
1382     final static int RANGE_OF_DELAY = 20000; //millisec
1383     final static int MIN_DELAY_TIME = 3000; //millisec
1384     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1385       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1386       this.server = server;
1387     }
1388 
1389     @Override
1390     protected void chore() {
1391       for (HRegion r : this.server.onlineRegions.values()) {
1392         if (r == null)
1393           continue;
1394         if (r.shouldFlush()) {
1395           FlushRequester requester = server.getFlushRequester();
1396           if (requester != null) {
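            // randomDelay is drawn from [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY),
            // i.e. roughly 3s to 23s, so flushes of equally-aged regions are spread out.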
1397             long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1398             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() + 
1399                 " after a delay of " + randomDelay);
1400             //Throttle the flushes by putting a delay. If we don't throttle, and there
1401             //is a balanced write-load on the regions in a table, we might end up 
1402             //overwhelming the filesystem with too many flushes at once.
1403             requester.requestDelayedFlush(r, randomDelay);
1404           }
1405         }
1406       }
1407     }
1408   }
1409 
1410   /**
1411    * Report the status of the server. A server is online once startup has
1412    * completed (setting up filesystem, starting service threads, etc.). This
1413    * method is designed mostly to be useful in tests.
1414    *
1415    * @return true if online, false if not.
1416    */
1417   public boolean isOnline() {
1418     return isOnline;
1419   }
1420 
1421   /**
1422    * Set up the WAL and replication if enabled. Replication setup is done here
1423    * because it needs to be hooked up to the WAL.
1424    * @return A WAL instance.
1425    * @throws IOException
1426    */
1427   private HLog setupWALAndReplication() throws IOException {
1428     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1429     Path logdir = new Path(rootDir,
1430       HLog.getHLogDirectoryName(this.serverNameFromMasterPOV.toString()));
1431     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1432     if (this.fs.exists(logdir)) {
1433       throw new RegionServerRunningException("Region server has already " +
1434         "created directory at " + this.serverNameFromMasterPOV.toString());
1435     }
1436 
1437     // Instantiate replication manager if replication enabled.  Pass it the
1438     // log directories.
1439     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1440     return instantiateHLog(logdir, oldLogDir);
1441   }
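
  // Rationale for the check above: if the per-server WAL directory already exists,
  // some process registered under the same server name has written (or is writing)
  // logs there, so startup is refused rather than risk two servers appending to the
  // same WAL directory.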
1442 
1443   // This method is synchronized to guarantee an atomic update of hlogForMeta:
1444   // multiple calls could be made to it at almost the same time, one for _ROOT_
1445   // and another for .META. (if they happen to be assigned to the same RS).
1446   // Also, we want to use the same log for both.
1447   private synchronized HLog getMetaWAL() throws IOException {
1448     if (this.hlogForMeta == null) {
1449       final String logName
1450       = HLog.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1451 
1452       Path logdir = new Path(rootDir, logName);
1453       final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1454       if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1455       this.hlogForMeta = new HLog(this.fs.getBackingFs(), logdir, oldLogDir, this.conf,
1456           getMetaWALActionListeners(), false, this.serverNameFromMasterPOV.toString(), true);
1457     }
1458     return this.hlogForMeta;
1459   }
1460 
1461   /**
1462    * Called by {@link #setupWALAndReplication()} to create the WAL instance.
1463    * @param logdir
1464    * @param oldLogDir
1465    * @return WAL instance.
1466    * @throws IOException
1467    */
1468   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
1469     return new HLog(this.fs.getBackingFs(), logdir, oldLogDir, this.conf,
1470       getWALActionListeners(), this.serverNameFromMasterPOV.toString());
1471   }
1472 
1473   /**
1474    * Called by {@link #instantiateHLog(Path, Path)} when setting up the WAL instance.
1475    * Add any {@link WALActionsListener}s you want inserted before WAL startup.
1476    * @return List of WALActionsListener that will be passed in to
1477    * {@link HLog} on construction.
1478    */
1479   protected List<WALActionsListener> getWALActionListeners() {
1480     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1481     // Log roller.
1482     this.hlogRoller = new LogRoller(this, this);
1483     listeners.add(this.hlogRoller);
1484     if (this.replicationSourceHandler != null &&
1485         this.replicationSourceHandler.getWALActionsListener() != null) {
1486       // Replication handler is an implementation of WALActionsListener.
1487       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1488     }
1489     return listeners;
1490   }
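
  // Extension sketch (hypothetical subclass, shown for illustration only): extra
  // listeners can be contributed by overriding this hook, e.g.
  //
  //   @Override
  //   protected List<WALActionsListener> getWALActionListeners() {
  //     List<WALActionsListener> listeners = super.getWALActionListeners();
  //     listeners.add(new MyWALObserver());   // MyWALObserver is hypothetical
  //     return listeners;
  //   }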
1491 
1492   protected List<WALActionsListener> getMetaWALActionListeners() {
1493     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1494     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1495     // null (addendum patch on HBASE-7213)
1496     MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1497     String n = Thread.currentThread().getName();
1498     Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1499         n + "MetaLogRoller", uncaughtExceptionHandler);
1500     this.metaHLogRoller = tmpLogRoller;
1501     tmpLogRoller = null;
1502     listeners.add(this.metaHLogRoller);
1503     return listeners;
1504   }
1505 
1506   protected LogRoller getLogRoller() {
1507     return hlogRoller;
1508   }
1509 
1510   /*
1511    * Run the metrics update, logging (but not propagating) any failure.
1512    */
1513   protected void doMetrics() {
1514     try {
1515       metrics();
1516     } catch (Throwable e) {
1517       LOG.warn("Failed metrics", e);
1518     }
1519   }
1520 
1521   protected void metrics() {
1522     this.metrics.regions.set(this.onlineRegions.size());
1523     this.metrics.incrementRequests(this.requestCount.get());
1524     this.metrics.requests.intervalHeartBeat();
1525     // Is it too expensive, every three seconds, to take a lock on onlineRegions
1526     // and then on every store carried? Could metrics be made sloppier to avoid
1527     // the synchronization?
1528     int stores = 0;
1529     int storefiles = 0;
1530     long memstoreSize = 0;
1531     long readRequestsCount = 0;
1532     long writeRequestsCount = 0;
1533     long storefileIndexSize = 0;
1534     HDFSBlocksDistribution hdfsBlocksDistribution =
1535       new HDFSBlocksDistribution();
1536     long totalStaticIndexSize = 0;
1537     long totalStaticBloomSize = 0;
1538     long numPutsWithoutWAL = 0;
1539     long dataInMemoryWithoutWAL = 0;
1540     long updatesBlockedMs = 0;
1541 
1542     // Note that this is a map of Doubles instead of Longs: values are summed
1543     // exactly and only truncated when written out. Truncating store-by-store
1544     // (integer division on each part of the sum) would lose precision; e.g.
1545     // three stores of 1.4 MB each should report 4 MB, not 1 + 1 + 1 = 3 MB.
1546     // Dividing at the end is hard because the proper factor varies per metric.
1547     final Map<String, MutableDouble> tempVals =
1548         new HashMap<String, MutableDouble>();
1549 
1550     for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1551       HRegion r = e.getValue();
1552       memstoreSize += r.memstoreSize.get();
1553       numPutsWithoutWAL += r.numPutsWithoutWAL.get();
1554       dataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
1555       readRequestsCount += r.readRequestsCount.get();
1556       writeRequestsCount += r.writeRequestsCount.get();
1557       updatesBlockedMs += r.updatesBlockedMs.get();
1558       synchronized (r.stores) {
1559         stores += r.stores.size();
1560         for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
1561             final Store store = ee.getValue();
1562             final SchemaMetrics schemaMetrics = store.getSchemaMetrics();
1563 
1564             {
1565               long tmpStorefiles = store.getStorefilesCount();
1566               schemaMetrics.accumulateStoreMetric(tempVals,
1567                   StoreMetricType.STORE_FILE_COUNT, tmpStorefiles);
1568               storefiles += tmpStorefiles;
1569             }
1570 
1571 
1572             {
1573               long tmpStorefileIndexSize = store.getStorefilesIndexSize();
1574               schemaMetrics.accumulateStoreMetric(tempVals,
1575                   StoreMetricType.STORE_FILE_INDEX_SIZE,
1576                   (long) (tmpStorefileIndexSize / (1024.0 * 1024)));
1577               storefileIndexSize += tmpStorefileIndexSize;
1578             }
1579 
1580             {
1581               long tmpStorefilesSize = store.getStorefilesSize();
1582               schemaMetrics.accumulateStoreMetric(tempVals,
1583                   StoreMetricType.STORE_FILE_SIZE_MB,
1584                   (long) (tmpStorefilesSize / (1024.0 * 1024)));
1585             }
1586 
1587             {
1588               long tmpStaticBloomSize = store.getTotalStaticBloomSize();
1589               schemaMetrics.accumulateStoreMetric(tempVals,
1590                   StoreMetricType.STATIC_BLOOM_SIZE_KB,
1591                   (long) (tmpStaticBloomSize / 1024.0));
1592               totalStaticBloomSize += tmpStaticBloomSize;
1593             }
1594 
1595             {
1596               long tmpStaticIndexSize = store.getTotalStaticIndexSize();
1597               schemaMetrics.accumulateStoreMetric(tempVals,
1598                   StoreMetricType.STATIC_INDEX_SIZE_KB,
1599                   (long) (tmpStaticIndexSize / 1024.0));
1600               totalStaticIndexSize += tmpStaticIndexSize;
1601             }
1602 
1603             schemaMetrics.accumulateStoreMetric(tempVals,
1604                 StoreMetricType.MEMSTORE_SIZE_MB,
1605                 (long) (store.getMemStoreSize() / (1024.0 * 1024)));
1606         }
1607       }
1608 
1609       hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
1610     }
1611 
1612     for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
1613       RegionMetricsStorage.setNumericMetric(e.getKey(), e.getValue().longValue());
1614     }
1615 
1616     this.metrics.stores.set(stores);
1617     this.metrics.storefiles.set(storefiles);
1618     this.metrics.hlogFileCount.set(this.hlog.getNumLogFiles());
1619     this.metrics.hlogFileSizeMB.set(this.hlog.getNumLogFileSize() /(1024 * 1024));
1620     this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
1621     this.metrics.mbInMemoryWithoutWAL.set((int) (dataInMemoryWithoutWAL / (1024 * 1024)));
1622     this.metrics.numPutsWithoutWAL.set(numPutsWithoutWAL);
1623     this.metrics.storefileIndexSizeMB.set(
1624         (int) (storefileIndexSize / (1024 * 1024)));
1625     this.metrics.rootIndexSizeKB.set(
1626         (int) (storefileIndexSize / 1024));
1627     this.metrics.totalStaticIndexSizeKB.set(
1628         (int) (totalStaticIndexSize / 1024));
1629     this.metrics.totalStaticBloomSizeKB.set(
1630         (int) (totalStaticBloomSize / 1024));
1631     this.metrics.readRequestsCount.set(readRequestsCount);
1632     this.metrics.writeRequestsCount.set(writeRequestsCount);
1633     this.metrics.compactionQueueSize.set(compactSplitThread
1634         .getCompactionQueueSize());
1635     this.metrics.flushQueueSize.set(cacheFlusher
1636         .getFlushQueueSize());
1637     this.metrics.updatesBlockedSeconds.set(updatesBlockedMs/1000);
1638     final long updatesBlockedMsHigherWater = cacheFlusher.getUpdatesBlockedMsHighWater().get();
1639     this.metrics.updatesBlockedSecondsHighWater.set(updatesBlockedMsHigherWater/1000);
1640 
1641     BlockCache blockCache = cacheConfig.getBlockCache();
1642     if (blockCache != null) {
1643       this.metrics.blockCacheCount.set(blockCache.size());
1644       this.metrics.blockCacheFree.set(blockCache.getFreeSize());
1645       this.metrics.blockCacheSize.set(blockCache.getCurrentSize());
1646       CacheStats cacheStats = blockCache.getStats();
1647       this.metrics.blockCacheHitCount.set(cacheStats.getHitCount());
1648       this.metrics.blockCacheMissCount.set(cacheStats.getMissCount());
1649       this.metrics.blockCacheEvictedCount.set(blockCache.getEvictedCount());
1650       double ratio = blockCache.getStats().getHitRatio();
1651       int percent = (int) (ratio * 100);
1652       this.metrics.blockCacheHitRatio.set(percent);
1653       ratio = blockCache.getStats().getHitCachingRatio();
1654       percent = (int) (ratio * 100);
1655       this.metrics.blockCacheHitCachingRatio.set(percent);
1656       // past N period block cache hit / hit caching ratios
1657       cacheStats.rollMetricsPeriod();
1658       ratio = cacheStats.getHitRatioPastNPeriods();
1659       percent = (int) (ratio * 100);
1660       this.metrics.blockCacheHitRatioPastNPeriods.set(percent);
1661       ratio = cacheStats.getHitCachingRatioPastNPeriods();
1662       percent = (int) (ratio * 100);
1663       this.metrics.blockCacheHitCachingRatioPastNPeriods.set(percent);
1664     }
1665     float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
1666       getServerName().getHostname());
1667     int percent = (int) (localityIndex * 100);
1668     this.metrics.hdfsBlocksLocalityIndex.set(percent);
1669 
1670   }
1671 
1672   /**
1673    * @return Region server metrics instance.
1674    */
1675   public RegionServerMetrics getMetrics() {
1676     return this.metrics;
1677   }
1678 
1679   /**
1680    * @return Master address tracker instance.
1681    */
1682   public MasterAddressTracker getMasterAddressManager() {
1683     return this.masterAddressManager;
1684   }
1685 
1686   /*
1687    * Start maintenance threads, Server, Worker and lease-checker threads.
1688    * Install an UncaughtExceptionHandler that aborts the RegionServer if we get
1689    * an unhandled exception. We cannot set the handler on all threads: the
1690    * Server's internal Listener thread is off limits. For Server, if we hit an
1691    * OOME it waits a while then retries; meantime, a flush or compaction that
1692    * tries to run should hit the same critical condition and the shutdown will
1693    * run. On its way out, this server will shut down Server. Leases are sort of
1694    * in between: it has an internal thread and, while it inherits from Chore, it
1695    * keeps its own internal stop mechanism so needs to be stopped by this
1696    * hosting server. Worker logs the exception and exits.
1697    */
1698   private void startServiceThreads() throws IOException {
1699     String n = Thread.currentThread().getName();
1700     // Start executor services
1701     this.service = new ExecutorService(getServerName().toString());
1702     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1703       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1704     this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
1705       conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
1706     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1707       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1708     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1709       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1710     this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
1711       conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
1712     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1713       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1714 
1715     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
1716         uncaughtExceptionHandler);
1717     Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
1718         uncaughtExceptionHandler);
1719     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
1720       ".compactionChecker", uncaughtExceptionHandler);
1721     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
1722         ".periodicFlusher", uncaughtExceptionHandler);
1723     if (this.healthCheckChore != null) {
1724       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
1725           uncaughtExceptionHandler);
1726     }
1727 
1728     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1729     // an unhandled exception, it will just exit.
1730     this.leases.setName(n + ".leaseChecker");
1731     this.leases.start();
1732 
1733     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1734         this.replicationSourceHandler != null) {
1735       this.replicationSourceHandler.startReplicationService();
1736     } else {
1737       if (this.replicationSourceHandler != null) {
1738         this.replicationSourceHandler.startReplicationService();
1739       }
1740       if (this.replicationSinkHandler != null) {
1741         this.replicationSinkHandler.startReplicationService();
1742       }
1743     }
1744 
1745     // Start Server.  This service is like leases in that it internally runs
1746     // a thread.
1747     this.rpcServer.start();
1748 
1749     // Create the log splitting worker and start it
1750     this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
1751         this.getConfiguration(), this.getServerName().toString());
1752     splitLogWorker.start();
1753     
1754   }
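
  // Configuration sketch (value illustrative): each executor pool size read above
  // can be tuned in hbase-site.xml, e.g. to give region opens more threads:
  //
  //   <property>
  //     <name>hbase.regionserver.executor.openregion.threads</name>
  //     <value>6</value>
  //   </property>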
1755 
1756   /**
1757    * Puts up the webui.
1758    * @return Returns final port -- maybe different from what we started with.
1759    * @throws IOException
1760    */
1761   private int putUpWebUI() throws IOException {
1762     int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
1763     // -1 is for disabling info server
1764     if (port < 0) return port;
1765     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1766     // check if auto port bind enabled
1767     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1768         false);
1769     while (true) {
1770       try {
1771         this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
1772         this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
1773         this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
1774         this.infoServer.setAttribute(REGIONSERVER, this);
1775         this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
1776         this.infoServer.start();
1777         break;
1778       } catch (BindException e) {
1779         if (!auto) {
1780           // auto bind disabled throw BindException
1781           throw e;
1782         }
1783         // auto bind enabled, try to use another port
1784         LOG.info("Failed binding http info server to port: " + port);
1785         port++;
1786       }
1787     }
1788     return this.infoServer.getPort();
1789   }
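
  // Notes on the settings read above: hbase.regionserver.info.port set to -1
  // disables the info server entirely, and enabling the auto-port option
  // (HConstants.REGIONSERVER_INFO_PORT_AUTO) makes the server walk forward from
  // the configured port until a bind succeeds, which helps on shared test hosts.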
1790 
1791   /*
1792    * Verify that server is healthy
1793    */
1794   private boolean isHealthy() {
1795     if (!fsOk) {
1796       // File system problem
1797       return false;
1798     }
1799     // Verify that all threads are alive
1800     if (!(leases.isAlive()
1801         && cacheFlusher.isAlive() && hlogRoller.isAlive()
1802         && this.compactionChecker.isAlive()
1803         && this.periodicFlusher.isAlive())) {
1804       stop("One or more threads are no longer alive -- stop");
1805       return false;
1806     }
1807     if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1808       stop("Meta HLog roller thread is no longer alive -- stop");
1809       return false;
1810     }
1811     return true;
1812   }
1813 
1814   public HLog getWAL() {
1815     try {
1816       return getWAL(null);
1817     } catch (IOException e) {
1818       LOG.warn("getWAL threw exception " + e);
1819       return null; 
1820     }
1821   }
1822 
1823   @Override
1824   public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1825     //TODO: at some point this should delegate to the HLogFactory
1826     // currently, we don't care about the region as much as we care about the
1827     // table (hence checking the table name below).
1828     // _ROOT_ and .META. regions have a separate WAL.
1829     if (this.separateHLogForMeta && 
1830         regionInfo != null && 
1831         regionInfo.isMetaTable()) {
1832       return getMetaWAL();
1833     }
1834     return this.hlog;
1835   }
1836 
1837   @Override
1838   public CatalogTracker getCatalogTracker() {
1839     return this.catalogTracker;
1840   }
1841 
1842   @Override
1843   public void stop(final String msg) {
1844     if (!this.stopped) {
1845       try {
1846         if (this.rsHost != null) {
1847           this.rsHost.preStop(msg);
1848         }
1849         this.stopped = true;
1850         LOG.info("STOPPED: " + msg);
1851         // Wakes run() if it is sleeping
1852         sleeper.skipSleepCycle();
1853       } catch (IOException exp) {
1854         LOG.warn("The region server did not stop", exp);
1855       }
1856     }
1857   }
1858 
1859   public void waitForServerOnline(){
1860     while (!isOnline() && !isStopped()){
1861        sleeper.sleep();
1862     }
1863   }
1864 
1865   @Override
1866   public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
1867       final boolean daughter)
1868   throws KeeperException, IOException {
1869     checkOpen();
1870     LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString() +
1871       ", daughter=" + daughter);
1872     // Do checks to see if we need to compact (references or too many files)
1873     for (Store s : r.getStores().values()) {
1874       if (s.hasReferences() || s.needsCompaction()) {
1875         getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
1876       }
1877     }
1878     // Update ZK, ROOT or META
1879     if (r.getRegionInfo().isRootRegion()) {
1880       RootLocationEditor.setRootLocation(getZooKeeper(),
1881        this.serverNameFromMasterPOV);
1882     } else if (r.getRegionInfo().isMetaRegion()) {
1883       MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
1884         this.serverNameFromMasterPOV);
1885     } else {
1886       if (daughter) {
1887         // If daughter of a split, update whole row, not just location.
1888         MetaEditor.addDaughter(ct, r.getRegionInfo(),
1889           this.serverNameFromMasterPOV);
1890       } else {
1891         MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
1892           this.serverNameFromMasterPOV);
1893       }
1894     }
1895     LOG.info("Done with post open deploy task for region=" +
1896       r.getRegionNameAsString() + ", daughter=" + daughter);
1897 
1898   }
1899 
1900   /**
1901    * Return a reference to the metrics instance used for counting RPC calls.
1902    * @return Metrics instance.
1903    */
1904   public HBaseRpcMetrics getRpcMetrics() {
1905     return rpcServer.getRpcMetrics();
1906   }
1907 
1908   @Override
1909   public RpcServer getRpcServer() {
1910     return rpcServer;
1911   }
1912 
1913   /**
1914    * Cause the server to exit without closing the regions it is serving or the
1915    * log it is using, and without notifying the master. Used in unit testing and
1916    * on catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
1917    *
1918    * @param reason
1919    *          the reason we are aborting
1920    * @param cause
1921    *          the exception that caused the abort, or null
1922    */
1923   public void abort(String reason, Throwable cause) {
1924     String msg = "ABORTING region server " + this + ": " + reason;
1925     if (cause != null) {
1926       LOG.fatal(msg, cause);
1927     } else {
1928       LOG.fatal(msg);
1929     }
1930     this.abortRequested = true;
1931     this.reservedSpace.clear();
1932     // HBASE-4014: show list of coprocessors that were loaded to help debug
1933     // regionserver crashes. Note that we're implicitly using
1934     // java.util.HashSet's toString() method to print the coprocessor names.
1935     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1936         CoprocessorHost.getLoadedCoprocessors());
1937     if (this.metrics != null) {
1938       LOG.info("Dump of metrics: " + this.metrics);
1939     }
1940     // Do our best to report our abort to the master, but this may not work
1941     try {
1942       if (cause != null) {
1943         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1944       }
1945       if (hbaseMaster != null) {
1946         hbaseMaster.reportRSFatalError(
1947             this.serverNameFromMasterPOV.getVersionedBytes(), msg);
1948       }
1949     } catch (Throwable t) {
1950       LOG.warn("Unable to report fatal error to master", t);
1951     }
1952     stop(reason);
1953   }
1954 
1955   /**
1956    * @see HRegionServer#abort(String, Throwable)
1957    */
1958   public void abort(String reason) {
1959     abort(reason, null);
1960   }
1961 
1962   public boolean isAborted() {
1963     return this.abortRequested;
1964   }
1965 
1966   /*
1967    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
1968    * logs, but it does close the socket in case we want to bring up a server on
1969    * the old hostname+port immediately.
1970    */
1971   protected void kill() {
1972     this.killed = true;
1973     abort("Simulated kill");
1974   }
1975 
1976   /**
1977    * Wait on all threads to finish. Presumption is that all closes and stops
1978    * have already been called.
1979    */
1980   protected void join() {
1981     Threads.shutdown(this.compactionChecker.getThread());
1982     Threads.shutdown(this.periodicFlusher.getThread());
1983     Threads.shutdown(this.cacheFlusher.getThread());
1984     if (this.healthCheckChore != null) {
1985       Threads.shutdown(this.healthCheckChore.getThread());
1986     }
1987     if (this.hlogRoller != null) {
1988       Threads.shutdown(this.hlogRoller.getThread());
1989     }
1990     if (this.metaHLogRoller != null) {
1991       Threads.shutdown(this.metaHLogRoller.getThread());
1992     }
1993     if (this.compactSplitThread != null) {
1994       this.compactSplitThread.join();
1995     }
1996     if (this.service != null) this.service.shutdown();
1997     if (this.replicationSourceHandler != null &&
1998         this.replicationSourceHandler == this.replicationSinkHandler) {
1999       this.replicationSourceHandler.stopReplicationService();
2000     } else {
2001       if (this.replicationSourceHandler != null) {
2002         this.replicationSourceHandler.stopReplicationService();
2003       }
2004       if (this.replicationSinkHandler != null) {
2005         this.replicationSinkHandler.stopReplicationService();
2006       }
2007     }
2008   }
2009 
2010   /**
2011    * @return Return the object that implements the replication
2012    * source service.
2013    */
2014   ReplicationSourceService getReplicationSourceService() {
2015     return replicationSourceHandler;
2016   }
2017 
2018   /**
2019    * @return Return the object that implements the replication
2020    * sink service.
2021    */
2022   ReplicationSinkService getReplicationSinkService() {
2023     return replicationSinkHandler;
2024   }
2025 
2026   /**
2027    * Get the current master from ZooKeeper and open the RPC connection to it.
2028    *
2029    * Method will block until a master is available. You can break from this
2030    * block by requesting the server stop.
2031    *
2032    * @return master + port, or null if server has been stopped
2033    */
2034   private ServerName getMaster() {
2035     ServerName masterServerName = null;
2036     long previousLogTime = 0;
2037     HMasterRegionInterface master = null;
2038     InetSocketAddress masterIsa = null;
2039     while (keepLooping() && master == null) {
2040       masterServerName = this.masterAddressManager.getMasterAddress();
2041       if (masterServerName == null) {
2042         if (!keepLooping()) {
2043           // give up with no connection.
2044           LOG.debug("No master found and cluster is stopped; bailing out");
2045           return null;
2046         }
2047         LOG.debug("No master found; retry");
2048         previousLogTime = System.currentTimeMillis();
2049 
2050         sleeper.sleep();
2051         continue;
2052       }
2053 
2054       masterIsa =
2055         new InetSocketAddress(masterServerName.getHostname(), masterServerName.getPort());
2056 
2057       LOG.info("Attempting connect to Master server at " + masterServerName);
2058       try {
2059         // Do initial RPC setup. The final argument indicates that the RPC
2060         // should retry indefinitely.
2061         master = HBaseRPC.waitForProxy(this.rpcEngine,
2062             HMasterRegionInterface.class, HMasterRegionInterface.VERSION,
2063             masterIsa, this.conf, -1,
2064             this.rpcTimeout, this.rpcTimeout);
2065       } catch (IOException e) {
2066         e = e instanceof RemoteException ?
2067             ((RemoteException)e).unwrapRemoteException() : e;
2068         if (e instanceof ServerNotRunningYetException) {
2069           if (System.currentTimeMillis() > (previousLogTime+1000)){
2070             LOG.info("Master isn't available yet, retrying");
2071             previousLogTime = System.currentTimeMillis();
2072           }
2073         } else {
2074           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2075             LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2076             previousLogTime = System.currentTimeMillis();
2077           }
2078         }
2079         try {
2080           Thread.sleep(200);
2081         } catch (InterruptedException ignored) {
2082         }
2083       }
2084     }
2085     LOG.info("Connected to master at " + masterIsa);
2086     this.hbaseMaster = master;
2087     return masterServerName;
2088   }
2089 
2090   /**
2091    * @return True if we should break loop because cluster is going down or
2092    * this server has been stopped or hdfs has gone bad.
2093    */
2094   private boolean keepLooping() {
2095     return !this.stopped && isClusterUp();
2096   }
2097 
2098   /*
2099    * Let the master know we're here. Run initialization using parameters passed
2100    * to us by the master.
2101    * @return A Map of key/value configurations we got from the Master, else
2102    * null if we failed to register.
2103    * @throws IOException
2104    */
2105   private MapWritable reportForDuty() throws IOException {
2106     MapWritable result = null;
2107     ServerName masterServerName = getMaster();
2108     if (masterServerName == null) return result;
2109     try {
2110       this.requestCount.set(0);
2111       LOG.info("Telling master at " + masterServerName + " that we are up " +
2112         "with port=" + this.isa.getPort() + ", startcode=" + this.startcode);
2113       long now = EnvironmentEdgeManager.currentTimeMillis();
2114       int port = this.isa.getPort();
2115       result = this.hbaseMaster.regionServerStartup(port, this.startcode, now);
2116     } catch (RemoteException e) {
2117       IOException ioe = e.unwrapRemoteException();
2118       if (ioe instanceof ClockOutOfSyncException) {
2119         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2120         // Re-throw IOE will cause RS to abort
2121         throw ioe;
2122       } else {
2123         LOG.warn("remote error telling master we are up", e);
2124       }
2125     } catch (IOException e) {
2126       LOG.warn("error telling master we are up", e);
2127     }
2128     return result;
2129   }
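
  // Summary of the registration protocol above: the returned MapWritable carries
  // master-supplied key/value configuration, and null means registration failed
  // and the caller should retry; a ClockOutOfSyncException is rethrown so the
  // region server aborts rather than joining the cluster with a skewed clock.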
2130 
2131   /**
2132    * Closes all regions.  Called on our way out.
2133    * Assumes that it's not possible for new regions to be added to onlineRegions
2134    * while this method runs.
2135    */
2136   protected void closeAllRegions(final boolean abort) {
2137     closeUserRegions(abort);
2138     closeMetaTableRegions(abort);
2139   }
2140 
2141   /**
2142    * Close root and meta regions if we carry them
2143    * @param abort Whether we're running an abort.
2144    */
2145   void closeMetaTableRegions(final boolean abort) {
2146     HRegion meta = null;
2147     HRegion root = null;
2148     this.lock.writeLock().lock();
2149     try {
2150       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2151         HRegionInfo hri = e.getValue().getRegionInfo();
2152         if (hri.isRootRegion()) {
2153           root = e.getValue();
2154         } else if (hri.isMetaRegion()) {
2155           meta = e.getValue();
2156         }
2157         if (meta != null && root != null) break;
2158       }
2159     } finally {
2160       this.lock.writeLock().unlock();
2161     }
2162     if (meta != null) closeRegion(meta.getRegionInfo(), abort, false);
2163     if (root != null) closeRegion(root.getRegionInfo(), abort, false);
2164   }
2165 
2166   /**
2167    * Schedule closes on all user regions.
2168    * Should be safe to call multiple times because it won't close regions
2169    * that are already closed or that are closing.
2170    * @param abort Whether we're running an abort.
2171    */
2172   void closeUserRegions(final boolean abort) {
2173     this.lock.writeLock().lock();
2174     try {
2175       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2176         HRegion r = e.getValue();
2177         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2178           // Don't update zk with this close transition; pass false.
2179           closeRegion(r.getRegionInfo(), abort, false);
2180         }
2181       }
2182     } finally {
2183       this.lock.writeLock().unlock();
2184     }
2185   }
2186 
2187   @Override
2188   @QosPriority(priority=HConstants.HIGH_QOS)
2189   public HRegionInfo getRegionInfo(final byte[] regionName)
2190   throws NotServingRegionException, IOException {
2191     checkOpen();
2192     requestCount.incrementAndGet();
2193     return getRegion(regionName).getRegionInfo();
2194   }
2195 
2196   public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
2197       final byte[] family) throws IOException {
2198     checkOpen();
2199     requestCount.incrementAndGet();
2200     try {
2201       // locate the region we're operating on
2202       HRegion region = getRegion(regionName);
2203       // ask the region for all the data
2204 
2205       Result r = region.getClosestRowBefore(row, family);
2206       return r;
2207     } catch (Throwable t) {
2208       throw convertThrowableToIOE(cleanup(t));
2209     }
2210   }
2211 
2212   /** {@inheritDoc} */
2213   public Result get(byte[] regionName, Get get) throws IOException {
2214     checkOpen();
2215     requestCount.incrementAndGet();
2216     try {
2217       HRegion region = getRegion(regionName);
2218       return region.get(get, getLockFromId(get.getLockId()));
2219     } catch (Throwable t) {
2220       throw convertThrowableToIOE(cleanup(t));
2221     }
2222   }
2223 
2224   public boolean exists(byte[] regionName, Get get) throws IOException {
2225     checkOpen();
2226     requestCount.incrementAndGet();
2227     try {
2228       HRegion region = getRegion(regionName);
2229       Integer lock = getLockFromId(get.getLockId());
2230       if (region.getCoprocessorHost() != null) {
2231         Boolean result = region.getCoprocessorHost().preExists(get);
2232         if (result != null) {
2233           return result.booleanValue();
2234         }
2235       }
2236       Result r = region.get(get, lock);
2237       boolean result = r != null && !r.isEmpty();
2238       if (region.getCoprocessorHost() != null) {
2239         result = region.getCoprocessorHost().postExists(get, result);
2240       }
2241       return result;
2242     } catch (Throwable t) {
2243       throw convertThrowableToIOE(cleanup(t));
2244     }
2245   }
2246 
2247   public void put(final byte[] regionName, final Put put) throws IOException {
2248     if (put.getRow() == null) {
2249       throw new IllegalArgumentException("update has null row");
2250     }
2251 
2252     checkOpen();
2253     this.requestCount.incrementAndGet();
2254     HRegion region = getRegion(regionName);
2255     try {
2256       if (!region.getRegionInfo().isMetaTable()) {
2257         this.cacheFlusher.reclaimMemStoreMemory();
2258       }
2259       boolean writeToWAL = put.getWriteToWAL();
2260       region.put(put, getLockFromId(put.getLockId()), writeToWAL);
2261     } catch (Throwable t) {
2262       throw convertThrowableToIOE(cleanup(t));
2263     }
2264   }
2265 
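  /**
   * Applies a batch of puts to the given region.
   * @return -1 if every put succeeded, otherwise the index of the first put
   * whose status was not SUCCESS.
   * @throws IOException
   */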
2266   public int put(final byte[] regionName, final List<Put> puts)
2267       throws IOException {
2268     checkOpen();
2269     HRegion region = null;
2270     int i = 0;
2271 
2272     try {
2273       region = getRegion(regionName);
2274       if (!region.getRegionInfo().isMetaTable()) {
2275         this.cacheFlusher.reclaimMemStoreMemory();
2276       }
2277 
2278       @SuppressWarnings("unchecked")
2279       Pair<Mutation, Integer>[] putsWithLocks = new Pair[puts.size()];
2280 
2281       for (Put p : puts) {
2282         Integer lock = getLockFromId(p.getLockId());
2283         putsWithLocks[i++] = new Pair<Mutation, Integer>(p, lock);
2284       }
2285 
2286       this.requestCount.addAndGet(puts.size());
2287       OperationStatus codes[] = region.batchMutate(putsWithLocks);
2288       for (i = 0; i < codes.length; i++) {
2289         if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
2290           return i;
2291         }
2292       }
2293       return -1;
2294     } catch (Throwable t) {
2295       throw convertThrowableToIOE(cleanup(t));
2296     }
2297   }
2298 
2299   private boolean checkAndMutate(final byte[] regionName, final byte[] row,
2300       final byte[] family, final byte[] qualifier, final CompareOp compareOp,
2301       final WritableByteArrayComparable comparator, final Writable w,
2302       Integer lock) throws IOException {
2303     checkOpen();
2304     this.requestCount.incrementAndGet();
2305     HRegion region = getRegion(regionName);
2306     try {
2307       if (!region.getRegionInfo().isMetaTable()) {
2308         this.cacheFlusher.reclaimMemStoreMemory();
2309       }
2310       return region.checkAndMutate(row, family, qualifier, compareOp,
2311         comparator, w, lock, true);
2312     } catch (Throwable t) {
2313       throw convertThrowableToIOE(cleanup(t));
2314     }
2315   }
2316 
2317   /**
2318    *
2319    * @param regionName
2320    * @param row
2321    * @param family
2322    * @param qualifier
2323    * @param value
2324    *          the expected value
2325    * @param put
2326    * @throws IOException
2327    * @return true if the new put was executed, false otherwise
2328    */
2329   public boolean checkAndPut(final byte[] regionName, final byte[] row,
2330       final byte[] family, final byte[] qualifier, final byte[] value,
2331       final Put put) throws IOException {
2332     checkOpen();
2333     if (regionName == null) {
2334       throw new IOException("Invalid arguments to checkAndPut "
2335           + "regionName is null");
2336     }
2337     HRegion region = getRegion(regionName);
2338     Integer lock = getLockFromId(put.getLockId());
2339     WritableByteArrayComparable comparator = new BinaryComparator(value);
2340     if (region.getCoprocessorHost() != null) {
2341       Boolean result = region.getCoprocessorHost()
2342         .preCheckAndPut(row, family, qualifier, CompareOp.EQUAL, comparator,
2343           put);
2344       if (result != null) {
2345         return result.booleanValue();
2346       }
2347     }
2348     boolean result = checkAndMutate(regionName, row, family, qualifier,
2349         CompareOp.EQUAL, comparator, put,
2350       lock);
2351     if (region.getCoprocessorHost() != null) {
2352       result = region.getCoprocessorHost().postCheckAndPut(row, family,
2353         qualifier, CompareOp.EQUAL, comparator, put, result);
2354     }
2355     return result;
2356   }
2357 
2358   /**
2359    *
2360    * @param regionName
2361    * @param row
2362    * @param family
2363    * @param qualifier
2364    * @param compareOp
2365    * @param comparator
2366    * @param put
2367    * @throws IOException
2368    * @return true if the new put was executed, false otherwise
2369    */
2370   public boolean checkAndPut(final byte[] regionName, final byte[] row,
2371       final byte[] family, final byte[] qualifier, final CompareOp compareOp,
2372       final WritableByteArrayComparable comparator, final Put put)
2373        throws IOException {
2374     checkOpen();
2375     if (regionName == null) {
2376       throw new IOException("Invalid arguments to checkAndPut "
2377           + "regionName is null");
2378     }
2379     HRegion region = getRegion(regionName);
2380     Integer lock = getLockFromId(put.getLockId());
2381     if (region.getCoprocessorHost() != null) {
2382       Boolean result = region.getCoprocessorHost()
2383         .preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
2384       if (result != null) {
2385         return result.booleanValue();
2386       }
2387     }
2388     boolean result = checkAndMutate(regionName, row, family, qualifier,
2389       compareOp, comparator, put, lock);
2390     if (region.getCoprocessorHost() != null) {
2391       result = region.getCoprocessorHost().postCheckAndPut(row, family,
2392         qualifier, compareOp, comparator, put, result);
2393     }
2394     return result;
2395   }
2396 
2397   /**
2398    *
2399    * @param regionName
2400    * @param row
2401    * @param family
2402    * @param qualifier
2403    * @param value
2404    *          the expected value
2405    * @param delete
2406    * @throws IOException
2407    * @return true if the delete was executed, false otherwise
2408    */
2409   public boolean checkAndDelete(final byte[] regionName, final byte[] row,
2410       final byte[] family, final byte[] qualifier, final byte[] value,
2411       final Delete delete) throws IOException {
2412     checkOpen();
2413 
2414     if (regionName == null) {
2415       throw new IOException("Invalid arguments to checkAndDelete "
2416           + "regionName is null");
2417     }
2418     HRegion region = getRegion(regionName);
2419     Integer lock = getLockFromId(delete.getLockId());
2420     WritableByteArrayComparable comparator = new BinaryComparator(value);
2421     if (region.getCoprocessorHost() != null) {
2422       Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
2423         family, qualifier, CompareOp.EQUAL, comparator, delete);
2424       if (result != null) {
2425         return result.booleanValue();
2426       }
2427     }
2428     boolean result = checkAndMutate(regionName, row, family, qualifier,
2429       CompareOp.EQUAL, comparator, delete, lock);
2430     if (region.getCoprocessorHost() != null) {
2431       result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2432         qualifier, CompareOp.EQUAL, comparator, delete, result);
2433     }
2434     return result;
2435   }
2436 
2437   @Override
2438   public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
2439     throws IllegalArgumentException {
2440     return getStoreFileList(regionName, new byte[][]{columnFamily});
2441   }
2442 
2443   @Override
2444   public List<String> getStoreFileList(byte[] regionName, byte[][] columnFamilies)
2445     throws IllegalArgumentException {
2446     HRegion region = getOnlineRegion(regionName);
2447     if (region == null) {
2448       throw new IllegalArgumentException("No region: " + new String(regionName)
2449           + " available");
2450     }
2451     return region.getStoreFileList(columnFamilies);
2452   }
2453 
2454   public List<String> getStoreFileList(byte[] regionName)
2455     throws IllegalArgumentException {
2456     HRegion region = getOnlineRegion(regionName);
2457     if (region == null) {
2458       throw new IllegalArgumentException("No region: " + new String(regionName)
2459           + " available");
2460     }
2461     Set<byte[]> columnFamilies = region.getStores().keySet();
2462     int nCF = columnFamilies.size();
2463     return region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
2464   }
2465   
2466   /**
2467    * Flushes the given region
2468    */
2469   public void flushRegion(byte[] regionName)
2470     throws IllegalArgumentException, IOException {
2471     HRegion region = getOnlineRegion(regionName);
2472     if (region == null) {
2473       throw new IllegalArgumentException("No region : " + new String(regionName)
2474       + " available");
2475     }
2476     boolean needsCompaction = region.flushcache().isCompactionNeeded();
2477     if (needsCompaction) {
2478       this.compactSplitThread.requestCompaction(region, "Compaction through user triggered flush");
2479     }
2480   }
2481 
2482   /**
2483    * Flushes the given region if lastFlushTime < ifOlderThanTS
2484    */
2485   public void flushRegion(byte[] regionName, long ifOlderThanTS)
2486     throws IllegalArgumentException, IOException {
2487     HRegion region = getOnlineRegion(regionName);
2488     if (region == null) {
2489       throw new IllegalArgumentException("No region : " + new String(regionName)
2490       + " available");
2491     }
2492     if (region.getLastFlushTime() < ifOlderThanTS) {
2493       boolean needsCompaction = region.flushcache().isCompactionNeeded();
2494       if (needsCompaction) {
2495         this.compactSplitThread
2496             .requestCompaction(region, "Compaction through user triggered flush");
2497       }
2498     }
2499   }
2500 
2501   /**
2502    * Gets last flush time for the given region
2503    * @return the last flush time for a region
2504    */
2505   public long getLastFlushTime(byte[] regionName) {
2506     HRegion region = getOnlineRegion(regionName);
2507     if (region == null) {
2508       throw new IllegalArgumentException("No region : " + new String(regionName)
2509       + " available");
2510     }
2511     return region.getLastFlushTime();
2512   }
2513  
2514   /**
2515    *
2516    * @param regionName
2517    * @param row
2518    * @param family
2519    * @param qualifier
2520    * @param compareOp
2521    * @param comparator
2522    * @param delete
2523    * @throws IOException
2524    * @return true if the delete was executed, false otherwise
2525    */
2526   public boolean checkAndDelete(final byte[] regionName, final byte[] row,
2527       final byte[] family, final byte[] qualifier, final CompareOp compareOp,
2528       final WritableByteArrayComparable comparator, final Delete delete)
2529       throws IOException {
2530     checkOpen();
2531 
2532     if (regionName == null) {
2533       throw new IOException("Invalid arguments to checkAndDelete "
2534         + "regionName is null");
2535     }
2536     HRegion region = getRegion(regionName);
2537     Integer lock = getLockFromId(delete.getLockId());
2538     if (region.getCoprocessorHost() != null) {
2539       Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
2540         family, qualifier, compareOp, comparator, delete);
2541       if (result != null) {
2542         return result.booleanValue();
2543       }
2544     }
2545     boolean result = checkAndMutate(regionName, row, family, qualifier,
2546       compareOp, comparator, delete, lock);
2547     if (region.getCoprocessorHost() != null) {
2548       result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2549         qualifier, compareOp, comparator, delete, result);
2550     }
2551     return result;
2552   }
2553 
2554   //
2555   // remote scanner interface
2556   //
2557   
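  // Scanner lifecycle, as implemented below: openScanner() builds a RegionScanner,
  // registers it under a random numeric id and takes out a lease; each next() call
  // removes the lease while rows are being fetched (so a slow fetch cannot expire
  // it) and re-adds it on the way out; close(), or lease expiry handled by
  // ScannerListener, unregisters and closes the scanner.
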
2558   public long openScanner(byte[] regionName, Scan scan) throws IOException {
2559     RegionScanner s = internalOpenScanner(regionName, scan);
2560     long scannerId = addScanner(s);
2561     return scannerId;
2562   }
2563 
2564   private RegionScanner internalOpenScanner(byte[] regionName, Scan scan)
2565       throws IOException {
2566     checkOpen();
2567     NullPointerException npe = null;
2568     if (regionName == null) {
2569       npe = new NullPointerException("regionName is null");
2570     } else if (scan == null) {
2571       npe = new NullPointerException("scan is null");
2572     }
2573     if (npe != null) {
2574       throw new IOException("Invalid arguments to openScanner", npe);
2575     }
2576     requestCount.incrementAndGet();
2577     try {
2578       HRegion r = getRegion(regionName);
2579       r.checkRow(scan.getStartRow(), "Scan");
2580       scan.setLoadColumnFamiliesOnDemand(r.isLoadingCfsOnDemandDefault()
2581           || scan.doLoadColumnFamiliesOnDemand());
2582       r.prepareScanner(scan);
2583       RegionScanner s = null;
2584       if (r.getCoprocessorHost() != null) {
2585         s = r.getCoprocessorHost().preScannerOpen(scan);
2586       }
2587       if (s == null) {
2588         s = r.getScanner(scan);
2589       }
2590       if (r.getCoprocessorHost() != null) {
2591         RegionScanner savedScanner = r.getCoprocessorHost().postScannerOpen(
2592             scan, s);
2593         if (savedScanner == null) {
2594           LOG.warn("PostScannerOpen impl returning null. "
2595               + "Check the RegionObserver implementation.");
2596         } else {
2597           s = savedScanner;
2598         }
2599       }
2600       return s;
2601     } catch (Throwable t) {
2602       throw convertThrowableToIOE(cleanup(t, "Failed openScanner"));
2603     }
2604   }
2605 
2606   protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
2607     long scannerId = -1L;
2608     scannerId = rand.nextLong();
2609     String scannerName = String.valueOf(scannerId);
2610     scanners.put(scannerName, new RegionScannerHolder(s));
2611     this.leases.createLease(scannerName, new ScannerListener(scannerName));
2612     return scannerId;
2613   }
2614 
2615   public Result next(final long scannerId) throws IOException {
2616     Result[] res = next(scannerId, 1);
2617     if (res == null || res.length == 0) {
2618       return null;
2619     }
2620     return res[0];
2621   }
2622 
2623   public Result[] next(final long scannerId, int nbRows) throws IOException {
2624     return next(scannerId, nbRows, -1);
2625   }
2626 
2627   public Result[] next(final long scannerId, int nbRows, long callSeq) throws IOException {
2628     String scannerName = String.valueOf(scannerId);
2629     RegionScannerHolder holder = this.scanners.get(scannerName);
2630     if (holder == null) {
2631       LOG.info("Client tried to access missing scanner " + scannerName);
2632       throw new UnknownScannerException("Name: " + scannerName);
2633     }
2634     // if callSeq does not match throw Exception straight away. This needs to be performed even
2635     // before checking of Lease.
2636     // Old next() APIs which do not take callSeq will pass it as -1 and for that no
2637     // need to match the callSeq from client and the one in server.
2638     if (callSeq != -1 && callSeq != holder.getCallSeq()) {
2639       throw new CallSequenceOutOfOrderException("Expected seq: " + holder.getCallSeq()
2640           + " But the seq got from client: " + callSeq);
2641      }
2642     // Increment the callSeq value which is the next expected from client.
2643     holder.incrCallSeq();
2644     return internalNext(holder.getScanner(), nbRows, scannerName);
2645   }
2646 
2647   private Result[] internalNext(final RegionScanner s, int nbRows,
2648       String scannerName) throws IOException {
2649     try {
2650       checkOpen();
2651     } catch (IOException e) {
2652       // If checkOpen failed, server not running or filesystem gone,
2653       // cancel this lease; filesystem is gone or we're closing or something.
2654       if (scannerName != null) {
2655         try {
2656           this.leases.cancelLease(scannerName);
2657         } catch (LeaseException le) {
2658           LOG.info("Server shutting down and client tried to access missing scanner "
2659               + scannerName);
2660         }
2661       }
2662       throw e;
2663     }
2664     Leases.Lease lease = null;
2665     try {
2666       // Remove the lease while it's being processed in the server; protects
2667       // against the case where processing of the request takes > lease expiration time.
2668       try {
2669         if (scannerName != null) {
2670           lease = this.leases.removeLease(scannerName);
2671         }
2672       } catch (LeaseException le) {
2673         // What it really means is that there's no such scanner.
2674         LOG.info("Client tried to access missing scanner " + scannerName + " (no lease)");
2675         throw new UnknownScannerException("No lease for " + scannerName + ": " + le.getMessage());
2676       }
2677       List<Result> results = new ArrayList<Result>(nbRows);
2678       long currentScanResultSize = 0;
2679       List<KeyValue> values = new ArrayList<KeyValue>();
2680 
2681       // Call coprocessor. Get region info from scanner.
2682       HRegion region = getRegion(s.getRegionInfo().getRegionName());
2683       if (region != null && region.getCoprocessorHost() != null) {
2684         Boolean bypass = region.getCoprocessorHost().preScannerNext(s,
2685             results, nbRows);
2686         if (!results.isEmpty()) {
2687           for (Result r : results) {
2688             if (maxScannerResultSize < Long.MAX_VALUE){
2689               for (KeyValue kv : r.raw()) {
2690                 currentScanResultSize += kv.heapSize();
2691               }
2692             }
2693           }
2694         }
2695         if (bypass != null) {
2696           return s.isFilterDone() && results.isEmpty() ? null
2697               : results.toArray(new Result[results.size()]);
2698         }
2699       }
2700 
2701       MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
2702       region.startRegionOperation();
2703       try {
2704         int i = 0;
2705         synchronized(s) {
2706           for (; i < nbRows
2707               && currentScanResultSize < maxScannerResultSize; ) {
2708             // Collect values to be returned here
2709             boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
2710             if (!values.isEmpty()) {
2711               if (maxScannerResultSize < Long.MAX_VALUE){
2712                 for (KeyValue kv : values) {
2713                   currentScanResultSize += kv.heapSize();
2714                 }
2715               }
2716               results.add(new Result(values));
2717               i++;
2718             }
2719             if (!moreRows) {
2720               break;
2721             }
2722             values.clear();
2723           }
2724         }
2725         requestCount.addAndGet(i);
2726         region.readRequestsCount.add(i);
2727         region.setOpMetricsReadRequestCount(region.readRequestsCount.get());
2728       } finally {
2729         region.closeRegionOperation();
2730       }
2731       // coprocessor postNext hook
2732       if (region != null && region.getCoprocessorHost() != null) {
2733         region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
2734       }
2735 
2736       // If the scanner's filter (if any) is done with the scan, it can tell the
2737       // client to stop the scan. This is done by passing
2738       // a null result.
2739       return s.isFilterDone() && results.isEmpty() ? null
2740           : results.toArray(new Result[results.size()]);
2741     } catch (Throwable t) {
2742       if (t instanceof NotServingRegionException && scannerName != null) {
2743         this.scanners.remove(scannerName);
2744       }
2745       throw convertThrowableToIOE(cleanup(t));
2746     } finally {
2747       // We're done. On the way out, re-add the above removed lease. Adding resets
2748       // the expiration time on the lease.
2749       if (scannerName != null && this.scanners.containsKey(scannerName)) {
2750         if (lease != null) this.leases.addLease(lease);
2751       }
2752     }
2753   }
2754 
2755   public void close(final long scannerId) throws IOException {
2756     String scannerName = String.valueOf(scannerId);
2757     RegionScannerHolder holder = this.scanners.get(scannerName);
2758     if (holder == null) throw new UnknownScannerException("Name: " + scannerName);
2759     internalCloseScanner(holder.getScanner(), scannerName);
2760   }
2761 
2762   private void internalCloseScanner(final RegionScanner s, String scannerName)
2763       throws IOException {
2764     try {
2765       checkOpen();
2766       requestCount.incrementAndGet();
2767       HRegion region = null;
2768       if (s != null) {
2769         // call coprocessor.
2770         region = getRegion(s.getRegionInfo().getRegionName());
2771         if (region != null && region.getCoprocessorHost() != null) {
2772           if (region.getCoprocessorHost().preScannerClose(s)) {
2773             return; // bypass
2774           }
2775         }
2776       }
2777       RegionScanner toCloseScanner = s;
2778       if (scannerName != null) {
2779         RegionScannerHolder holder = scanners.remove(scannerName);
2780         if (holder != null) {
2781           toCloseScanner = holder.getScanner();
2782         }
2783       }
2784       if (toCloseScanner != null) {
2785         toCloseScanner.close();
2786         if (scannerName != null) {
2787           this.leases.cancelLease(scannerName);
2788         }
2789 
2790         if (region != null && region.getCoprocessorHost() != null) {
2791           region.getCoprocessorHost().postScannerClose(toCloseScanner);
2792         }
2793       }
2794     } catch (Throwable t) {
2795       throw convertThrowableToIOE(cleanup(t));
2796     }
2797   }
2798 
2799   @Override
2800   public Result[] scan(byte[] regionName, Scan scan, int numberOfRows)
2801       throws IOException {
2802     RegionScanner s = internalOpenScanner(regionName, scan);
2803     try {
2804       Result[] results = internalNext(s, numberOfRows, null);
2805       return results;
2806     } finally {
2807       internalCloseScanner(s, null);
2808     }
2809   }
2810 
2811   /**
2812    * Instantiated as a scanner lease. If the lease times out, the scanner is
2813    * closed
2814    */
2815   private class ScannerListener implements LeaseListener {
2816     private final String scannerName;
2817 
2818     ScannerListener(final String n) {
2819       this.scannerName = n;
2820     }
2821 
2822     public void leaseExpired() {
2823       RegionScannerHolder holder = scanners.remove(this.scannerName);
2824       if (holder != null) {
2825         RegionScanner s = holder.getScanner();
2826         LOG.info("Scanner " + this.scannerName + " lease expired on region "
2827             + s.getRegionInfo().getRegionNameAsString());
2828         try {
2829           HRegion region = getRegion(s.getRegionInfo().getRegionName());
2830           if (region != null && region.getCoprocessorHost() != null) {
2831             region.getCoprocessorHost().preScannerClose(s);
2832           }
2833 
2834           s.close();
2835           if (region != null && region.getCoprocessorHost() != null) {
2836             region.getCoprocessorHost().postScannerClose(s);
2837           }
2838         } catch (IOException e) {
2839           LOG.error("Closing scanner for "
2840               + s.getRegionInfo().getRegionNameAsString(), e);
2841         }
2842       } else {
2843         LOG.info("Scanner " + this.scannerName + " lease expired");
2844       }
2845     }
2846   }
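
  /*
   * The lease that ScannerListener tracks comes from the regionserver lease
   * subsystem; a long pause between client next() calls lets it expire and the
   * scanner is closed out from under the client. A sketch of raising the lease
   * period for long-running scans, assuming the 0.94-era
   * "hbase.regionserver.lease.period" key (default 60000 ms) - verify the key
   * name against your HBase version before relying on it:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setLong("hbase.regionserver.lease.period", 300000L);  // 5 minutes
   */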
2847 
2848   //
2849   // Methods that do the actual work for the remote API
2850   //
2851   public void delete(final byte[] regionName, final Delete delete)
2852       throws IOException {
2853     checkOpen();
2854     try {
2855       boolean writeToWAL = delete.getWriteToWAL();
2856       this.requestCount.incrementAndGet();
2857       HRegion region = getRegion(regionName);
2858       if (!region.getRegionInfo().isMetaTable()) {
2859         this.cacheFlusher.reclaimMemStoreMemory();
2860       }
2861       Integer lid = getLockFromId(delete.getLockId());
2862       region.delete(delete, lid, writeToWAL);
2863     } catch (Throwable t) {
2864       throw convertThrowableToIOE(cleanup(t));
2865     }
2866   }
2867 
2868   public int delete(final byte[] regionName, final List<Delete> deletes)
2869       throws IOException {
2870     checkOpen();
2871     // Count of Deletes processed.
2872     int i = 0;
2873     HRegion region = null;
2874     try {
2875       region = getRegion(regionName);
2876       if (!region.getRegionInfo().isMetaTable()) {
2877         this.cacheFlusher.reclaimMemStoreMemory();
2878       }
2879       int size = deletes.size();
2880       Integer[] locks = new Integer[size];
2881       for (Delete delete : deletes) {
2882         this.requestCount.incrementAndGet();
2883         locks[i] = getLockFromId(delete.getLockId());
2884         region.delete(delete, locks[i], delete.getWriteToWAL());
2885         i++;
2886       }
2887     } catch (WrongRegionException ex) {
2888       LOG.debug("Batch deletes: " + i, ex);
2889       return i;
2890     } catch (NotServingRegionException ex) {
2891       return i;
2892     } catch (Throwable t) {
2893       throw convertThrowableToIOE(cleanup(t));
2894     }
2895     return -1;
2896   }
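
  /*
   * Per the code above, the batch delete RPC returns -1 when every Delete was
   * applied, and otherwise the count applied before the region moved
   * (WrongRegionException/NotServingRegionException), letting the client retry
   * the remainder. A client-side sketch, assuming the 0.94-era HTable API;
   * table and row names are placeholders:
   *
   *   HTable table = new HTable(conf, "testtable");
   *   List<Delete> deletes = new ArrayList<Delete>();
   *   deletes.add(new Delete(Bytes.toBytes("row1")));
   *   deletes.add(new Delete(Bytes.toBytes("row2")));
   *   table.delete(deletes);   // see HTable javadoc for partial-failure behavior
   *   table.close();
   */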
2897 
2898   /**
2899    * @deprecated {@link RowLock} and associated operations are deprecated.
2900    */
2901   public long lockRow(byte[] regionName, byte[] row) throws IOException {
2902     checkOpen();
2903     NullPointerException npe = null;
2904     if (regionName == null) {
2905       npe = new NullPointerException("regionName is null");
2906     } else if (row == null) {
2907       npe = new NullPointerException("row to lock is null");
2908     }
2909     if (npe != null) {
2910       IOException io = new IOException("Invalid arguments to lockRow");
2911       io.initCause(npe);
2912       throw io;
2913     }
2914     requestCount.incrementAndGet();
2915     try {
2916       HRegion region = getRegion(regionName);
2917       if (region.getCoprocessorHost() != null) {
2918         region.getCoprocessorHost().preLockRow(regionName, row);
2919       }
2920       Integer r = region.obtainRowLock(row);
2921       long lockId = addRowLock(r, region);
2922       LOG.debug("Row lock " + lockId + " explicitly acquired by client");
2923       return lockId;
2924     } catch (Throwable t) {
2925       throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
2926           + this.fsOk + ")"));
2927     }
2928   }
2929 
2930   protected long addRowLock(Integer r, HRegion region)
2931       throws LeaseStillHeldException {
2932     long lockId = -1L;
2933     lockId = rand.nextLong();
2934     String lockName = String.valueOf(lockId);
2935     rowlocks.put(lockName, r);
2936     this.leases.createLease(lockName, new RowLockListener(lockName, region));
2937     return lockId;
2938   }
2939 
2940   /**
2941    * Method to get the Integer lock identifier used internally from the long
2942    * lock identifier used by the client.
2943    *
2944    * @param lockId
2945    *          long row lock identifier from client
2946    * @return intId Integer row lock used internally in HRegion
2947    * @throws IOException
2948    *           Thrown if this is not a valid client lock id.
2949    */
2950   Integer getLockFromId(long lockId) throws IOException {
2951     if (lockId == -1L) {
2952       return null;
2953     }
2954     String lockName = String.valueOf(lockId);
2955     Integer rl = rowlocks.get(lockName);
2956     if (rl == null) {
2957       throw new UnknownRowLockException("Invalid row lock");
2958     }
2959     this.leases.renewLease(lockName);
2960     return rl;
2961   }
2962 
2963   /**
2964    * @deprecated {@link RowLock} and associated operations are deprecated.
2965    */
2966   @Override
2967   @QosPriority(priority=HConstants.HIGH_QOS)
2968   public void unlockRow(byte[] regionName, long lockId) throws IOException {
2969     checkOpen();
2970     NullPointerException npe = null;
2971     if (regionName == null) {
2972       npe = new NullPointerException("regionName is null");
2973     } else if (lockId == -1L) {
2974       npe = new NullPointerException("lockId is null");
2975     }
2976     if (npe != null) {
2977       IOException io = new IOException("Invalid arguments to unlockRow");
2978       io.initCause(npe);
2979       throw io;
2980     }
2981     requestCount.incrementAndGet();
2982     try {
2983       HRegion region = getRegion(regionName);
2984       if (region.getCoprocessorHost() != null) {
2985         region.getCoprocessorHost().preUnLockRow(regionName, lockId);
2986       }
2987       String lockName = String.valueOf(lockId);
2988       Integer r = rowlocks.remove(lockName);
2989       if (r == null) {
2990         throw new UnknownRowLockException(lockName);
2991       }
2992       region.releaseRowLock(r);
2993       this.leases.cancelLease(lockName);
2994       LOG.debug("Row lock " + lockId
2995           + " has been explicitly released by client");
2996     } catch (Throwable t) {
2997       throw convertThrowableToIOE(cleanup(t));
2998     }
2999   }
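
  /*
   * A sketch of the (deprecated) client-side row lock API that drives
   * lockRow/unlockRow above, assuming the 0.94-era HTable methods; table, row,
   * family and qualifier names are placeholders. The lock is lease-backed, so
   * RowLockListener below releases it if the client holds it past the lease
   * period.
   *
   *   HTable table = new HTable(conf, "testtable");
   *   RowLock lock = table.lockRow(Bytes.toBytes("row1"));   // lockRow RPC
   *   try {
   *     Put put = new Put(Bytes.toBytes("row1"), lock);
   *     put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *     table.put(put);
   *   } finally {
   *     table.unlockRow(lock);                               // unlockRow RPC
   *   }
   */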
3000 
3001   /**
3002    * Atomically bulk load several HFiles into an open region
3003    * @return true if successful, false if it failed recoverably (no action taken)
3004    * @throws IOException if failed unrecoverably
3005    */
3006   @Override
3007   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3008       byte[] regionName) throws IOException {
3009     return bulkLoadHFiles(familyPaths, regionName, false);
3010   }
3011 
3012   /**
3013    * Atomically bulk load several HFiles into an open region
3014    * @return true if successful, false if it failed recoverably (no action taken)
3015    * @throws IOException if failed unrecoverably
3016    */
3017   @Override
3018   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3019       byte[] regionName, boolean assignSeqNum) throws IOException {
3020     checkOpen();
3021     HRegion region = getRegion(regionName);
3022     boolean bypass = false;
3023     if (region.getCoprocessorHost() != null) {
3024       bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
3025     }
3026     boolean loaded = false;
3027     if (!bypass) {
3028       loaded = region.bulkLoadHFiles(familyPaths, assignSeqNum);
3029     }
3030     if (region.getCoprocessorHost() != null) {
3031       loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
3032     }
3033     return loaded;
3034   }
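
  /*
   * Client-side sketch of driving the bulk load RPC above with pre-built
   * HFiles, assuming the 0.94-era LoadIncrementalHFiles tool from
   * org.apache.hadoop.hbase.mapreduce; the output directory path and table
   * name are placeholders. The tool groups the HFiles per region and invokes
   * bulkLoadHFiles for each region, which applies the files atomically.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTable table = new HTable(conf, "testtable");
   *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   *   loader.doBulkLoad(new Path("/tmp/hfile-output"), table);
   *   table.close();
   */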
3035 
3036   Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
3037 
3038   /**
3039    * Instantiated as a row lock lease. If the lease times out, the row lock is
3040    * released
3041    */
3042   private class RowLockListener implements LeaseListener {
3043     private final String lockName;
3044     private final HRegion region;
3045 
3046     RowLockListener(final String lockName, final HRegion region) {
3047       this.lockName = lockName;
3048       this.region = region;
3049     }
3050 
3051     public void leaseExpired() {
3052       LOG.info("Row Lock " + this.lockName + " lease expired");
3053       Integer r = rowlocks.remove(this.lockName);
3054       if (r != null) {
3055         region.releaseRowLock(r);
3056       }
3057     }
3058   }
3059 
3060   // Region open/close direct RPCs
3061 
3062   @Override
3063   @QosPriority(priority=HConstants.HIGH_QOS)
3064   public RegionOpeningState openRegion(HRegionInfo region)
3065   throws IOException {
3066     return openRegion(region, -1);
3067   }
3068 
3069   @Override
3070   @QosPriority(priority = HConstants.HIGH_QOS)
3071   public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
3072       throws IOException {
3073     return openRegion(region, versionOfOfflineNode, null);
3074   }
3075 
3076   private RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode,
3077       Map<String, HTableDescriptor> htds) throws IOException {
3078     checkOpen();
3079     HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName());
3080     if (null != onlineRegion) {
3081       // See HBASE-5094. Cross check with META if still this RS is owning the
3082       // region.
3083       Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
3084           this.catalogTracker, region.getRegionName());
3085       if (this.getServerName().equals(p.getSecond())) {
3086         LOG.warn("Attempted open of " + region.getEncodedName()
3087             + " but already online on this server");
3088         return RegionOpeningState.ALREADY_OPENED;
3089       } else {
3090         LOG.warn("The region " + region.getEncodedName()
3091             + " is online on this server but META does not list this server as its location.");
3092         this.removeFromOnlineRegions(region.getEncodedName());
3093       }
3094     }
3095     // Record in the in-memory RS RIT map that we are trying to open this region.
3096     // Clear it if we fail to queue an open handler.
3097     boolean isNewRit = addRegionsInTransition(region, OPEN);
3098     if (!isNewRit) {
3099       // An open is in progress. This is supported, but let's log this.
3100       LOG.info("Receiving OPEN for the region: " +
3101           region.getRegionNameAsString() + ", which we are already trying to OPEN" +
3102           " - ignoring this new request for this region.");
3103       return RegionOpeningState.OPENED;
3104     }
3105     try {
3106       LOG.info("Received request to open region: " +
3107         region.getRegionNameAsString());
3108       HTableDescriptor htd = null;
3109       if (htds == null) {
3110         htd = this.tableDescriptors.get(region.getTableName());
3111       } else {
3112         htd = htds.get(region.getTableNameAsString());
3113         if (htd == null) {
3114           htd = this.tableDescriptors.get(region.getTableName());
3115           htds.put(region.getTableNameAsString(), htd);
3116         }
3117       }
3118 
3119       // Mark the region as OPENING up in zk.  This is how we tell the master that
3120       // control of the region has passed to this regionserver.
3121       int version = transitionZookeeperOfflineToOpening(region, versionOfOfflineNode);
3122       // Need to pass the expected version in the constructor.
3123       if (region.isRootRegion()) {
3124         this.service.submit(new OpenRootHandler(this, this, region, htd, version));
3125       } else if (region.isMetaRegion()) {
3126         this.service.submit(new OpenMetaHandler(this, this, region, htd, version));
3127       } else {
3128         this.service.submit(new OpenRegionHandler(this, this, region, htd, version));
3129       }
3130     } catch (IOException ie) {
3131       // Clear from this server's RIT list, else it will stick around forever.
3132       removeFromRegionsInTransition(region);
3133       throw ie;
3134     }
3135     return RegionOpeningState.OPENED;
3136   }
3137 
3138   /**
3139    * Transition ZK node from OFFLINE to OPENING. The master will get a callback
3140    * and will know that the region is now ours.
3141    *
3142    * @param hri
3143    *          HRegionInfo whose znode we are updating
3144    * @param versionOfOfflineNode
3145    *          Version Of OfflineNode that needs to be compared before changing
3146    *          the node's state from OFFLINE
3147    * @throws IOException
3148    */
3149   int transitionZookeeperOfflineToOpening(final HRegionInfo hri, int versionOfOfflineNode)
3150       throws IOException {
3151     // TODO: should also handle transition from CLOSED?
3152     int version = -1;
3153     try {
3154       // Initialize the znode version.
3155       version = ZKAssign.transitionNode(this.zooKeeper, hri, this.getServerName(),
3156           EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
3157     } catch (KeeperException e) {
3158       LOG.error("Error transitioning from OFFLINE to OPENING for region=" + hri.getEncodedName(), e);
3159     }
3160     if (version == -1) {
3161       // TODO: Fix this sloppiness. The exception should be coming off zk
3162       // directly, not an
3163       // interpretation at this high level (-1 when we call transitionNode can
3164       // mean many things).
3165       throw new IOException("Failed transition from OFFLINE to OPENING for region="
3166           + hri.getEncodedName());
3167     }
3168     return version;
3169   }
3170 
3171    /**
3172     * Add region to this regionserver's list of regions in transition ONLY if
3173     * it is not already in transition.
3174     * If the region is already in RIT, we throw
3175     * {@link RegionAlreadyInTransitionException}.
3176     * Callers need to call
3177     * {@link #removeFromRegionsInTransition(HRegionInfo)} when done or if an
3178     * error occurs while processing.
3179     *
3180     * @param region
3181     *          Region to add
3182     * @param currentAction
3183     *          Whether OPEN or CLOSE.
3184     * @return true if newly added to regions in transition; false if an OPEN
3185     *          for this region is already in progress.
3186     * @throws RegionAlreadyInTransitionException
3187     */
3188    protected boolean addRegionsInTransition(final HRegionInfo region, final String currentAction)
3189        throws RegionAlreadyInTransitionException {
3190      boolean isOpen = currentAction.equals(OPEN);
3191      Boolean action = this.regionsInTransitionInRS.putIfAbsent(
3192          region.getEncodedNameAsBytes(), isOpen);
3193      if (action == null) return true;
3194      if (isOpen && action.booleanValue()) {
3195        return false;
3196      }
3197      // The below exception message will be used in master.
3198      throw new RegionAlreadyInTransitionException("Received:" + currentAction
3199          + " for the region:" + region.getRegionNameAsString()
3200          + ", which we are already trying to " + (action ? OPEN : CLOSE) + ".");
3201    }
3202 
3203   @Override
3204   @QosPriority(priority=HConstants.HIGH_QOS)
3205   public void openRegions(List<HRegionInfo> regions)
3206   throws IOException {
3207     checkOpen();
3208     LOG.info("Received request to open " + regions.size() + " region(s)");
3209     Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(regions.size());
3210     for (HRegionInfo region : regions) openRegion(region, -1, htds);
3211   }
3212 
3213   @Override
3214   @QosPriority(priority=HConstants.HIGH_QOS)
3215   public boolean closeRegion(HRegionInfo region)
3216   throws IOException {
3217     return closeRegion(region, true, -1);
3218   }
3219 
3220   @Override
3221   @QosPriority(priority=HConstants.HIGH_QOS)
3222   public boolean closeRegion(final HRegionInfo region,
3223     final int versionOfClosingNode)
3224   throws IOException {
3225     return closeRegion(region, true, versionOfClosingNode);
3226   }
3227 
3228   @Override
3229   @QosPriority(priority=HConstants.HIGH_QOS)
3230   public boolean closeRegion(HRegionInfo region, final boolean zk)
3231   throws IOException {
3232     return closeRegion(region, zk, -1);
3233   }
3234 
3235   @QosPriority(priority=HConstants.HIGH_QOS)
3236   protected boolean closeRegion(HRegionInfo region, final boolean zk,
3237     final int versionOfClosingNode)
3238   throws IOException {
3239     checkOpen();
3240     //Check for permissions to close.
3241     HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
3242     if (actualRegion != null && actualRegion.getCoprocessorHost() != null) {
3243       actualRegion.getCoprocessorHost().preClose(false);
3244     }
3245     LOG.info("Received close region: " + region.getRegionNameAsString() +
3246       ". Version of ZK closing node:" + versionOfClosingNode);
3247     boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
3248     if (!hasit) {
3249       LOG.warn("Received close for region we are not serving; " +
3250         region.getEncodedName());
3251       throw new NotServingRegionException("Received close for "
3252         + region.getRegionNameAsString() + " but we are not serving it");
3253     }
3254     return closeRegion(region, false, zk, versionOfClosingNode);
3255   }
3256 
3257   @Override
3258   @QosPriority(priority=HConstants.HIGH_QOS)
3259   public boolean closeRegion(byte[] encodedRegionName, boolean zk)
3260     throws IOException {
3261     return closeRegion(encodedRegionName, false, zk);
3262   }
3263 
3264   /**
3265    * @param region Region to close
3266    * @param abort True if we are aborting
3267    * @param zk True if we are to update zk about the region close; if the close
3268    * was orchestrated by master, then update zk.  If the close is being run by
3269    * the regionserver because it's going down, don't update zk.
3270    * @return True if closed a region.
3271    */
3272   protected boolean closeRegion(HRegionInfo region, final boolean abort,
3273       final boolean zk) {
3274     return closeRegion(region, abort, zk, -1);
3275   }
3276 
3277 
3278   /**
3279    * @param region Region to close
3280    * @param abort True if we are aborting
3281    * @param zk True if we are to update zk about the region close; if the close
3282    * was orchestrated by master, then update zk.  If the close is being run by
3283    * the regionserver because it's going down, don't update zk.
3284    * @param versionOfClosingNode
3285    *   the version of znode to compare when RS transitions the znode from
3286    *   CLOSING state.
3287    * @return True if closed a region.
3288    */
3289   protected boolean closeRegion(HRegionInfo region, final boolean abort,
3290       final boolean zk, final int versionOfClosingNode) {
3291     
3292     HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
3293     if (actualRegion != null && actualRegion.getCoprocessorHost() != null) {
3294       try {
3295         actualRegion.getCoprocessorHost().preClose(abort);
3296       } catch (IOException e) {
3297         LOG.warn(e);
3298         return false;
3299       }
3300     }
3301     try {
3302       addRegionsInTransition(region, CLOSE);
3303     } catch (RegionAlreadyInTransitionException rate) {
3304       LOG.warn("Received close for region we are already opening or closing; "
3305           + region.getEncodedName());
3306       return false;
3307     }
3308     boolean success = false;
3309     try {
3310       CloseRegionHandler crh = null;
3311       if (region.isRootRegion()) {
3312         crh = new CloseRootHandler(this, this, region, abort, zk, versionOfClosingNode);
3313       } else if (region.isMetaRegion()) {
3314         crh = new CloseMetaHandler(this, this, region, abort, zk, versionOfClosingNode);
3315       } else {
3316         crh = new CloseRegionHandler(this, this, region, abort, zk, versionOfClosingNode);
3317       }
3318       this.service.submit(crh);
3319       success = true;
3320     } finally {
3321       // Remove from this server's RIT.
3322       if (!success) removeFromRegionsInTransition(region);
3323     }
3324     return true;
3325   }
3326 
3327   /**
3328    * @param encodedRegionName
3329    *          encoded region name of the region to close
3330    * @param abort
3331    *          True if we are aborting
3332    * @param zk
3333    *          True if we are to update zk about the region close; if the close
3334    *          was orchestrated by master, then update zk. If the close is being
3335    *          run by the regionserver because it's going down, don't update zk.
3336    * @return True if closed a region.
3337    */
3338   protected boolean closeRegion(byte[] encodedRegionName, final boolean abort,
3339       final boolean zk) throws IOException {
3340     String encodedRegionNameStr = Bytes.toString(encodedRegionName);
3341     HRegion region = this.getFromOnlineRegions(encodedRegionNameStr);
3342     if (null != region) {
3343       return closeRegion(region.getRegionInfo(), abort, zk);
3344     }
3345     LOG.error("The specified region name " + encodedRegionNameStr
3346         + " does not correspond to an online region; cannot close it.");
3347     return false;
3348   }
3349 
3350   // Manual remote region administration RPCs
3351 
3352   @Override
3353   @QosPriority(priority=HConstants.HIGH_QOS)
3354   public void flushRegion(HRegionInfo regionInfo)
3355       throws NotServingRegionException, IOException {
3356     checkOpen();
3357     LOG.info("Flushing " + regionInfo.getRegionNameAsString());
3358     HRegion region = getRegion(regionInfo.getRegionName());
3359     boolean needsCompaction = region.flushcache().isCompactionNeeded();
3360     if (needsCompaction) {
3361       this.compactSplitThread.requestCompaction(region, "Compaction through user triggered flush");
3362     }
3363   }
3364 
3365   @Override
3366   @QosPriority(priority=HConstants.HIGH_QOS)
3367   public void splitRegion(HRegionInfo regionInfo)
3368       throws NotServingRegionException, IOException {
3369     splitRegion(regionInfo, null);
3370   }
3371 
3372   @Override
3373   public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
3374       throws NotServingRegionException, IOException {
3375     checkOpen();
3376     HRegion region = getRegion(regionInfo.getRegionName());
3377     region.flushcache();
3378     region.forceSplit(splitPoint);
3379     compactSplitThread.requestSplit(region, region.checkSplit());
3380   }
3381 
3382   @Override
3383   @QosPriority(priority=HConstants.HIGH_QOS)
3384   public void compactRegion(HRegionInfo regionInfo, boolean major)
3385       throws NotServingRegionException, IOException {
3386     compactRegion(regionInfo, major, null);
3387   }
3388 
3389   @Override
3390   @QosPriority(priority=HConstants.HIGH_QOS)
3391   public void compactRegion(HRegionInfo regionInfo, boolean major,  byte[] family)
3392       throws NotServingRegionException, IOException {
3393     checkOpen();
3394     HRegion region = getRegion(regionInfo.getRegionName());
3395     Store store = null;
3396     if (family != null) {
3397       store = region.getStore(family);
3398       if (store == null) {
3399         throw new IOException("column family " + Bytes.toString(family) +
3400           " does not exist in region " + region.getRegionNameAsString());
3401       }
3402     }
3403 
3404     if (major) {
3405       if (family != null) {
3406         store.triggerMajorCompaction();
3407       } else {
3408         region.triggerMajorCompaction();
3409       }
3410     }
3411     String familyLogMsg = (family != null) ? " for column family: " + Bytes.toString(family) : "";
3412     LOG.trace("User-triggered compaction requested for region " +
3413       region.getRegionNameAsString() + familyLogMsg);
3414     String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
3415     if (family != null) {
3416       compactSplitThread.requestCompaction(region, store, log,
3417         Store.PRIORITY_USER, null);
3418     } else {
3419       compactSplitThread.requestCompaction(region, log,
3420         Store.PRIORITY_USER, null);
3421     }
3422   }
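
  /*
   * A sketch of triggering the flush/split/compact RPCs above from a client,
   * assuming the 0.94-era HBaseAdmin API; the table name is a placeholder, and
   * a region name can be passed instead to target a single region.
   *
   *   HBaseAdmin admin = new HBaseAdmin(conf);
   *   admin.flush("testtable");          // flushRegion
   *   admin.majorCompact("testtable");   // compactRegion with major=true
   *   admin.split("testtable");          // splitRegion
   *   admin.close();
   */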
3423 
3424   /** @return the info server */
3425   public InfoServer getInfoServer() {
3426     return infoServer;
3427   }
3428 
3429   /**
3430    * @return true if a stop has been requested.
3431    */
3432   public boolean isStopped() {
3433     return this.stopped;
3434   }
3435 
3436   @Override
3437   public boolean isStopping() {
3438     return this.stopping;
3439   }
3440 
3441   /**
3442    *
3443    * @return the configuration
3444    */
3445   public Configuration getConfiguration() {
3446     return conf;
3447   }
3448 
3449   public CacheConfig getCacheConfig() {
3450     return cacheConfig;
3451   }
3452 
3453   /** @return the write lock for the server */
3454   ReentrantReadWriteLock.WriteLock getWriteLock() {
3455     return lock.writeLock();
3456   }
3457 
3458   @Override
3459   @QosPriority(priority=HConstants.HIGH_QOS)
3460   public List<HRegionInfo> getOnlineRegions() throws IOException {
3461     checkOpen();
3462     List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
3463     for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
3464       list.add(e.getValue().getRegionInfo());
3465     }
3466     Collections.sort(list);
3467     return list;
3468   }
3469 
3470   public int getNumberOfOnlineRegions() {
3471     return this.onlineRegions.size();
3472   }
3473 
3474   boolean isOnlineRegionsEmpty() {
3475     return this.onlineRegions.isEmpty();
3476   }
3477 
3478   /**
3479    * @param encodedRegionName
3480    * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
3481    * @throws IOException
3482    */
3483   public byte [] getRegionStats(final String encodedRegionName)
3484   throws IOException {
3485     HRegion r = null;
3486     synchronized (this.onlineRegions) {
3487       r = this.onlineRegions.get(encodedRegionName);
3488     }
3489     if (r == null) return null;
3490     ObjectMapper mapper = new ObjectMapper();
3491     int stores = 0;
3492     int storefiles = 0;
3493     int storefileSizeMB = 0;
3494     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
3495     int storefileIndexSizeMB = 0;
3496     long totalCompactingKVs = 0;
3497     long currentCompactedKVs = 0;
3498     synchronized (r.stores) {
3499       stores += r.stores.size();
3500       for (Store store : r.stores.values()) {
3501         storefiles += store.getStorefilesCount();
3502         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
3503         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
3504       }
3505     }
3506     Map<String, Integer> map = new TreeMap<String, Integer>();
3507     map.put("stores", stores);
3508     map.put("storefiles", storefiles);
3509     map.put("storefileSizeMB", storefileSizeMB);
3510     map.put("memstoreSizeMB", memstoreSizeMB);
3511     StringWriter w = new StringWriter();
3512     mapper.writeValue(w, map);
3513     w.close();
3514     return Bytes.toBytes(w.toString());
3515   }
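
  /*
   * The JSON returned by getRegionStats() is a flat map of per-region metrics;
   * with the fields above (TreeMap keys come out alphabetically) it looks
   * roughly like the following, values being illustrative only:
   *
   *   {"memstoreSizeMB":128,"storefileSizeMB":512,"storefiles":7,"stores":2}
   */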
3516 
3517   /**
3518    * For tests and web ui.
3519    * This method will only work if HRegionServer is in the same JVM as client;
3520    * HRegion cannot be serialized to cross an rpc.
3521    * @see #getOnlineRegions()
3522    */
3523   public Collection<HRegion> getOnlineRegionsLocalContext() {
3524     Collection<HRegion> regions = this.onlineRegions.values();
3525     return Collections.unmodifiableCollection(regions);
3526   }
3527 
3528   @Override
3529   public void addToOnlineRegions(HRegion region) {
3530     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
3531   }
3532 
3533   @Override
3534   public boolean removeFromOnlineRegions(final String encodedName) {
3535     HRegion toReturn = null;
3536     toReturn = this.onlineRegions.remove(encodedName);
3537     
3538     //Clear all of the dynamic metrics as they are now probably useless.
3539     //This is a clear because dynamic metrics could include metrics per cf and
3540     //per hfile.  Figuring out which cfs, hfiles, and regions are still relevant to
3541     //this region server would be an onerous task.  Instead just clear everything
3542     //and on the next tick of the metrics everything that is still relevant will be
3543     //re-added.
3544     this.dynamicMetrics.clear();
3545     return toReturn != null;
3546   }
3547 
3548   /**
3549    * @return A new Map of online regions sorted by region size with the first
3550    *         entry being the biggest.
3551    */
3552   public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
3553     // we'll sort the regions in reverse
3554     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
3555         new Comparator<Long>() {
3556           public int compare(Long a, Long b) {
3557             return -1 * a.compareTo(b);
3558           }
3559         });
3560     // Copy over all regions. Regions are sorted by size with biggest first.
3561     for (HRegion region : this.onlineRegions.values()) {
3562       sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
3563     }
3564     return sortedRegions;
3565   }
3566 
3567   @Override
3568   public HRegion getFromOnlineRegions(final String encodedRegionName) {
3569     HRegion r = null;
3570     r = this.onlineRegions.get(encodedRegionName);
3571     return r;
3572   }
3573 
3574   /**
3575    * @param regionName
3576    * @return HRegion for the passed binary <code>regionName</code> or null if
3577    *         named region is not member of the online regions.
3578    */
3579   public HRegion getOnlineRegion(final byte[] regionName) {
3580     return getFromOnlineRegions(HRegionInfo.encodeRegionName(regionName));
3581   }
3582 
3583   /** @return the request count */
3584   public AtomicInteger getRequestCount() {
3585     return this.requestCount;
3586   }
3587 
3588   /**
3589    * @return time stamp in millis of when this region server was started
3590    */
3591   public long getStartcode() {
3592     return this.startcode;
3593   }
3594 
3595   /** @return reference to FlushRequester */
3596   public FlushRequester getFlushRequester() {
3597     return this.cacheFlusher;
3598   }
3599 
3600   /**
3601    * Protected utility method for safely obtaining an HRegion handle.
3602    *
3603    * @param regionName
3604    *          Name of online {@link HRegion} to return
3605    * @return {@link HRegion} for <code>regionName</code>
3606    * @throws NotServingRegionException
3607    */
3608   protected HRegion getRegion(final byte[] regionName)
3609       throws NotServingRegionException {
3610     HRegion region = null;
3611     region = getOnlineRegion(regionName);
3612     if (region == null) {
3613       throw new NotServingRegionException("Region is not online: " +
3614         Bytes.toStringBinary(regionName));
3615     }
3616     return region;
3617   }
3618 
3619   /**
3620    * Get the top N most loaded regions this server is serving so we can tell the
3621    * master which regions it can reallocate if we're overloaded. TODO: actually
3622    * calculate which regions are most loaded. (Right now, we're just grabbing
3623    * the first N regions being served regardless of load.)
3624    */
3625   protected HRegionInfo[] getMostLoadedRegions() {
3626     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
3627     for (HRegion r : onlineRegions.values()) {
3628       if (!r.isAvailable()) {
3629         continue;
3630       }
3631       if (regions.size() < numRegionsToReport) {
3632         regions.add(r.getRegionInfo());
3633       } else {
3634         break;
3635       }
3636     }
3637     return regions.toArray(new HRegionInfo[regions.size()]);
3638   }
3639 
3640   /**
3641    * Called to verify that this server is up and running.
3642    *
3643    * @throws IOException
3644    */
3645   protected void checkOpen() throws IOException {
3646     if (this.stopped || this.abortRequested) {
3647       throw new RegionServerStoppedException("Server " + getServerName() +
3648         " not running" + (this.abortRequested ? ", aborting" : ""));
3649     }
3650     if (!fsOk) {
3651       throw new RegionServerStoppedException("File system not available");
3652     }
3653   }
3654 
3655   @Override
3656   @QosPriority(priority=HConstants.HIGH_QOS)
3657   public ProtocolSignature getProtocolSignature(
3658       String protocol, long version, int clientMethodsHashCode)
3659   throws IOException {
3660     if (protocol.equals(HRegionInterface.class.getName())) {
3661       return new ProtocolSignature(HRegionInterface.VERSION, null);
3662     }
3663     throw new IOException("Unknown protocol: " + protocol);
3664   }
3665 
3666   @Override
3667   @QosPriority(priority=HConstants.HIGH_QOS)
3668   public long getProtocolVersion(final String protocol, final long clientVersion)
3669   throws IOException {
3670     if (protocol.equals(HRegionInterface.class.getName())) {
3671       return HRegionInterface.VERSION;
3672     }
3673     throw new IOException("Unknown protocol: " + protocol);
3674   }
3675 
3676   @Override
3677   public Leases getLeases() {
3678     return leases;
3679   }
3680 
3681   /**
3682    * @return Return the rootDir.
3683    */
3684   protected Path getRootDir() {
3685     return rootDir;
3686   }
3687 
3688   /**
3689    * @return Return the fs.
3690    */
3691   public FileSystem getFileSystem() {
3692     return fs;
3693   }
3694 
3695   /**
3696    * @return This servers {@link HServerInfo}
3697    */
3698   // TODO: Deprecate and do getServerName instead.
3699   public HServerInfo getServerInfo() {
3700     try {
3701       return getHServerInfo();
3702     } catch (IOException e) {
3703       e.printStackTrace();
3704     }
3705     return null;
3706   }
3707 
3708   @Override
3709   public void mutateRow(byte[] regionName, RowMutations rm)
3710       throws IOException {
3711     checkOpen();
3712     if (regionName == null) {
3713       throw new IOException("Invalid arguments to mutateRow " +
3714       "regionName is null");
3715     }
3716     requestCount.incrementAndGet();
3717     try {
3718       HRegion region = getRegion(regionName);
3719       if (!region.getRegionInfo().isMetaTable()) {
3720         this.cacheFlusher.reclaimMemStoreMemory();
3721       }
3722       region.mutateRow(rm);
3723     } catch (IOException e) {
3724       checkFileSystem();
3725       throw e;
3726     }
3727   }
3728 
3729   @Override
3730   public Result append(byte[] regionName, Append append)
3731   throws IOException {
3732     checkOpen();
3733     if (regionName == null) {
3734       throw new IOException("Invalid arguments to append " +
3735       "regionName is null");
3736     }
3737     requestCount.incrementAndGet();
3738     try {
3739       HRegion region = getRegion(regionName);
3740       Integer lock = getLockFromId(append.getLockId());
3741       Append appVal = append;
3742       Result resVal;
3743       if (region.getCoprocessorHost() != null) {
3744         resVal = region.getCoprocessorHost().preAppend(appVal);
3745         if (resVal != null) {
3746           return resVal;
3747         }
3748       }
3749       resVal = region.append(appVal, lock, append.getWriteToWAL());
3750       if (region.getCoprocessorHost() != null) {
3751         region.getCoprocessorHost().postAppend(appVal, resVal);
3752       }
3753       return resVal;
3754     } catch (IOException e) {
3755       checkFileSystem();
3756       throw e;
3757     }
3758   }
3759 
3760   @Override
3761   public Result increment(byte[] regionName, Increment increment)
3762   throws IOException {
3763     checkOpen();
3764     if (regionName == null) {
3765       throw new IOException("Invalid arguments to increment " +
3766       "regionName is null");
3767     }
3768     requestCount.incrementAndGet();
3769     try {
3770       HRegion region = getRegion(regionName);
3771       Integer lock = getLockFromId(increment.getLockId());
3772       Increment incVal = increment;
3773       Result resVal;
3774       if (region.getCoprocessorHost() != null) {
3775         resVal = region.getCoprocessorHost().preIncrement(incVal);
3776         if (resVal != null) {
3777           return resVal;
3778         }
3779       }
3780       resVal = region.increment(incVal, lock,
3781           increment.getWriteToWAL());
3782       if (region.getCoprocessorHost() != null) {
3783         resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
3784       }
3785       return resVal;
3786     } catch (IOException e) {
3787       checkFileSystem();
3788       throw e;
3789     }
3790   }
3791 
3792   /** {@inheritDoc} */
3793   public long incrementColumnValue(byte[] regionName, byte[] row,
3794       byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
3795       throws IOException {
3796     checkOpen();
3797 
3798     if (regionName == null) {
3799       throw new IOException("Invalid arguments to incrementColumnValue "
3800           + "regionName is null");
3801     }
3802     requestCount.incrementAndGet();
3803     try {
3804       HRegion region = getRegion(regionName);
3805       if (region.getCoprocessorHost() != null) {
3806         Long amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
3807           family, qualifier, amount, writeToWAL);
3808         if (amountVal != null) {
3809           return amountVal.longValue();
3810         }
3811       }
3812       long retval = region.incrementColumnValue(row, family, qualifier, amount,
3813         writeToWAL);
3814       if (region.getCoprocessorHost() != null) {
3815         retval = region.getCoprocessorHost().postIncrementColumnValue(row,
3816           family, qualifier, amount, writeToWAL, retval);
3817       }
3818       return retval;
3819     } catch (IOException e) {
3820       checkFileSystem();
3821       throw e;
3822     }
3823   }
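
  /*
   * A client-side sketch for the increment paths above, assuming the 0.94-era
   * HTable API; table, row, family and qualifier names are placeholders.
   *
   *   HTable table = new HTable(conf, "counters");
   *   // single counter, served by incrementColumnValue()
   *   long hits = table.incrementColumnValue(Bytes.toBytes("row1"),
   *       Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
   *   // several counters in one row, served by increment()
   *   Increment incr = new Increment(Bytes.toBytes("row1"));
   *   incr.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
   *   incr.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("misses"), 2L);
   *   Result r = table.increment(incr);
   *   table.close();
   */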
3824 
3825   /** {@inheritDoc}
3826    * @deprecated Use {@link #getServerName()} instead.
3827    */
3828   @Override
3829   @QosPriority(priority=HConstants.HIGH_QOS)
3830   public HServerInfo getHServerInfo() throws IOException {
3831     checkOpen();
3832     return new HServerInfo(new HServerAddress(this.isa),
3833       this.startcode, this.rsInfo.getInfoPort());
3834   }
3835 
3836   @SuppressWarnings("unchecked")
3837   @Override
3838   public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
3839     checkOpen();
3840     MultiResponse response = new MultiResponse();
3841     for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {
3842       byte[] regionName = e.getKey();
3843       List<Action<R>> actionsForRegion = e.getValue();
3844       // sort based on the row id - this helps in the case where we reach the
3845       // end of a region, so that we don't have to try the rest of the
3846       // actions in the list.
3847       Collections.sort(actionsForRegion);
3848       Row action;
3849       List<Action<R>> mutations = new ArrayList<Action<R>>();
3850       for (Action<R> a : actionsForRegion) {
3851         action = a.getAction();
3852         int originalIndex = a.getOriginalIndex();
3853 
3854         try {
3855           if (action instanceof Delete || action instanceof Put) {
3856             mutations.add(a); 
3857           } else if (action instanceof Get) {
3858             response.add(regionName, originalIndex,
3859                 get(regionName, (Get)action));
3860           } else if (action instanceof Exec) {
3861             ExecResult result = execCoprocessor(regionName, (Exec)action);
3862             response.add(regionName, new Pair<Integer, Object>(
3863                 a.getOriginalIndex(), result.getValue()
3864             ));
3865           } else if (action instanceof Increment) {
3866             response.add(regionName, originalIndex,
3867                 increment(regionName, (Increment)action));
3868           } else if (action instanceof Append) {
3869             response.add(regionName, originalIndex,
3870                 append(regionName, (Append)action));
3871           } else if (action instanceof RowMutations) {
3872             mutateRow(regionName, (RowMutations)action);
3873             response.add(regionName, originalIndex, new Result());
3874           } else {
3875             LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
3876                 "Put, Exec, Increment, or Append.");
3877             throw new DoNotRetryIOException("Invalid Action, row must be a " +
3878                 "Get, Delete, Put, Exec, Increment, or Append.");
3879           }
3880         } catch (IOException ex) {
3881           response.add(regionName, originalIndex, ex);
3882         }
3883       }
3884 
3885       // We do the puts via batchMutate so we can get the batching efficiency
3886       // we need. All this data munging doesn't seem great, but at least
3887       // we aren't copying bytes or anything.
3888       if (!mutations.isEmpty()) {
3889         try {
3890           HRegion region = getRegion(regionName);
3891 
3892           if (!region.getRegionInfo().isMetaTable()) {
3893             this.cacheFlusher.reclaimMemStoreMemory();
3894           }
3895 
3896           List<Pair<Mutation,Integer>> mutationsWithLocks =
3897               Lists.newArrayListWithCapacity(mutations.size());
3898           for (Action<R> a : mutations) {
3899             Mutation m = (Mutation) a.getAction();
3900 
3901             Integer lock;
3902             try {
3903               lock = getLockFromId(m.getLockId());
3904             } catch (UnknownRowLockException ex) {
3905               response.add(regionName, a.getOriginalIndex(), ex);
3906               continue;
3907             }
3908             mutationsWithLocks.add(new Pair<Mutation, Integer>(m, lock));
3909           }
3910 
3911           this.requestCount.addAndGet(mutations.size());
3912 
3913           OperationStatus[] codes =
3914               region.batchMutate(mutationsWithLocks.toArray(new Pair[]{}));
3915 
3916           for (int i = 0; i < codes.length; i++) {
3917             OperationStatus code = codes[i];
3918 
3919             Action<R> theAction = mutations.get(i);
3920             Object result = null;
3921 
3922             if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
3923               result = new Result();
3924             } else if (code.getOperationStatusCode()
3925                 == OperationStatusCode.SANITY_CHECK_FAILURE) {
3926               // Don't send a FailedSanityCheckException as older clients will not know about
3927               // that class being a subclass of DoNotRetryIOException
3928               // and will retry mutations that will never succeed.
3929               result = new DoNotRetryIOException(code.getExceptionMsg());
3930             } else if (code.getOperationStatusCode() == OperationStatusCode.BAD_FAMILY) {
3931               result = new NoSuchColumnFamilyException(code.getExceptionMsg());
3932             }
3933             // FAILURE && NOT_RUN becomes null, aka: need to run again.
3934 
3935             response.add(regionName, theAction.getOriginalIndex(), result);
3936           }
3937         } catch (IOException ioe) {
3938           // fail all the puts with the ioe in question.
3939           for (Action<R> a: mutations) {
3940             response.add(regionName, a.getOriginalIndex(), ioe);
3941           }
3942         }
3943       }
3944     }
3945     return response;
3946   }
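
  /*
   * A client-side sketch of the batching that arrives here as a MultiAction,
   * assuming the 0.94-era HTable.batch API; names are placeholders. The client
   * groups actions by region server and region, and the outcome of each action
   * comes back at the same index it was submitted.
   *
   *   List<Row> actions = new ArrayList<Row>();
   *   actions.add(new Get(Bytes.toBytes("row1")));
   *   Put put = new Put(Bytes.toBytes("row2"));
   *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   actions.add(put);
   *   actions.add(new Delete(Bytes.toBytes("row3")));
   *   Object[] results = new Object[actions.size()];
   *   table.batch(actions, results);   // results[i] is the outcome of actions.get(i)
   */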
3947 
3948   /**
3949    * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
3950    * method using the registered protocol handlers.
3951    * {@link CoprocessorProtocol} implementations must be registered per-region
3952    * via the
3953    * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
3954    * method before they are available.
3955    *
3956    * @param regionName name of the region against which the invocation is executed
3957    * @param call an {@code Exec} instance identifying the protocol, method name,
3958    *     and parameters for the method invocation
3959    * @return an {@code ExecResult} instance containing the region name of the
3960    *     invocation and the return value
3961    * @throws IOException if no registered protocol handler is found or an error
3962    *     occurs during the invocation
3963    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
3964    */
3965   @Override
3966   public ExecResult execCoprocessor(byte[] regionName, Exec call)
3967       throws IOException {
3968     checkOpen();
3969     requestCount.incrementAndGet();
3970     try {
3971       HRegion region = getRegion(regionName);
3972       return region.exec(call);
3973     } catch (Throwable t) {
3974       throw convertThrowableToIOE(cleanup(t));
3975     }
3976   }
3977 
3978   public String toString() {
3979     return getServerName().toString();
3980   }
3981 
3982   /**
3983    * Interval at which threads should run
3984    *
3985    * @return the interval
3986    */
3987   public int getThreadWakeFrequency() {
3988     return threadWakeFrequency;
3989   }
3990 
3991   @Override
3992   public ZooKeeperWatcher getZooKeeper() {
3993     return zooKeeper;
3994   }
3995 
3996   @Override
3997   public ServerName getServerName() {
3998     // Our servername could change after we talk to the master.
3999     return this.serverNameFromMasterPOV == null?
4000       new ServerName(this.isa.getHostName(), this.isa.getPort(), this.startcode):
4001         this.serverNameFromMasterPOV;
4002   }
4003 
4004   @Override
4005   public CompactionRequestor getCompactionRequester() {
4006     return this.compactSplitThread;
4007   }
4008 
4009   public ZooKeeperWatcher getZooKeeperWatcher() {
4010     return this.zooKeeper;
4011   }
4012 
4013   public RegionServerCoprocessorHost getCoprocessorHost(){
4014     return this.rsHost;
4015   }
4016 
4017   @Override
4018   public boolean removeFromRegionsInTransition(final HRegionInfo hri) {
4019     Boolean res = this.regionsInTransitionInRS.remove(hri.getEncodedNameAsBytes());
4020     return res != null && res.booleanValue();
4021   }
4022 
4023   @Override
4024   public boolean containsKeyInRegionsInTransition(final HRegionInfo hri) {
4025     return this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes());
4026   }
4027 
4028   public ExecutorService getExecutorService() {
4029     return service;
4030   }
4031 
4032   //
4033   // Main program and support routines
4034   //
4035 
4036   /**
4037    * Load the replication service objects, if any
4038    */
4039   static private void createNewReplicationInstance(Configuration conf,
4040     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
4041 
4042     // If replication is not enabled, then return immediately.
4043     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
4044       return;
4045     }
4046 
4047     // read in the name of the source replication class from the config file.
4048     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
4049                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
4050 
4051     // read in the name of the sink replication class from the config file.
4052     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
4053                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
4054 
4055     // If both the sink and the source class names are the same, then instantiate
4056     // only one object.
4057     if (sourceClassname.equals(sinkClassname)) {
4058       server.replicationSourceHandler = (ReplicationSourceService)
4059                                          newReplicationInstance(sourceClassname,
4060                                          conf, server, fs, logDir, oldLogDir);
4061       server.replicationSinkHandler = (ReplicationSinkService)
4062                                          server.replicationSourceHandler;
4063     }
4064     else {
4065       server.replicationSourceHandler = (ReplicationSourceService)
4066                                          newReplicationInstance(sourceClassname,
4067                                          conf, server, fs, logDir, oldLogDir);
4068       server.replicationSinkHandler = (ReplicationSinkService)
4069                                          newReplicationInstance(sinkClassname,
4070                                          conf, server, fs, logDir, oldLogDir);
4071     }
4072   }
4073 
4074   static private ReplicationService newReplicationInstance(String classname,
4075     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
4076     Path oldLogDir) throws IOException{
4077 
4078     Class<?> clazz = null;
4079     try {
4080       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4081       clazz = Class.forName(classname, true, classLoader);
4082     } catch (java.lang.ClassNotFoundException nfe) {
4083       throw new IOException("Could not find class for " + classname);
4084     }
4085 
4086     // create an instance of the replication object.
4087     ReplicationService service = (ReplicationService)
4088                               ReflectionUtils.newInstance(clazz, conf);
4089     service.initialize(server, fs, logDir, oldLogDir);
4090     return service;
4091   }
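
  /*
   * Replication services are only instantiated when replication is enabled in
   * the configuration; the source and sink class names are read from the keys
   * referenced above and both default to the same built-in replication
   * service class. A minimal sketch of enabling it programmatically (normally
   * this lives in hbase-site.xml):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
   */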
4092 
4093   /**
4094    * @param hrs
4095    * @return Thread the RegionServer is running in correctly named.
4096    * @throws IOException
4097    */
4098   public static Thread startRegionServer(final HRegionServer hrs)
4099       throws IOException {
4100     return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
4101   }
4102 
4103   /**
4104    * @param hrs
4105    * @param name
4106    * @return Thread the RegionServer is running in correctly named.
4107    * @throws IOException
4108    */
4109   public static Thread startRegionServer(final HRegionServer hrs,
4110       final String name) throws IOException {
4111     Thread t = new Thread(hrs);
4112     t.setName(name);
4113     t.start();
4114     // Install shutdown hook that will catch signals and run an orderly shutdown
4115     // of the hrs.
4116     ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
4117         .getConfiguration()), hrs, t);
4118     return t;
4119   }
4120 
4121   /**
4122    * Utility for constructing an instance of the passed HRegionServer class.
4123    *
4124    * @param regionServerClass
4125    * @param conf2
4126    * @return HRegionServer instance.
4127    */
4128   public static HRegionServer constructRegionServer(
4129       Class<? extends HRegionServer> regionServerClass,
4130       final Configuration conf2) {
4131     try {
4132       Constructor<? extends HRegionServer> c = regionServerClass
4133           .getConstructor(Configuration.class);
4134       return c.newInstance(conf2);
4135     } catch (Exception e) {
4136       throw new RuntimeException("Failed construction of RegionServer: "
4137           + regionServerClass.toString(), e);
4138     }
4139   }
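
  /*
   * The two helpers above are what the command-line entry point (see main)
   * builds on; a sketch of starting a region server embedded in another JVM,
   * e.g. from a test harness, using only methods defined in this class:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HRegionServer hrs = constructRegionServer(HRegionServer.class, conf);
   *   Thread t = startRegionServer(hrs);
   *   t.join();   // runs until a stop or abort is requested
   */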
4140 
4141   @Override
4142   @QosPriority(priority=HConstants.REPLICATION_QOS)
4143   public void replicateLogEntries(final HLog.Entry[] entries)
4144   throws IOException {
4145     checkOpen();
4146     if (this.replicationSinkHandler == null) return;
4147     this.replicationSinkHandler.replicateLogEntries(entries);
4148   }
4149 
4150   /**
4151    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
4152    */
4153   public static void main(String[] args) throws Exception {
4154     VersionInfo.logVersion();
4155     Configuration conf = HBaseConfiguration.create();
4156     @SuppressWarnings("unchecked")
4157     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
4158         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
4159 
4160     new HRegionServerCommandLine(regionServerClass).doMain(args);
4161   }
4162 
4163   @Override
4164   public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries() throws IOException {
4165     BlockCache c = new CacheConfig(this.conf).getBlockCache();
4166     return c.getBlockCacheColumnFamilySummaries(this.conf);
4167   }
4168 
4169   @Override
4170   public byte[][] rollHLogWriter() throws IOException, FailedLogCloseException {
4171     HLog wal = this.getWAL();
4172     return wal.rollWriter(true);
4173   }
4174 
4175   /**
4176    * Gets the online regions of the specified table.
4177    * This method looks at the in-memory onlineRegions.  It does not go to <code>.META.</code>.
4178    * Only returns <em>online</em> regions.  If a region on this table has been
4179    * closed during a disable, etc., it will not be included in the returned list.
4180    * So, the returned list may not necessarily be ALL regions in this table; it is
4181    * only the ONLINE regions in the table.
4182    * @param tableName
4183    * @return Online regions from <code>tableName</code>
4184    */
4185    public List<HRegion> getOnlineRegions(byte[] tableName) {
4186      List<HRegion> tableRegions = new ArrayList<HRegion>();
4187      synchronized (this.onlineRegions) {
4188        for (HRegion region: this.onlineRegions.values()) {
4189          HRegionInfo regionInfo = region.getRegionInfo();
4190          if(Bytes.equals(regionInfo.getTableName(), tableName)) {
4191            tableRegions.add(region);
4192          }
4193        }
4194      }
4195      return tableRegions;
4196    }
4197 
4198   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
4199   public String[] getCoprocessors() {
4200     TreeSet<String> coprocessors = new TreeSet<String>(
4201         this.hlog.getCoprocessorHost().getCoprocessors());
4202     Collection<HRegion> regions = getOnlineRegionsLocalContext();
4203     for (HRegion region: regions) {
4204       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
4205     }
4206     return coprocessors.toArray(new String[0]);
4207   }
4208 
4209   /**
4210    * Register bean with platform management server
4211    */
4212   @SuppressWarnings("deprecation")
4213   void registerMBean() {
4214     MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
4215     mxBean = MBeanUtil.registerMBean("RegionServer", "RegionServer",
4216         mxBeanInfo);
4217     LOG.info("Registered RegionServer MXBean");
4218   }
4219 
4220   /**
4221    * Get the current compaction state of the region.
4222    *
4223    * @param regionName the name of the region whose compaction state to check.
4224    * @return the compaction state name.
4225    * @throws IOException exception
4226    */
4227   public String getCompactionState(final byte[] regionName) throws IOException {
4228       checkOpen();
4229       requestCount.incrementAndGet();
4230       HRegion region = getRegion(regionName);
4231       HRegionInfo info = region.getRegionInfo();
4232       return CompactionRequest.getCompactionState(info.getRegionId()).name();
4233   }
4234 
4235   public long getResponseQueueSize(){
4236     if (server != null) {
4237       return server.getResponseQueueSize();
4238     }
4239     return 0;
4240   }
4241 
4242   private boolean isHealthCheckerConfigured() {
4243     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
4244     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
4245   }
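
  /*
   * The health check chore runs only when a health script location is
   * configured (per isHealthCheckerConfigured() above). A sketch of enabling
   * it via the same HConstants key; the script path is a placeholder:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.set(HConstants.HEALTH_SCRIPT_LOC, "/opt/hbase/bin/healthcheck.sh");
   */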
4246 
4247   /**
4248    * @return the underlying {@link CompactSplitThread} for this server
4249    */
4250   public CompactSplitThread getCompactSplitThread() {
4251     return this.compactSplitThread;
4252   }
4253 }