1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.lang.annotation.Retention;
24  import java.lang.annotation.RetentionPolicy;
25  import java.lang.management.ManagementFactory;
26  import java.lang.management.MemoryUsage;
27  import java.lang.reflect.Constructor;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Random;
41  import java.util.Set;
42  import java.util.SortedMap;
43  import java.util.TreeMap;
44  import java.util.TreeSet;
45  import java.util.concurrent.ConcurrentHashMap;
46  import java.util.concurrent.ConcurrentMap;
47  import java.util.concurrent.ConcurrentSkipListMap;
48  import java.util.concurrent.locks.ReentrantReadWriteLock;
49  
50  import javax.management.ObjectName;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.classification.InterfaceAudience;
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.fs.FileSystem;
57  import org.apache.hadoop.fs.Path;
58  import org.apache.hadoop.hbase.CellScannable;
59  import org.apache.hadoop.hbase.CellScanner;
60  import org.apache.hadoop.hbase.CellUtil;
61  import org.apache.hadoop.hbase.Chore;
62  import org.apache.hadoop.hbase.HBaseConfiguration;
63  import org.apache.hadoop.hbase.HConstants;
64  import org.apache.hadoop.hbase.HRegionInfo;
65  import org.apache.hadoop.hbase.HTableDescriptor;
66  import org.apache.hadoop.hbase.HealthCheckChore;
67  import org.apache.hadoop.hbase.KeyValue;
68  import org.apache.hadoop.hbase.RemoteExceptionHandler;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.Stoppable;
71  import org.apache.hadoop.hbase.TableDescriptors;
72  import org.apache.hadoop.hbase.ZNodeClearer;
73  import org.apache.hadoop.hbase.catalog.CatalogTracker;
74  import org.apache.hadoop.hbase.catalog.MetaEditor;
75  import org.apache.hadoop.hbase.catalog.MetaReader;
76  import org.apache.hadoop.hbase.client.Append;
77  import org.apache.hadoop.hbase.client.Delete;
78  import org.apache.hadoop.hbase.client.Get;
79  import org.apache.hadoop.hbase.client.HConnectionManager;
80  import org.apache.hadoop.hbase.client.Increment;
81  import org.apache.hadoop.hbase.client.Mutation;
82  import org.apache.hadoop.hbase.client.Put;
83  import org.apache.hadoop.hbase.client.Result;
84  import org.apache.hadoop.hbase.client.RowMutations;
85  import org.apache.hadoop.hbase.client.Scan;
86  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
87  import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
88  import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
89  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
90  import org.apache.hadoop.hbase.exceptions.LeaseException;
91  import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
92  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
93  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
94  import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
95  import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
96  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
97  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
98  import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
99  import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
100 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
101 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
102 import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
103 import org.apache.hadoop.hbase.executor.ExecutorService;
104 import org.apache.hadoop.hbase.executor.ExecutorType;
105 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
106 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
107 import org.apache.hadoop.hbase.fs.HFileSystem;
108 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
110 import org.apache.hadoop.hbase.ipc.RpcClient;
111 import org.apache.hadoop.hbase.ipc.RpcServer;
112 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
113 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
114 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
115 import org.apache.hadoop.hbase.ipc.ServerRpcController;
116 import org.apache.hadoop.hbase.master.SplitLogManager;
117 import org.apache.hadoop.hbase.master.TableLockManager;
118 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
119 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
120 import org.apache.hadoop.hbase.protobuf.RequestConverter;
121 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
141 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
143 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
155 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
157 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
168 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
169 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
172 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
173 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
174 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
175 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
176 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
177 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
182 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
183 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
184 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
185 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
186 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
187 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
188 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
189 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
190 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
191 import org.apache.hadoop.hbase.regionserver.wal.HLog;
192 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
193 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
194 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
195 import org.apache.hadoop.hbase.security.User;
196 import org.apache.hadoop.hbase.util.Bytes;
197 import org.apache.hadoop.hbase.util.CompressionTest;
198 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
199 import org.apache.hadoop.hbase.util.FSTableDescriptors;
200 import org.apache.hadoop.hbase.util.FSUtils;
201 import org.apache.hadoop.hbase.util.InfoServer;
202 import org.apache.hadoop.hbase.util.Pair;
203 import org.apache.hadoop.hbase.util.Sleeper;
204 import org.apache.hadoop.hbase.util.Strings;
205 import org.apache.hadoop.hbase.util.Threads;
206 import org.apache.hadoop.hbase.util.VersionInfo;
207 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
208 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
209 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
210 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
211 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
212 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
213 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
214 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
215 import org.apache.hadoop.ipc.RemoteException;
216 import org.apache.hadoop.metrics.util.MBeanUtil;
217 import org.apache.hadoop.net.DNS;
218 import org.apache.hadoop.util.ReflectionUtils;
219 import org.apache.hadoop.util.StringUtils;
220 import org.apache.zookeeper.KeeperException;
221 import org.apache.zookeeper.data.Stat;
222 import org.cliffc.high_scale_lib.Counter;
223 
224 import com.google.protobuf.BlockingRpcChannel;
225 import com.google.protobuf.ByteString;
226 import com.google.protobuf.Message;
227 import com.google.protobuf.RpcController;
228 import com.google.protobuf.ServiceException;
229 import com.google.protobuf.TextFormat;
230 
231 /**
232  * HRegionServer makes a set of HRegions available to clients. It checks in with
233  * the HMaster. There are many HRegionServers in a single HBase deployment.
234  */
235 @InterfaceAudience.Private
236 @SuppressWarnings("deprecation")
237 public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
238   AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
239   HBaseRPCErrorHandler, LastSequenceId {
240 
  /** Commons-logging logger for this class. */
  public static final Log LOG = LogFactory.getLog(HRegionServer.class);

  // Random source; seeded in the constructor from the hash code of the initial bind address.
  private final Random rand;

  /*
   * Strings to be used in forming the exception message for
   * RegionAlreadyInTransitionException.
   */
  protected static final String OPEN = "OPEN";
  protected static final String CLOSE = "CLOSE";

  // Region name -> action currently in progress for that region on this server:
  //   true  - an open-region action is in progress
  //   false - a close-region action is in progress
  // Keys are byte[] region names, ordered with Bytes.BYTES_COMPARATOR.
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);

  // Read in the constructor from HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY.
  protected long maxScannerResultSize;

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  // Catalog tracker; created and started in initializeZooKeeper().
  protected CatalogTracker catalogTracker;

  // Watches ZooKeeper for regions leaving the recovering state. Only created when
  // distributed log replay is enabled.
  @SuppressWarnings("unused")
  private RecoveringRegionWatcher recoveringRegionWatcher;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, these handlers will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplitThread compactSplitThread;

  // Scanner holders, looked up by the decimal string form of the scanner id
  // (see getScanner(long)).
  final ConcurrentHashMap<String, RegionScannerHolder> scanners =
      new ConcurrentHashMap<String, RegionScannerHolder>();

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. Backed by a {@link ConcurrentHashMap}.
   * NOTE(review): this comment used to say "All access should be synchronized";
   * single operations on a ConcurrentHashMap are already thread-safe, so confirm
   * whether any caller still relies on external locking for compound operations.
   */
  protected final Map<String, HRegion> onlineRegions =
    new ConcurrentHashMap<String, HRegion>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<String, InetSocketAddress[]>();

  /**
   * Regions currently in the recovering state, meaning they can accept writes (edits from a
   * previously failed region server) but not reads. A recovering region is also an online
   * region. Wrapped with {@link Collections#synchronizedMap(Map)}.
   */
  protected final Map<String, HRegion> recoveringRegions = Collections
      .synchronizedMap(new HashMap<String, HRegion>());

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService service;

  // Request counter. (Includes requests that are not serviced by regions.)
  final Counter requestCount = new Counter();

  // If false, the file system has become unavailable. Set to true by the constructor.
  protected volatile boolean fsOk;
  protected HFileSystem fs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  protected volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  protected volatile boolean abortRequested;

  // Port we put up the webui on. Starts out as -1.
  protected int webuiport = -1;

  // NOTE(review): purpose is not visible from this part of the file -- presumably row-lock
  // bookkeeping keyed by a lock name; confirm against the code that uses it.
  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

  // A state before we go into stopped state.  At this stage we're closing user
  // space regions.
  // NOTE(review): unlike stopped, abortRequested and killed, this flag is not volatile --
  // confirm that every read and write happens on one thread or under a common lock.
  private boolean stopping = false;

  private volatile boolean killed = false;
343 
  // Configuration handed to the constructor.
  protected final Configuration conf;

  // Verify hbase checksums? Read from HConstants.HBASE_CHECKSUM_VERIFICATION (default false).
  private boolean useHBaseChecksum;
  private Path rootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // "hbase.client.retries.number", default 10.
  final int numRetries;
  // HConstants.THREAD_WAKE_FREQUENCY, default 10 * 1000.
  protected final int threadWakeFrequency;
  // "hbase.regionserver.msginterval", default 3 * 1000. Also used as the wait between
  // checks of the stopped flag in blockAndCheckIfStopped().
  private final int msgInterval;

  // "hbase.regionserver.numregionstoreport", default 10.
  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private RegionServerStatusService.BlockingInterface rssStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  // Server to handle client requests. Default access so can be accessed by
  // unit tests.
  RpcServerInterface rpcServer;

  // Address the RPC server is actually listening on (rpcServer.getListenerAddress()).
  private final InetSocketAddress isa;
  // Aborts this server when a service thread dies with an uncaught exception.
  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is name of the webapp and the attribute name used stuffing this instance
  // into web context.
  InfoServer infoServer;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  /** region server configuration name */
  public static final String REGIONSERVER_CONF = "regionserver_conf";

  private MetricsRegionServer metricsRegionServer;

  /*
   * Checks for compaction requests.
   */
  Chore compactionChecker;

  /*
   * Checks for flushes.
   */
  Chore periodicFlusher;

  // HLog and HLog roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  protected volatile HLog hlog;
  // The meta updates are written to a different hlog. If this
  // regionserver holds meta regions, then this field will be non-null.
  protected volatile HLog hlogForMeta;

  LogRoller hlogRoller;
  LogRoller metaHLogRoller;

  // flag set after we're done setting up server threads (used for testing)
  protected volatile boolean isOnline;

  // zookeeper connection and watcher
  private ZooKeeperWatcher zooKeeper;

  // master address manager and watcher
  private MasterAddressTracker masterAddressManager;

  // Cluster Status Tracker
  private ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  private final Sleeper sleeper;

  // HConstants.HBASE_RPC_TIMEOUT_KEY, default HConstants.DEFAULT_HBASE_RPC_TIMEOUT.
  private final int rpcTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  final CacheConfig cacheConfig;

  // reference to the Thrift Server.
  volatile private HRegionThriftServer thriftServer;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /**
   * The server name the Master sees us as.  It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against the Master.  The hostname can differ from the hostname in {@link #isa},
   * but usually doesn't if both servers resolve names the same way.
   */
  private ServerName serverNameFromMasterPOV;

  /**
   * This server's startcode; taken from System.currentTimeMillis() in the constructor.
   */
  private final long startcode;

  /**
   * Unique identifier for the cluster we are a part of. Read from ZooKeeper in
   * initializeZooKeeper().
   */
  private String clusterId;

  /**
   * MX Bean for RegionServerInfo
   */
  private ObjectName mxBean = null;

  /**
   * Chore that periodically cleans the moved region list.
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The QosFunction installed on the RPC server by the constructor.
   */
  private final QosFunction qosFunction;

  private RegionServerCoprocessorHost rsHost;

  /** Handle all the snapshot requests to this server */
  RegionServerSnapshotManager snapshotManager;

  // Configuration setting: whether WAL edits are replayed directly to another RS.
  private final boolean distributedLogReplay;

  // Table level lock manager for locking for region operations
  private TableLockManager tableLockManager;
481 
482   /**
483    * Starts a HRegionServer at the default location
484    *
485    * @param conf
486    * @throws IOException
487    * @throws InterruptedException
488    */
489   public HRegionServer(Configuration conf)
490   throws IOException, InterruptedException {
491     this.fsOk = true;
492     this.conf = conf;
493     this.isOnline = false;
494     checkCodecs(this.conf);
495 
496     // do we use checksum verification in the hbase? If hbase checksum verification
497     // is enabled, then we automatically switch off hdfs checksum verification.
498     this.useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, false);
499 
500     // Config'ed params
501     this.numRetries = conf.getInt("hbase.client.retries.number", 10);
502     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
503     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
504 
505     this.sleeper = new Sleeper(this.msgInterval, this);
506 
507     this.maxScannerResultSize = conf.getLong(
508       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
509       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
510 
511     this.numRegionsToReport = conf.getInt(
512       "hbase.regionserver.numregionstoreport", 10);
513 
514     this.rpcTimeout = conf.getInt(
515       HConstants.HBASE_RPC_TIMEOUT_KEY,
516       HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
517 
518     this.abortRequested = false;
519     this.stopped = false;
520 
521     this.scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
522       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
523 
524     // Server to handle client requests.
525     String hostname = conf.get("hbase.regionserver.ipc.address",
526       Strings.domainNamePointerToHostName(DNS.getDefaultHost(
527         conf.get("hbase.regionserver.dns.interface", "default"),
528         conf.get("hbase.regionserver.dns.nameserver", "default"))));
529     int port = conf.getInt(HConstants.REGIONSERVER_PORT,
530       HConstants.DEFAULT_REGIONSERVER_PORT);
531     // Creation of a HSA will force a resolve.
532     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
533     if (initialIsa.getAddress() == null) {
534       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
535     }
536     this.rand = new Random(initialIsa.hashCode());
537     String name = "regionserver/" + initialIsa.toString();
538     // Set how many times to retry talking to another server over HConnection.
539     HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
540     this.rpcServer = new RpcServer(this, name, getServices(),
541       /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
542       initialIsa, // BindAddress is IP we got for this server.
543       conf.getInt("hbase.regionserver.handler.count", 10),
544       conf.getInt("hbase.regionserver.metahandler.count", 10),
545       conf, HConstants.QOS_THRESHOLD);
546 
547     // Set our address.
548     this.isa = this.rpcServer.getListenerAddress();
549 
550     this.rpcServer.setErrorHandler(this);
551     this.rpcServer.setQosFunction((qosFunction = new QosFunction(this)));
552     this.startcode = System.currentTimeMillis();
553 
554     // login the zookeeper client principal (if using security)
555     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
556       "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
557 
558     // login the server principal (if using secure Hadoop)
559     User.login(this.conf, "hbase.regionserver.keytab.file",
560       "hbase.regionserver.kerberos.principal", this.isa.getHostName());
561     regionServerAccounting = new RegionServerAccounting();
562     cacheConfig = new CacheConfig(conf);
563     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
564       public void uncaughtException(Thread t, Throwable e) {
565         abort("Uncaught exception in service thread " + t.getName(), e);
566       }
567     };
568     this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
569 
570     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
571       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
572   }
573 
574   /**
575    * @return list of blocking services and their security info classes that this server supports
576    */
577   private List<BlockingServiceAndInterface> getServices() {
578     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
579     bssi.add(new BlockingServiceAndInterface(
580         ClientProtos.ClientService.newReflectiveBlockingService(this),
581         ClientProtos.ClientService.BlockingInterface.class));
582     bssi.add(new BlockingServiceAndInterface(
583         AdminProtos.AdminService.newReflectiveBlockingService(this),
584         AdminProtos.AdminService.BlockingInterface.class));
585     return bssi;
586   }
587 
588   /**
589    * Run test on configured codecs to make sure supporting libs are in place.
590    * @param c
591    * @throws IOException
592    */
593   private static void checkCodecs(final Configuration c) throws IOException {
594     // check to see if the codec list is available:
595     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
596     if (codecs == null) return;
597     for (String codec : codecs) {
598       if (!CompressionTest.testCompression(codec)) {
599         throw new IOException("Compression codec " + codec +
600           " not supported, aborting RS construction");
601       }
602     }
603   }
604 
605   String getClusterId() {
606     return this.clusterId;
607   }
608 
  /**
   * Annotation, retained at runtime, that carries an integer priority.
   * NOTE(review): presumably read reflectively by QosFunction to prioritize
   * annotated RPC methods -- confirm against QosFunction.
   */
  @Retention(RetentionPolicy.RUNTIME)
  protected @interface QosPriority {
    /** @return the priority; 0 when not specified */
    int priority() default 0;
  }
613 
614   QosFunction getQosFunction() {
615     return qosFunction;
616   }
617 
618   RegionScanner getScanner(long scannerId) {
619     String scannerIdString = Long.toString(scannerId);
620     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
621     if (scannerHolder != null) {
622       return scannerHolder.s;
623     }
624     return null;
625   }
626 
627   /**
628    * All initialization needed before we go register with Master.
629    *
630    * @throws IOException
631    * @throws InterruptedException
632    */
633   private void preRegistrationInitialization(){
634     try {
635       initializeZooKeeper();
636       initializeThreads();
637     } catch (Throwable t) {
638       // Call stop if error or process will stick around for ever since server
639       // puts up non-daemon threads.
640       this.rpcServer.stop();
641       abort("Initialization of RS failed.  Hence aborting RS.", t);
642     }
643   }
644 
645   /**
646    * Bring up connection to zk ensemble and then wait until a master for this
647    * cluster and then after that, wait until cluster 'up' flag has been set.
648    * This is the order in which master does things.
649    * Finally put up a catalog tracker.
650    * @throws IOException
651    * @throws InterruptedException
652    */
653   private void initializeZooKeeper() throws IOException, InterruptedException {
654     // Open connection to zookeeper and set primary watcher
655     this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
656       this.isa.getPort(), this);
657 
658     // Create the master address manager, register with zk, and start it.  Then
659     // block until a master is available.  No point in starting up if no master
660     // running.
661     this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
662     this.masterAddressManager.start();
663     blockAndCheckIfStopped(this.masterAddressManager);
664 
665     // Wait on cluster being up.  Master will set this flag up in zookeeper
666     // when ready.
667     this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
668     this.clusterStatusTracker.start();
669     blockAndCheckIfStopped(this.clusterStatusTracker);
670 
671     // Create the catalog tracker and start it;
672     this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
673     catalogTracker.start();
674 
675     // Retrieve clusterId
676     // Since cluster status is now up
677     // ID should have already been set by HMaster
678     try {
679       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
680       if (clusterId == null) {
681         this.abort("Cluster ID has not been set");
682       }
683       LOG.info("ClusterId : "+clusterId);
684     } catch (KeeperException e) {
685       this.abort("Failed to retrieve Cluster ID",e);
686     }
687 
688     // watch for snapshots
689     try {
690       this.snapshotManager = new RegionServerSnapshotManager(this);
691     } catch (KeeperException e) {
692       this.abort("Failed to reach zk cluster when creating snapshot handler.");
693     }
694     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
695         new ServerName(isa.getHostName(), isa.getPort(), startcode));
696 
697     // register watcher for recovering regions
698     if(this.distributedLogReplay) {
699       this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
700     }
701   }
702 
703   /**
   * Utility method to wait indefinitely on a znode availability while checking
705    * if the region server is shut down
706    * @param tracker znode tracker to use
707    * @throws IOException any IO exception, plus if the RS is stopped
708    * @throws InterruptedException
709    */
710   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
711       throws IOException, InterruptedException {
712     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
713       if (this.stopped) {
714         throw new IOException("Received the shutdown message while waiting.");
715       }
716     }
717   }
718 
719   /**
720    * @return False if cluster shutdown in progress
721    */
722   private boolean isClusterUp() {
723     return this.clusterStatusTracker.isClusterUp();
724   }
725 
726   private void initializeThreads() throws IOException {
727     // Cache flushing thread.
728     this.cacheFlusher = new MemStoreFlusher(conf, this);
729 
730     // Compaction thread
731     this.compactSplitThread = new CompactSplitThread(this);
732 
733     // Background thread to check for compactions; needed if region has not gotten updates
734     // in a while. It will take care of not checking too frequently on store-by-store basis.
735     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
736     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
737     // Health checker thread.
738     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
739       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
740     if (isHealthCheckerConfigured()) {
741       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
742     }
743 
744     this.leases = new Leases(this.threadWakeFrequency);
745 
746     // Create the thread for the ThriftServer.
747     if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
748       thriftServer = new HRegionThriftServer(this, conf);
749       thriftServer.start();
750       LOG.info("Started Thrift API from Region Server.");
751     }
752 
753     // Create the thread to clean the moved regions list
754     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
755 
756     // Setup RPC client for master communication
757     rpcClient = new RpcClient(conf, clusterId);
758   }
759 
760   /**
761    * The HRegionServer sticks in this loop until closed.
762    */
763   public void run() {
764     try {
765       // Do pre-registration initializations; zookeeper, lease threads, etc.
766       preRegistrationInitialization();
767     } catch (Throwable e) {
768       abort("Fatal exception during initialization", e);
769     }
770 
771     try {
772       // Try and register with the Master; tell it we are here.  Break if
773       // server is stopped or the clusterup flag is down or hdfs went wacky.
774       while (keepLooping()) {
775         RegionServerStartupResponse w = reportForDuty();
776         if (w == null) {
777           LOG.warn("reportForDuty failed; sleeping and then retrying.");
778           this.sleeper.sleep();
779         } else {
780           handleReportForDutyResponse(w);
781           break;
782         }
783       }
784 
785       // start the snapshot handler, since the server is ready to run
786       this.snapshotManager.start();
787 
788       // We registered with the Master.  Go into run mode.
789       long lastMsg = 0;
790       long oldRequestCount = -1;
791       // The main run loop.
792       while (!this.stopped && isHealthy()) {
793         if (!isClusterUp()) {
794           if (isOnlineRegionsEmpty()) {
795             stop("Exiting; cluster shutdown set and not carrying any regions");
796           } else if (!this.stopping) {
797             this.stopping = true;
798             LOG.info("Closing user regions");
799             closeUserRegions(this.abortRequested);
800           } else if (this.stopping) {
801             boolean allUserRegionsOffline = areAllUserRegionsOffline();
802             if (allUserRegionsOffline) {
803               // Set stopped if no requests since last time we went around the loop.
804               // The remaining meta regions will be closed on our way out.
805               if (oldRequestCount == this.requestCount.get()) {
806                 stop("Stopped; only catalog regions remaining online");
807                 break;
808               }
809               oldRequestCount = this.requestCount.get();
810             } else {
811               // Make sure all regions have been closed -- some regions may
812               // have not got it because we were splitting at the time of
813               // the call to closeUserRegions.
814               closeUserRegions(this.abortRequested);
815             }
816             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
817           }
818         }
819         long now = System.currentTimeMillis();
820         if ((now - lastMsg) >= msgInterval) {
821           tryRegionServerReport(lastMsg, now);
822           lastMsg = System.currentTimeMillis();
823         }
824         if (!this.stopped) this.sleeper.sleep();
825       } // for
826     } catch (Throwable t) {
827       if (!checkOOME(t)) {
828         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
829         abort(prefix + t.getMessage(), t);
830       }
831     }
832     // Run shutdown.
833     if (mxBean != null) {
834       MBeanUtil.unregisterMBean(mxBean);
835       mxBean = null;
836     }
837     if (this.thriftServer != null) this.thriftServer.shutdown();
838     this.leases.closeAfterLeasesExpire();
839     this.rpcServer.stop();
840     if (this.splitLogWorker != null) {
841       splitLogWorker.stop();
842     }
843     if (this.infoServer != null) {
844       LOG.info("Stopping infoServer");
845       try {
846         this.infoServer.stop();
847       } catch (Exception e) {
848         e.printStackTrace();
849       }
850     }
851     // Send cache a shutdown.
852     if (cacheConfig.isBlockCacheEnabled()) {
853       cacheConfig.getBlockCache().shutdown();
854     }
855 
856     movedRegionsCleaner.stop("Region Server stopping");
857 
858     // Send interrupts to wake up threads if sleeping so they notice shutdown.
859     // TODO: Should we check they are alive? If OOME could have exited already
860     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
861     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
862     if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
863     if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
864     if (this.compactionChecker != null)
865       this.compactionChecker.interrupt();
866     if (this.healthCheckChore != null) {
867       this.healthCheckChore.interrupt();
868     }
869 
870     try {
871       if (snapshotManager != null) snapshotManager.stop(this.abortRequested);
872     } catch (IOException e) {
873       LOG.warn("Failed to close snapshot handler cleanly", e);
874     }
875 
876     if (this.killed) {
877       // Just skip out w/o closing regions.  Used when testing.
878     } else if (abortRequested) {
879       if (this.fsOk) {
880         closeUserRegions(abortRequested); // Don't leave any open file handles
881       }
882       LOG.info("aborting server " + this.serverNameFromMasterPOV);
883     } else {
884       closeUserRegions(abortRequested);
885       closeAllScanners();
886       LOG.info("stopping server " + this.serverNameFromMasterPOV);
887     }
888     // Interrupt catalog tracker here in case any regions being opened out in
889     // handlers are stuck waiting on meta.
890     if (this.catalogTracker != null) this.catalogTracker.stop();
891 
892     // stop the snapshot handler, forcefully killing all running tasks
893     try {
894       if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
895     } catch (IOException e) {
896       LOG.warn("Failed to close snapshot handler cleanly", e);
897     }
898 
899     // Closing the compactSplit thread before closing meta regions
900     if (!this.killed && containsMetaTableRegions()) {
901       if (!abortRequested || this.fsOk) {
902         if (this.compactSplitThread != null) {
903           this.compactSplitThread.join();
904           this.compactSplitThread = null;
905         }
906         closeMetaTableRegions(abortRequested);
907       }
908     }
909 
910     if (!this.killed && this.fsOk) {
911       waitOnAllRegionsToClose(abortRequested);
912       LOG.info("stopping server " + this.serverNameFromMasterPOV +
913         "; all regions closed.");
914     }
915 
916     //fsOk flag may be changed when closing regions throws exception.
917     if (!this.killed && this.fsOk) {
918       closeWAL(!abortRequested);
919     }
920 
921     // Make sure the proxy is down.
922     if (this.rssStub != null) {
923       this.rssStub = null;
924     }
925     this.rpcClient.stop();
926     this.leases.close();
927 
928     if (!killed) {
929       join();
930     }
931 
932     try {
933       deleteMyEphemeralNode();
934     } catch (KeeperException e) {
935       LOG.warn("Failed deleting my ephemeral node", e);
936     }
937     // We may have failed to delete the znode at the previous step, but
938     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
939     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
940     this.zooKeeper.close();
941     LOG.info("stopping server " + this.serverNameFromMasterPOV +
942       "; zookeeper connection closed.");
943 
944     LOG.info(Thread.currentThread().getName() + " exiting");
945   }
946 
947   private boolean containsMetaTableRegions() {
948     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
949   }
950 
951   private boolean areAllUserRegionsOffline() {
952     if (getNumberOfOnlineRegions() > 2) return false;
953     boolean allUserRegionsOffline = true;
954     for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
955       if (!e.getValue().getRegionInfo().isMetaTable()) {
956         allUserRegionsOffline = false;
957         break;
958       }
959     }
960     return allUserRegionsOffline;
961   }
962 
963   void tryRegionServerReport(long reportStartTime, long reportEndTime)
964   throws IOException {
965     HBaseProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
966     try {
967       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
968       ServerName sn = ServerName.parseVersionedServerName(
969         this.serverNameFromMasterPOV.getVersionedBytes());
970       request.setServer(ProtobufUtil.toServerName(sn));
971       request.setLoad(sl);
972       this.rssStub.regionServerReport(null, request.build());
973     } catch (ServiceException se) {
974       IOException ioe = ProtobufUtil.getRemoteException(se);
975       if (ioe instanceof YouAreDeadException) {
976         // This will be caught and handled as a fatal error in run()
977         throw ioe;
978       }
979       // Couldn't connect to the master, get location from zk and reconnect
980       // Method blocks until new master is found or we are stopped
981       Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
982         createRegionServerStatusStub();
983       this.rssStub = p.getSecond();
984     }
985   }
986 
987   HBaseProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
988     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
989     // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
990     // the wrapper to compute those numbers in one place.
991     // In the long term most of these should be moved off of ServerLoad and the heart beat.
992     // Instead they should be stored in an HBase table so that external visibility into HBase is
993     // improved; Additionally the load balancer will be able to take advantage of a more complete
994     // history.
995     MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
996     Collection<HRegion> regions = getOnlineRegionsLocalContext();
997     MemoryUsage memory =
998       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
999 
1000     HBaseProtos.ServerLoad.Builder serverLoad = HBaseProtos.ServerLoad.newBuilder();
1001     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1002     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1003     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1004     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1005     Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
1006     for (String coprocessor : coprocessors) {
1007       serverLoad.addCoprocessors(
1008         Coprocessor.newBuilder().setName(coprocessor).build());
1009     }
1010     for (HRegion region : regions) {
1011       serverLoad.addRegionLoads(createRegionLoad(region));
1012     }
1013     serverLoad.setReportStartTime(reportStartTime);
1014     serverLoad.setReportEndTime(reportEndTime);
1015     if (this.infoServer != null) {
1016       serverLoad.setInfoServerPort(this.infoServer.getPort());
1017     } else {
1018       serverLoad.setInfoServerPort(-1);
1019     }
1020     return serverLoad.build();
1021   }
1022 
1023   String getOnlineRegionsAsPrintableString() {
1024     StringBuilder sb = new StringBuilder();
1025     for (HRegion r: this.onlineRegions.values()) {
1026       if (sb.length() > 0) sb.append(", ");
1027       sb.append(r.getRegionInfo().getEncodedName());
1028     }
1029     return sb.toString();
1030   }
1031 
1032   /**
1033    * Wait on regions close.
1034    */
1035   private void waitOnAllRegionsToClose(final boolean abort) {
1036     // Wait till all regions are closed before going out.
1037     int lastCount = -1;
1038     long previousLogTime = 0;
1039     Set<String> closedRegions = new HashSet<String>();
1040     while (!isOnlineRegionsEmpty()) {
1041       int count = getNumberOfOnlineRegions();
1042       // Only print a message if the count of regions has changed.
1043       if (count != lastCount) {
1044         // Log every second at most
1045         if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1046           previousLogTime = System.currentTimeMillis();
1047           lastCount = count;
1048           LOG.info("Waiting on " + count + " regions to close");
1049           // Only print out regions still closing if a small number else will
1050           // swamp the log.
1051           if (count < 10 && LOG.isDebugEnabled()) {
1052             LOG.debug(this.onlineRegions);
1053           }
1054         }
1055       }
1056       // Ensure all user regions have been sent a close. Use this to
1057       // protect against the case where an open comes in after we start the
1058       // iterator of onlineRegions to close all user regions.
1059       for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1060         HRegionInfo hri = e.getValue().getRegionInfo();
1061         if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1062             && !closedRegions.contains(hri.getEncodedName())) {
1063           closedRegions.add(hri.getEncodedName());
1064           // Don't update zk with this close transition; pass false.
1065           closeRegionIgnoreErrors(hri, abort);
1066         }
1067       }
1068       // No regions in RIT, we could stop waiting now.
1069       if (this.regionsInTransitionInRS.isEmpty()) {
1070         if (!isOnlineRegionsEmpty()) {
1071           LOG.info("We were exiting though online regions are not empty," +
1072               " because some regions failed closing");
1073         }
1074         break;
1075       }
1076       Threads.sleep(200);
1077     }
1078   }
1079 
1080   private void closeWAL(final boolean delete) {
1081     if (this.hlogForMeta != null) {
1082       // All hlogs (meta and non-meta) are in the same directory. Don't call
1083       // closeAndDelete here since that would delete all hlogs not just the
1084       // meta ones. We will just 'close' the hlog for meta here, and leave
1085       // the directory cleanup to the follow-on closeAndDelete call.
1086       try {
1087         this.hlogForMeta.close();
1088       } catch (Throwable e) {
1089         LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1090       }
1091     }
1092     if (this.hlog != null) {
1093       try {
1094         if (delete) {
1095           hlog.closeAndDelete();
1096         } else {
1097           hlog.close();
1098         }
1099       } catch (Throwable e) {
1100         LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1101       }
1102     }
1103   }
1104 
1105   private void closeAllScanners() {
1106     // Close any outstanding scanners. Means they'll get an UnknownScanner
1107     // exception next time they come in.
1108     for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
1109       try {
1110         e.getValue().s.close();
1111       } catch (IOException ioe) {
1112         LOG.warn("Closing scanner " + e.getKey(), ioe);
1113       }
1114     }
1115   }
1116 
1117   /*
1118    * Run init. Sets up hlog and starts up all server threads.
1119    *
1120    * @param c Extra configuration.
1121    */
1122   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1123   throws IOException {
1124     try {
1125       for (NameStringPair e : c.getMapEntriesList()) {
1126         String key = e.getName();
1127         // The hostname the master sees us as.
1128         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1129           String hostnameFromMasterPOV = e.getValue();
1130           this.serverNameFromMasterPOV = new ServerName(hostnameFromMasterPOV,
1131             this.isa.getPort(), this.startcode);
1132           if (!this.serverNameFromMasterPOV.equals(this.isa.getHostName())) {
1133             LOG.info("Master passed us a different hostname to use; was=" +
1134               this.isa.getHostName() + ", but now=" +
1135               this.serverNameFromMasterPOV.getHostname());
1136           }
1137           continue;
1138         }
1139         String value = e.getValue();
1140         if (LOG.isDebugEnabled()) {
1141           LOG.debug("Config from master: " + key + "=" + value);
1142         }
1143         this.conf.set(key, value);
1144       }
1145 
1146       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1147       // config param for task trackers, but we can piggyback off of it.
1148       if (this.conf.get("mapred.task.id") == null) {
1149         this.conf.set("mapred.task.id", "hb_rs_" +
1150           this.serverNameFromMasterPOV.toString());
1151       }
1152       // Set our ephemeral znode up in zookeeper now we have a name.
1153       createMyEphemeralNode();
1154 
1155       // Save it in a file, this will allow to see if we crash
1156       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1157 
1158       // Master sent us hbase.rootdir to use. Should be fully qualified
1159       // path with file system specification included. Set 'fs.defaultFS'
1160       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
1161       // accessors will be going against wrong filesystem (unless all is set
1162       // to defaults).
1163       FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
1164       // Get fs instance used by this RS
1165       this.fs = new HFileSystem(this.conf, this.useHBaseChecksum);
1166       this.rootDir = FSUtils.getRootDir(this.conf);
1167       this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
1168       this.hlog = setupWALAndReplication();
1169       // Init in here rather than in constructor after thread name has been set
1170       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1171       startServiceThreads();
1172       LOG.info("Serving as " + this.serverNameFromMasterPOV +
1173         ", RPC listening on " + this.isa +
1174         ", sessionid=0x" +
1175         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1176       isOnline = true;
1177     } catch (Throwable e) {
1178       this.isOnline = false;
1179       stop("Failed initialization");
1180       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1181           "Region server startup failed");
1182     } finally {
1183       sleeper.skipSleepCycle();
1184     }
1185   }
1186 
1187   private void createMyEphemeralNode() throws KeeperException {
1188     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(),
1189       HConstants.EMPTY_BYTE_ARRAY);
1190   }
1191 
1192   private void deleteMyEphemeralNode() throws KeeperException {
1193     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1194   }
1195 
1196   public RegionServerAccounting getRegionServerAccounting() {
1197     return regionServerAccounting;
1198   }
1199 
1200   @Override
1201   public TableLockManager getTableLockManager() {
1202     return tableLockManager;
1203   }
1204 
1205   /*
1206    * @param r Region to get RegionLoad for.
1207    *
1208    * @return RegionLoad instance.
1209    *
1210    * @throws IOException
1211    */
1212   private RegionLoad createRegionLoad(final HRegion r) {
1213     byte[] name = r.getRegionName();
1214     int stores = 0;
1215     int storefiles = 0;
1216     int storeUncompressedSizeMB = 0;
1217     int storefileSizeMB = 0;
1218     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1219     int storefileIndexSizeMB = 0;
1220     int rootIndexSizeKB = 0;
1221     int totalStaticIndexSizeKB = 0;
1222     int totalStaticBloomSizeKB = 0;
1223     long totalCompactingKVs = 0;
1224     long currentCompactedKVs = 0;
1225     synchronized (r.stores) {
1226       stores += r.stores.size();
1227       for (Store store : r.stores.values()) {
1228         storefiles += store.getStorefilesCount();
1229         storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1230             / 1024 / 1024);
1231         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1232         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1233         CompactionProgress progress = store.getCompactionProgress();
1234         if (progress != null) {
1235           totalCompactingKVs += progress.totalCompactingKVs;
1236           currentCompactedKVs += progress.currentCompactedKVs;
1237         }
1238 
1239         rootIndexSizeKB +=
1240             (int) (store.getStorefilesIndexSize() / 1024);
1241 
1242         totalStaticIndexSizeKB +=
1243           (int) (store.getTotalStaticIndexSize() / 1024);
1244 
1245         totalStaticBloomSizeKB +=
1246           (int) (store.getTotalStaticBloomSize() / 1024);
1247       }
1248     }
1249     RegionLoad.Builder regionLoad = RegionLoad.newBuilder();
1250     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1251     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1252     regionSpecifier.setValue(ByteString.copyFrom(name));
1253     regionLoad.setRegionSpecifier(regionSpecifier.build())
1254       .setStores(stores)
1255       .setStorefiles(storefiles)
1256       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1257       .setStorefileSizeMB(storefileSizeMB)
1258       .setMemstoreSizeMB(memstoreSizeMB)
1259       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1260       .setRootIndexSizeKB(rootIndexSizeKB)
1261       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1262       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1263       .setReadRequestsCount((int) r.readRequestsCount.get())
1264       .setWriteRequestsCount((int) r.writeRequestsCount.get())
1265       .setTotalCompactingKVs(totalCompactingKVs)
1266       .setCurrentCompactedKVs(currentCompactedKVs)
1267       .setCompleteSequenceId(r.completeSequenceId);
1268 
1269     return regionLoad.build();
1270   }
1271 
1272   /**
1273    * @param encodedRegionName
1274    * @return An instance of RegionLoad.
1275    */
1276   public RegionLoad createRegionLoad(final String encodedRegionName) {
1277     HRegion r = null;
1278     r = this.onlineRegions.get(encodedRegionName);
1279     return r != null ? createRegionLoad(r) : null;
1280   }
1281 
1282   /*
1283    * Inner class that runs on a long period checking if regions need compaction.
1284    */
1285   private static class CompactionChecker extends Chore {
1286     private final HRegionServer instance;
1287     private final int majorCompactPriority;
1288     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1289     private long iteration = 0;
1290 
1291     CompactionChecker(final HRegionServer h, final int sleepTime,
1292         final Stoppable stopper) {
1293       super("CompactionChecker", sleepTime, h);
1294       this.instance = h;
1295       LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
1296 
1297       /* MajorCompactPriority is configurable.
1298        * If not set, the compaction will use default priority.
1299        */
1300       this.majorCompactPriority = this.instance.conf.
1301         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1302         DEFAULT_PRIORITY);
1303     }
1304 
1305     @Override
1306     protected void chore() {
1307       for (HRegion r : this.instance.onlineRegions.values()) {
1308         if (r == null)
1309           continue;
1310         for (Store s : r.getStores().values()) {
1311           try {
1312             long multiplier = s.getCompactionCheckMultiplier();
1313             assert multiplier > 0;
1314             if (iteration % multiplier != 0) continue;
1315             if (s.needsCompaction()) {
1316               // Queue a compaction. Will recognize if major is needed.
1317               this.instance.compactSplitThread.requestCompaction(r, s, getName()
1318                   + " requests compaction", null);
1319             } else if (s.isMajorCompaction()) {
1320               if (majorCompactPriority == DEFAULT_PRIORITY
1321                   || majorCompactPriority > r.getCompactPriority()) {
1322                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1323                     + " requests major compaction; use default priority", null);
1324               } else {
1325                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1326                     + " requests major compaction; use configured priority",
1327                   this.majorCompactPriority, null);
1328               }
1329             }
1330           } catch (IOException e) {
1331             LOG.warn("Failed major compaction check on " + r, e);
1332           }
1333         }
1334       }
1335       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1336     }
1337   }
1338 
1339   class PeriodicMemstoreFlusher extends Chore {
1340     final HRegionServer server;
1341     final static int RANGE_OF_DELAY = 20000; //millisec
1342     final static int MIN_DELAY_TIME = 3000; //millisec
1343     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1344       super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1345       this.server = server;
1346     }
1347 
1348     @Override
1349     protected void chore() {
1350       for (HRegion r : this.server.onlineRegions.values()) {
1351         if (r == null)
1352           continue;
1353         if (r.shouldFlush()) {
1354           FlushRequester requester = server.getFlushRequester();
1355           if (requester != null) {
1356             long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1357             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1358                 " after a delay of " + randomDelay);
1359             //Throttle the flushes by putting a delay. If we don't throttle, and there
1360             //is a balanced write-load on the regions in a table, we might end up
1361             //overwhelming the filesystem with too many flushes at once.
1362             requester.requestDelayedFlush(r, randomDelay);
1363           }
1364         }
1365       }
1366     }
1367   }
1368 
1369   /**
1370    * Report the status of the server. A server is online once all the startup is
1371    * completed (setting up filesystem, starting service threads, etc.). This
1372    * method is designed mostly to be useful in tests.
1373    *
1374    * @return true if online, false if not.
1375    */
1376   public boolean isOnline() {
1377     return isOnline;
1378   }
1379 
1380   /**
1381    * Setup WAL log and replication if enabled.
1382    * Replication setup is done in here because it wants to be hooked up to WAL.
1383    * @return A WAL instance.
1384    * @throws IOException
1385    */
1386   private HLog setupWALAndReplication() throws IOException {
1387     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1388     final String logName
1389       = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1390 
1391     Path logdir = new Path(rootDir, logName);
1392     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1393     if (this.fs.exists(logdir)) {
1394       throw new RegionServerRunningException("Region server has already " +
1395         "created directory at " + this.serverNameFromMasterPOV.toString());
1396     }
1397 
1398     // Instantiate replication manager if replication enabled.  Pass it the
1399     // log directories.
1400     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1401 
1402     return instantiateHLog(rootDir, logName);
1403   }
1404 
1405   private HLog getMetaWAL() throws IOException {
1406     if (this.hlogForMeta == null) {
1407       final String logName
1408       = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1409 
1410       Path logdir = new Path(rootDir, logName);
1411       if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1412 
1413       this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
1414           rootDir, logName, this.conf, getMetaWALActionListeners(),
1415           this.serverNameFromMasterPOV.toString());
1416     }
1417     return this.hlogForMeta;
1418   }
1419 
1420   /**
1421    * Called by {@link #setupWALAndReplication()} creating WAL instance.
1422    * @param rootdir
1423    * @param logName
1424    * @return WAL instance.
1425    * @throws IOException
1426    */
1427   protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1428     return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1429       getWALActionListeners(), this.serverNameFromMasterPOV.toString());
1430   }
1431 
1432   /**
1433    * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance.
1434    * Add any {@link WALActionsListener}s you want inserted before WAL startup.
1435    * @return List of WALActionsListener that will be passed in to
1436    * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
1437    */
1438   protected List<WALActionsListener> getWALActionListeners() {
1439     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1440     // Log roller.
1441     this.hlogRoller = new LogRoller(this, this);
1442     listeners.add(this.hlogRoller);
1443     if (this.replicationSourceHandler != null &&
1444         this.replicationSourceHandler.getWALActionsListener() != null) {
1445       // Replication handler is an implementation of WALActionsListener.
1446       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1447     }
1448     return listeners;
1449   }
1450 
1451   protected List<WALActionsListener> getMetaWALActionListeners() {
1452     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1453     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1454     // null
1455     MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1456     String n = Thread.currentThread().getName();
1457     Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1458         n + "MetaLogRoller", uncaughtExceptionHandler);
1459     this.metaHLogRoller = tmpLogRoller;
1460     tmpLogRoller = null;
1461     listeners.add(this.metaHLogRoller);
1462     return listeners;
1463   }
1464 
1465   protected LogRoller getLogRoller() {
1466     return hlogRoller;
1467   }
1468 
1469   public MetricsRegionServer getMetrics() {
1470     return this.metricsRegionServer;
1471   }
1472 
1473   /**
1474    * @return Master address tracker instance.
1475    */
1476   public MasterAddressTracker getMasterAddressManager() {
1477     return this.masterAddressManager;
1478   }
1479 
1480   /*
1481    * Start maintenance Threads, Server, Worker and lease checker threads.
1482    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1483    * get an unhandled exception. We cannot set the handler on all threads.
1484    * Server's internal Listener thread is off limits. For Server, if an OOME, it
1485    * waits a while then retries. Meantime, a flush or a compaction that tries to
1486    * run should trigger same critical condition and the shutdown will run. On
1487    * its way out, this server will shut down Server. Leases are sort of
1488    * inbetween. It has an internal thread that while it inherits from Chore, it
1489    * keeps its own internal stop mechanism so needs to be stopped by this
1490    * hosting server. Worker logs the exception and exits.
1491    */
1492   private void startServiceThreads() throws IOException {
1493     String n = Thread.currentThread().getName();
1494     // Start executor services
1495     this.service = new ExecutorService(getServerName().toString());
1496     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1497       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1498     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1499       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1500     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1501       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1502     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1503       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1504     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1505       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1506         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1507     }
1508 
1509     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
1510         uncaughtExceptionHandler);
1511     this.cacheFlusher.start(uncaughtExceptionHandler);
1512     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
1513       ".compactionChecker", uncaughtExceptionHandler);
1514     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
1515         ".periodicFlusher", uncaughtExceptionHandler);
1516     if (this.healthCheckChore != null) {
1517     Threads
1518         .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
1519             uncaughtExceptionHandler);
1520     }
1521 
1522     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1523     // an unhandled exception, it will just exit.
1524     this.leases.setName(n + ".leaseChecker");
1525     this.leases.start();
1526 
1527     // Put up the webui.  Webui may come up on port other than configured if
1528     // that port is occupied. Adjust serverInfo if this is the case.
1529     this.webuiport = putUpWebUI();
1530 
1531     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1532         this.replicationSourceHandler != null) {
1533       this.replicationSourceHandler.startReplicationService();
1534     } else if (this.replicationSourceHandler != null) {
1535       this.replicationSourceHandler.startReplicationService();
1536     } else if (this.replicationSinkHandler != null) {
1537       this.replicationSinkHandler.startReplicationService();
1538     }
1539 
1540     // Start Server.  This service is like leases in that it internally runs
1541     // a thread.
1542     this.rpcServer.start();
1543 
1544     // Create the log splitting worker and start it
1545     this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
1546     splitLogWorker.start();
1547   }
1548 
1549   /**
1550    * Puts up the webui.
1551    * @return Returns final port -- maybe different from what we started with.
1552    * @throws IOException
1553    */
1554   private int putUpWebUI() throws IOException {
1555     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
1556     // -1 is for disabling info server
1557     if (port < 0) return port;
1558     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1559     // check if auto port bind enabled
1560     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1561         false);
1562     while (true) {
1563       try {
1564         this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
1565         this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
1566         this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
1567         this.infoServer.setAttribute(REGIONSERVER, this);
1568         this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
1569         this.infoServer.start();
1570         break;
1571       } catch (BindException e) {
1572         if (!auto) {
1573           // auto bind disabled throw BindException
1574           throw e;
1575         }
1576         // auto bind enabled, try to use another port
1577         LOG.info("Failed binding http info server to port: " + port);
1578         port++;
1579       }
1580     }
1581     return port;
1582   }
1583 
1584   /*
1585    * Verify that server is healthy
1586    */
1587   private boolean isHealthy() {
1588     if (!fsOk) {
1589       // File system problem
1590       return false;
1591     }
1592     // Verify that all threads are alive
1593     if (!(leases.isAlive()
1594         && cacheFlusher.isAlive() && hlogRoller.isAlive()
1595         && this.compactionChecker.isAlive())
1596         && this.periodicFlusher.isAlive()) {
1597       stop("One or more threads are no longer alive -- stop");
1598       return false;
1599     }
1600     if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1601       stop("Meta HLog roller thread is no longer alive -- stop");
1602       return false;
1603     }
1604     return true;
1605   }
1606 
1607   public HLog getWAL() {
1608     try {
1609       return getWAL(null);
1610     } catch (IOException e) {
1611       LOG.warn("getWAL threw exception " + e);
1612       return null;
1613     }
1614   }
1615 
1616   @Override
1617   public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1618     //TODO: at some point this should delegate to the HLogFactory
1619     //currently, we don't care about the region as much as we care about the
1620     //table.. (hence checking the tablename below)
1621     //_ROOT_ and .META. regions have separate WAL.
1622     if (regionInfo != null &&
1623         regionInfo.isMetaTable()) {
1624       return getMetaWAL();
1625     }
1626     return this.hlog;
1627   }
1628 
1629   @Override
1630   public CatalogTracker getCatalogTracker() {
1631     return this.catalogTracker;
1632   }
1633 
1634   @Override
1635   public void stop(final String msg) {
1636     try {
1637       this.rsHost.preStop(msg);
1638       this.stopped = true;
1639       LOG.info("STOPPED: " + msg);
1640       // Wakes run() if it is sleeping
1641       sleeper.skipSleepCycle();
1642     } catch (IOException exp) {
1643       LOG.warn("The region server did not stop", exp);
1644     }
1645   }
1646 
1647   public void waitForServerOnline(){
1648     while (!isOnline() && !isStopped()){
1649        sleeper.sleep();
1650     }
1651   }
1652 
1653   @Override
1654   public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
1655   throws KeeperException, IOException {
1656     checkOpen();
1657     LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString());
1658     // Do checks to see if we need to compact (references or too many files)
1659     for (Store s : r.getStores().values()) {
1660       if (s.hasReferences() || s.needsCompaction()) {
1661         getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
1662       }
1663     }
1664     long openSeqNum = r.getOpenSeqNum();
1665     if (openSeqNum == HConstants.NO_SEQNUM) {
1666       // If we opened a region, we should have read some sequence number from it.
1667       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1668       openSeqNum = 0;
1669     }
1670 
1671     // Update flushed sequence id of a recovering region in ZK
1672     updateRecoveringRegionLastFlushedSequenceId(r);
1673 
1674     // Update ZK, or META
1675     if (r.getRegionInfo().isMetaRegion()) {
1676       MetaRegionTracker.setMetaLocation(getZooKeeper(),
1677           this.serverNameFromMasterPOV);
1678     } else {
1679       MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
1680         this.serverNameFromMasterPOV, openSeqNum);
1681     }
1682     LOG.info("Done with post open deploy task for region=" +
1683       r.getRegionNameAsString());
1684 
1685   }
1686 
1687   @Override
1688   public RpcServerInterface getRpcServer() {
1689     return rpcServer;
1690   }
1691 
1692   /**
1693    * Cause the server to exit without closing the regions it is serving, the log
1694    * it is using and without notifying the master. Used unit testing and on
1695    * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
1696    *
1697    * @param reason
1698    *          the reason we are aborting
1699    * @param cause
1700    *          the exception that caused the abort, or null
1701    */
1702   public void abort(String reason, Throwable cause) {
1703     String msg = "ABORTING region server " + this + ": " + reason;
1704     if (cause != null) {
1705       LOG.fatal(msg, cause);
1706     } else {
1707       LOG.fatal(msg);
1708     }
1709     this.abortRequested = true;
1710     // HBASE-4014: show list of coprocessors that were loaded to help debug
1711     // regionserver crashes.Note that we're implicitly using
1712     // java.util.HashSet's toString() method to print the coprocessor names.
1713     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1714         CoprocessorHost.getLoadedCoprocessors());
1715     // Do our best to report our abort to the master, but this may not work
1716     try {
1717       if (cause != null) {
1718         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1719       }
1720       // Report to the master but only if we have already registered with the master.
1721       if (rssStub != null && this.serverNameFromMasterPOV != null) {
1722         ReportRSFatalErrorRequest.Builder builder =
1723           ReportRSFatalErrorRequest.newBuilder();
1724         ServerName sn =
1725           ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
1726         builder.setServer(ProtobufUtil.toServerName(sn));
1727         builder.setErrorMessage(msg);
1728         rssStub.reportRSFatalError(null, builder.build());
1729       }
1730     } catch (Throwable t) {
1731       LOG.warn("Unable to report fatal error to master", t);
1732     }
1733     stop(reason);
1734   }
1735 
1736   /**
1737    * @see HRegionServer#abort(String, Throwable)
1738    */
1739   public void abort(String reason) {
1740     abort(reason, null);
1741   }
1742 
1743   public boolean isAborted() {
1744     return this.abortRequested;
1745   }
1746 
1747   /*
1748    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
1749    * logs but it does close socket in case want to bring up server on old
1750    * hostname+port immediately.
1751    */
1752   protected void kill() {
1753     this.killed = true;
1754     abort("Simulated kill");
1755   }
1756 
1757   /**
1758    * Wait on all threads to finish. Presumption is that all closes and stops
1759    * have already been called.
1760    */
1761   protected void join() {
1762     Threads.shutdown(this.compactionChecker.getThread());
1763     Threads.shutdown(this.periodicFlusher.getThread());
1764     this.cacheFlusher.join();
1765     if (this.healthCheckChore != null) {
1766       Threads.shutdown(this.healthCheckChore.getThread());
1767     }
1768     if (this.hlogRoller != null) {
1769       Threads.shutdown(this.hlogRoller.getThread());
1770     }
1771     if (this.metaHLogRoller != null) {
1772       Threads.shutdown(this.metaHLogRoller.getThread());
1773     }
1774     if (this.compactSplitThread != null) {
1775       this.compactSplitThread.join();
1776     }
1777     if (this.service != null) this.service.shutdown();
1778     if (this.replicationSourceHandler != null &&
1779         this.replicationSourceHandler == this.replicationSinkHandler) {
1780       this.replicationSourceHandler.stopReplicationService();
1781     } else if (this.replicationSourceHandler != null) {
1782       this.replicationSourceHandler.stopReplicationService();
1783     } else if (this.replicationSinkHandler != null) {
1784       this.replicationSinkHandler.stopReplicationService();
1785     }
1786   }
1787 
1788   /**
1789    * @return Return the object that implements the replication
1790    * source service.
1791    */
1792   ReplicationSourceService getReplicationSourceService() {
1793     return replicationSourceHandler;
1794   }
1795 
1796   /**
1797    * @return Return the object that implements the replication
1798    * sink service.
1799    */
1800   ReplicationSinkService getReplicationSinkService() {
1801     return replicationSinkHandler;
1802   }
1803 
1804   /**
1805    * Get the current master from ZooKeeper and open the RPC connection to it.
1806    *
1807    * Method will block until a master is available. You can break from this
1808    * block by requesting the server stop.
1809    *
1810    * @return master + port, or null if server has been stopped
1811    */
1812   private Pair<ServerName, RegionServerStatusService.BlockingInterface>
1813   createRegionServerStatusStub() {
1814     ServerName sn = null;
1815     long previousLogTime = 0;
1816     RegionServerStatusService.BlockingInterface master = null;
1817     boolean refresh = false; // for the first time, use cached data
1818     RegionServerStatusService.BlockingInterface intf = null;
1819     while (keepLooping() && master == null) {
1820       sn = this.masterAddressManager.getMasterAddress(refresh);
1821       if (sn == null) {
1822         if (!keepLooping()) {
1823           // give up with no connection.
1824           LOG.debug("No master found and cluster is stopped; bailing out");
1825           return null;
1826         }
1827         LOG.debug("No master found; retry");
1828         previousLogTime = System.currentTimeMillis();
1829         refresh = true; // let's try pull it from ZK directly
1830         sleeper.sleep();
1831         continue;
1832       }
1833 
1834       InetSocketAddress isa =
1835         new InetSocketAddress(sn.getHostname(), sn.getPort());
1836 
1837       LOG.info("Attempting connect to Master server at " +
1838         this.masterAddressManager.getMasterAddress());
1839       try {
1840         BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
1841             User.getCurrent(), this.rpcTimeout);
1842         intf = RegionServerStatusService.newBlockingStub(channel);
1843         break;
1844       } catch (IOException e) {
1845         e = e instanceof RemoteException ?
1846             ((RemoteException)e).unwrapRemoteException() : e;
1847         if (e instanceof ServerNotRunningYetException) {
1848           if (System.currentTimeMillis() > (previousLogTime+1000)){
1849             LOG.info("Master isn't available yet, retrying");
1850             previousLogTime = System.currentTimeMillis();
1851           }
1852         } else {
1853           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1854             LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1855             previousLogTime = System.currentTimeMillis();
1856           }
1857         }
1858         try {
1859           Thread.sleep(200);
1860         } catch (InterruptedException ignored) {
1861         }
1862       }
1863     }
1864     return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
1865   }
1866 
1867   /**
1868    * @return True if we should break loop because cluster is going down or
1869    * this server has been stopped or hdfs has gone bad.
1870    */
1871   private boolean keepLooping() {
1872     return !this.stopped && isClusterUp();
1873   }
1874 
1875   /*
1876    * Let the master know we're here Run initialization using parameters passed
1877    * us by the master.
1878    * @return A Map of key/value configurations we got from the Master else
1879    * null if we failed to register.
1880    * @throws IOException
1881    */
1882   private RegionServerStartupResponse reportForDuty() throws IOException {
1883     RegionServerStartupResponse result = null;
1884     Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
1885       createRegionServerStatusStub();
1886     this.rssStub = p.getSecond();
1887     ServerName masterServerName = p.getFirst();
1888     if (masterServerName == null) return result;
1889     try {
1890       this.requestCount.set(0);
1891       LOG.info("Telling master at " + masterServerName + " that we are up " +
1892         "with port=" + this.isa.getPort() + ", startcode=" + this.startcode);
1893       long now = EnvironmentEdgeManager.currentTimeMillis();
1894       int port = this.isa.getPort();
1895       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
1896       request.setPort(port);
1897       request.setServerStartCode(this.startcode);
1898       request.setServerCurrentTime(now);
1899       result = this.rssStub.regionServerStartup(null, request.build());
1900     } catch (ServiceException se) {
1901       IOException ioe = ProtobufUtil.getRemoteException(se);
1902       if (ioe instanceof ClockOutOfSyncException) {
1903         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
1904         // Re-throw IOE will cause RS to abort
1905         throw ioe;
1906       } else {
1907         LOG.warn("error telling master we are up", se);
1908       }
1909     }
1910     return result;
1911   }
1912 
1913   @Override
1914   public long getLastSequenceId(byte[] region) {
1915     Long lastFlushedSequenceId = -1l;
1916     try {
1917       GetLastFlushedSequenceIdRequest req = RequestConverter
1918           .buildGetLastFlushedSequenceIdRequest(region);
1919       lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
1920           .getLastFlushedSequenceId();
1921     } catch (ServiceException e) {
1922       lastFlushedSequenceId = -1l;
1923       LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
1924     }
1925     return lastFlushedSequenceId;
1926   }
1927 
1928   /**
1929    * Closes all regions.  Called on our way out.
1930    * Assumes that its not possible for new regions to be added to onlineRegions
1931    * while this method runs.
1932    */
1933   protected void closeAllRegions(final boolean abort) {
1934     closeUserRegions(abort);
1935     closeMetaTableRegions(abort);
1936   }
1937 
1938   /**
1939    * Close meta region if we carry it
1940    * @param abort Whether we're running an abort.
1941    */
1942   void closeMetaTableRegions(final boolean abort) {
1943     HRegion meta = null;
1944     this.lock.writeLock().lock();
1945     try {
1946       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
1947         HRegionInfo hri = e.getValue().getRegionInfo();
1948         if (hri.isMetaRegion()) {
1949           meta = e.getValue();
1950         }
1951         if (meta != null) break;
1952       }
1953     } finally {
1954       this.lock.writeLock().unlock();
1955     }
1956     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
1957   }
1958 
1959   /**
1960    * Schedule closes on all user regions.
1961    * Should be safe calling multiple times because it wont' close regions
1962    * that are already closed or that are closing.
1963    * @param abort Whether we're running an abort.
1964    */
1965   void closeUserRegions(final boolean abort) {
1966     this.lock.writeLock().lock();
1967     try {
1968       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1969         HRegion r = e.getValue();
1970         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
1971           // Don't update zk with this close transition; pass false.
1972           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
1973         }
1974       }
1975     } finally {
1976       this.lock.writeLock().unlock();
1977     }
1978   }
1979 
1980   /** @return the info server */
1981   public InfoServer getInfoServer() {
1982     return infoServer;
1983   }
1984 
1985   /**
1986    * @return true if a stop has been requested.
1987    */
1988   public boolean isStopped() {
1989     return this.stopped;
1990   }
1991 
1992   @Override
1993   public boolean isStopping() {
1994     return this.stopping;
1995   }
1996 
1997   public Map<String, HRegion> getRecoveringRegions() {
1998     return this.recoveringRegions;
1999   }
2000 
2001   /**
2002    *
2003    * @return the configuration
2004    */
2005   public Configuration getConfiguration() {
2006     return conf;
2007   }
2008 
2009   /** @return the write lock for the server */
2010   ReentrantReadWriteLock.WriteLock getWriteLock() {
2011     return lock.writeLock();
2012   }
2013 
2014   public int getNumberOfOnlineRegions() {
2015     return this.onlineRegions.size();
2016   }
2017 
2018   boolean isOnlineRegionsEmpty() {
2019     return this.onlineRegions.isEmpty();
2020   }
2021 
2022   /**
2023    * For tests, web ui and metrics.
2024    * This method will only work if HRegionServer is in the same JVM as client;
2025    * HRegion cannot be serialized to cross an rpc.
2026    */
2027   public Collection<HRegion> getOnlineRegionsLocalContext() {
2028     Collection<HRegion> regions = this.onlineRegions.values();
2029     return Collections.unmodifiableCollection(regions);
2030   }
2031 
2032   @Override
2033   public void addToOnlineRegions(HRegion region) {
2034     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2035   }
2036 
2037   /**
2038    * @return A new Map of online regions sorted by region size with the first
2039    *         entry being the biggest.
2040    */
2041   public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2042     // we'll sort the regions in reverse
2043     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2044         new Comparator<Long>() {
2045           public int compare(Long a, Long b) {
2046             return -1 * a.compareTo(b);
2047           }
2048         });
2049     // Copy over all regions. Regions are sorted by size with biggest first.
2050     for (HRegion region : this.onlineRegions.values()) {
2051       sortedRegions.put(region.memstoreSize.get(), region);
2052     }
2053     return sortedRegions;
2054   }
2055 
2056   /**
2057    * @return time stamp in millis of when this region server was started
2058    */
2059   public long getStartcode() {
2060     return this.startcode;
2061   }
2062 
2063   /** @return reference to FlushRequester */
2064   public FlushRequester getFlushRequester() {
2065     return this.cacheFlusher;
2066   }
2067 
2068   /**
2069    * Get the top N most loaded regions this server is serving so we can tell the
2070    * master which regions it can reallocate if we're overloaded. TODO: actually
2071    * calculate which regions are most loaded. (Right now, we're just grabbing
2072    * the first N regions being served regardless of load.)
2073    */
2074   protected HRegionInfo[] getMostLoadedRegions() {
2075     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2076     for (HRegion r : onlineRegions.values()) {
2077       if (!r.isAvailable()) {
2078         continue;
2079       }
2080       if (regions.size() < numRegionsToReport) {
2081         regions.add(r.getRegionInfo());
2082       } else {
2083         break;
2084       }
2085     }
2086     return regions.toArray(new HRegionInfo[regions.size()]);
2087   }
2088 
2089   @Override
2090   public Leases getLeases() {
2091     return leases;
2092   }
2093 
2094   /**
2095    * @return Return the rootDir.
2096    */
2097   protected Path getRootDir() {
2098     return rootDir;
2099   }
2100 
2101   /**
2102    * @return Return the fs.
2103    */
2104   public FileSystem getFileSystem() {
2105     return fs;
2106   }
2107 
2108   public String toString() {
2109     return getServerName().toString();
2110   }
2111 
2112   /**
2113    * Interval at which threads should run
2114    *
2115    * @return the interval
2116    */
2117   public int getThreadWakeFrequency() {
2118     return threadWakeFrequency;
2119   }
2120 
2121   @Override
2122   public ZooKeeperWatcher getZooKeeper() {
2123     return zooKeeper;
2124   }
2125 
2126   @Override
2127   public ServerName getServerName() {
2128     // Our servername could change after we talk to the master.
2129     return this.serverNameFromMasterPOV == null?
2130       new ServerName(this.isa.getHostName(), this.isa.getPort(), this.startcode):
2131         this.serverNameFromMasterPOV;
2132   }
2133 
2134   @Override
2135   public CompactionRequestor getCompactionRequester() {
2136     return this.compactSplitThread;
2137   }
2138 
2139   public ZooKeeperWatcher getZooKeeperWatcher() {
2140     return this.zooKeeper;
2141   }
2142 
2143   public RegionServerCoprocessorHost getCoprocessorHost(){
2144     return this.rsHost;
2145   }
2146 
2147 
2148   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2149     return this.regionsInTransitionInRS;
2150   }
2151 
2152   public ExecutorService getExecutorService() {
2153     return service;
2154   }
2155 
2156   //
2157   // Main program and support routines
2158   //
2159 
2160   /**
2161    * Load the replication service objects, if any
2162    */
2163   static private void createNewReplicationInstance(Configuration conf,
2164     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2165 
2166     // If replication is not enabled, then return immediately.
2167     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
2168       return;
2169     }
2170 
2171     // read in the name of the source replication class from the config file.
2172     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2173                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2174 
2175     // read in the name of the sink replication class from the config file.
2176     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2177                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2178 
2179     // If both the sink and the source class names are the same, then instantiate
2180     // only one object.
2181     if (sourceClassname.equals(sinkClassname)) {
2182       server.replicationSourceHandler = (ReplicationSourceService)
2183                                          newReplicationInstance(sourceClassname,
2184                                          conf, server, fs, logDir, oldLogDir);
2185       server.replicationSinkHandler = (ReplicationSinkService)
2186                                          server.replicationSourceHandler;
2187     }
2188     else {
2189       server.replicationSourceHandler = (ReplicationSourceService)
2190                                          newReplicationInstance(sourceClassname,
2191                                          conf, server, fs, logDir, oldLogDir);
2192       server.replicationSinkHandler = (ReplicationSinkService)
2193                                          newReplicationInstance(sinkClassname,
2194                                          conf, server, fs, logDir, oldLogDir);
2195     }
2196   }
2197 
2198   static private ReplicationService newReplicationInstance(String classname,
2199     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2200     Path oldLogDir) throws IOException{
2201 
2202     Class<?> clazz = null;
2203     try {
2204       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2205       clazz = Class.forName(classname, true, classLoader);
2206     } catch (java.lang.ClassNotFoundException nfe) {
2207       throw new IOException("Cound not find class for " + classname);
2208     }
2209 
2210     // create an instance of the replication object.
2211     ReplicationService service = (ReplicationService)
2212                               ReflectionUtils.newInstance(clazz, conf);
2213     service.initialize(server, fs, logDir, oldLogDir);
2214     return service;
2215   }
2216 
2217   /**
2218    * @param hrs
2219    * @return Thread the RegionServer is running in correctly named.
2220    * @throws IOException
2221    */
2222   public static Thread startRegionServer(final HRegionServer hrs)
2223       throws IOException {
2224     return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
2225   }
2226 
2227   /**
2228    * @param hrs
2229    * @param name
2230    * @return Thread the RegionServer is running in correctly named.
2231    * @throws IOException
2232    */
2233   public static Thread startRegionServer(final HRegionServer hrs,
2234       final String name) throws IOException {
2235     Thread t = new Thread(hrs);
2236     t.setName(name);
2237     t.start();
2238     // Install shutdown hook that will catch signals and run an orderly shutdown
2239     // of the hrs.
2240     ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2241         .getConfiguration()), hrs, t);
2242     return t;
2243   }
2244 
2245   /**
2246    * Utility for constructing an instance of the passed HRegionServer class.
2247    *
2248    * @param regionServerClass
2249    * @param conf2
2250    * @return HRegionServer instance.
2251    */
2252   public static HRegionServer constructRegionServer(
2253       Class<? extends HRegionServer> regionServerClass,
2254       final Configuration conf2) {
2255     try {
2256       Constructor<? extends HRegionServer> c = regionServerClass
2257           .getConstructor(Configuration.class);
2258       return c.newInstance(conf2);
2259     } catch (Exception e) {
2260       throw new RuntimeException("Failed construction of " + "Regionserver: "
2261           + regionServerClass.toString(), e);
2262     }
2263   }
2264 
2265   /**
2266    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2267    */
2268   public static void main(String[] args) throws Exception {
2269 	VersionInfo.logVersion();
2270     Configuration conf = HBaseConfiguration.create();
2271     @SuppressWarnings("unchecked")
2272     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2273         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2274 
2275     new HRegionServerCommandLine(regionServerClass).doMain(args);
2276   }
2277 
2278   /**
2279    * Gets the online regions of the specified table.
2280    * This method looks at the in-memory onlineRegions.  It does not go to <code>.META.</code>.
2281    * Only returns <em>online</em> regions.  If a region on this table has been
2282    * closed during a disable, etc., it will not be included in the returned list.
2283    * So, the returned list may not necessarily be ALL regions in this table, its
2284    * all the ONLINE regions in the table.
2285    * @param tableName
2286    * @return Online regions from <code>tableName</code>
2287    */
2288    public List<HRegion> getOnlineRegions(byte[] tableName) {
2289      List<HRegion> tableRegions = new ArrayList<HRegion>();
2290      synchronized (this.onlineRegions) {
2291        for (HRegion region: this.onlineRegions.values()) {
2292          HRegionInfo regionInfo = region.getRegionInfo();
2293          if(Bytes.equals(regionInfo.getTableName(), tableName)) {
2294            tableRegions.add(region);
2295          }
2296        }
2297      }
2298      return tableRegions;
2299    }
2300 
2301   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2302   public String[] getCoprocessors() {
2303     TreeSet<String> coprocessors = new TreeSet<String>(
2304         this.hlog.getCoprocessorHost().getCoprocessors());
2305     Collection<HRegion> regions = getOnlineRegionsLocalContext();
2306     for (HRegion region: regions) {
2307       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2308     }
2309     return coprocessors.toArray(new String[coprocessors.size()]);
2310   }
2311 
2312   /**
2313    * Instantiated as a scanner lease. If the lease times out, the scanner is
2314    * closed
2315    */
2316   private class ScannerListener implements LeaseListener {
2317     private final String scannerName;
2318 
2319     ScannerListener(final String n) {
2320       this.scannerName = n;
2321     }
2322 
2323     public void leaseExpired() {
2324       RegionScannerHolder rsh = scanners.remove(this.scannerName);
2325       if (rsh != null) {
2326         RegionScanner s = rsh.s;
2327         LOG.info("Scanner " + this.scannerName + " lease expired on region "
2328             + s.getRegionInfo().getRegionNameAsString());
2329         try {
2330           HRegion region = getRegion(s.getRegionInfo().getRegionName());
2331           if (region != null && region.getCoprocessorHost() != null) {
2332             region.getCoprocessorHost().preScannerClose(s);
2333           }
2334 
2335           s.close();
2336           if (region != null && region.getCoprocessorHost() != null) {
2337             region.getCoprocessorHost().postScannerClose(s);
2338           }
2339         } catch (IOException e) {
2340           LOG.error("Closing scanner for "
2341               + s.getRegionInfo().getRegionNameAsString(), e);
2342         }
2343       } else {
2344         LOG.info("Scanner " + this.scannerName + " lease expired");
2345       }
2346     }
2347   }
2348 
2349   /**
2350    * Called to verify that this server is up and running.
2351    *
2352    * @throws IOException
2353    */
2354   protected void checkOpen() throws IOException {
2355     if (this.stopped || this.abortRequested) {
2356       throw new RegionServerStoppedException("Server " + getServerName() +
2357         " not running" + (this.abortRequested ? ", aborting" : ""));
2358     }
2359     if (!fsOk) {
2360       throw new RegionServerStoppedException("File system not available");
2361     }
2362   }
2363 
2364 
2365   /**
2366    * Try to close the region, logs a warning on failure but continues.
2367    * @param region Region to close
2368    */
2369   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2370     try {
2371       if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
2372         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2373             " - ignoring and continuing");
2374       }
2375     } catch (NotServingRegionException e) {
2376       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2377           " - ignoring and continuing", e);
2378     }
2379   }
2380 
2381   /**
2382    * Close asynchronously a region, can be called from the master or internally by the regionserver
2383    * when stopping. If called from the master, the region will update the znode status.
2384    *
2385    * <p>
2386    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2387    * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
2388    * </p>
2389 
2390    * <p>
2391    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2392    * </p>
2393    *
2394    * @param encodedName Region to close
2395    * @param abort True if we are aborting
2396    * @param zk True if we are to update zk about the region close; if the close
2397    * was orchestrated by master, then update zk.  If the close is being run by
2398    * the regionserver because its going down, don't update zk.
2399    * @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
2400    *   CLOSING state.
2401    * @return True if closed a region.
2402    * @throws NotServingRegionException if the region is not online or if a close
2403    * request in in progress.
2404    */
2405   protected boolean closeRegion(String encodedName, final boolean abort,
2406       final boolean zk, final int versionOfClosingNode, final ServerName sn)
2407       throws NotServingRegionException {
2408     //Check for permissions to close.
2409     final HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2410     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2411       try {
2412         actualRegion.getCoprocessorHost().preClose(false);
2413       } catch (IOException exp) {
2414         LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2415         return false;
2416       }
2417     }
2418 
2419     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2420         Boolean.FALSE);
2421 
2422     if (Boolean.TRUE.equals(previous)) {
2423       LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2424           "trying to OPEN. Cancelling OPENING.");
2425       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2426         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2427         // We're going to try to do a standard close then.
2428         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2429             " Doing a standard close now");
2430         return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
2431       } else {
2432         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2433         // The master deletes the znode when it receives this exception.
2434         throw new NotServingRegionException("The region " + encodedName +
2435             " was opening but not yet served. Opening is cancelled.");
2436       }
2437     } else if (Boolean.FALSE.equals(previous)) {
2438       LOG.info("Received CLOSE for the region: " + encodedName +
2439           " ,which we are already trying to CLOSE");
2440       // The master deletes the znode when it receives this exception.
2441       throw new NotServingRegionException("The region " + encodedName +
2442           " was already closing. New CLOSE request is ignored.");
2443     }
2444 
2445     if (actualRegion == null){
2446       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2447       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2448       // The master deletes the znode when it receives this exception.
2449       throw new NotServingRegionException("The region " + encodedName +
2450           " is not online, and is not opening.");
2451     }
2452 
2453     CloseRegionHandler crh;
2454     final HRegionInfo hri = actualRegion.getRegionInfo();
2455     if (hri.isMetaRegion()) {
2456       crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
2457     } else {
2458       crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
2459     }
2460     this.service.submit(crh);
2461     return true;
2462   }
2463 
2464    /**
2465    * @param regionName
2466    * @return HRegion for the passed binary <code>regionName</code> or null if
2467    *         named region is not member of the online regions.
2468    */
2469   public HRegion getOnlineRegion(final byte[] regionName) {
2470     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2471     return this.onlineRegions.get(encodedRegionName);
2472   }
2473 
2474   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2475     return this.regionFavoredNodesMap.get(encodedRegionName);
2476   }
2477 
2478   @Override
2479   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2480     return this.onlineRegions.get(encodedRegionName);
2481   }
2482 
2483 
2484   @Override
2485   public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2486     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2487 
2488     if (destination != null) {
2489       HLog wal = getWAL();
2490       long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2491       if (closeSeqNum == HConstants.NO_SEQNUM) {
2492         // No edits in WAL for this region; get the sequence number when the region was opened.
2493         closeSeqNum = r.getOpenSeqNum();
2494         if (closeSeqNum == HConstants.NO_SEQNUM) {
2495           closeSeqNum = 0;
2496         }
2497       }
2498       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2499     }
2500     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2501     return toReturn != null;
2502   }
2503 
2504   /**
2505    * Protected utility method for safely obtaining an HRegion handle.
2506    *
2507    * @param regionName
2508    *          Name of online {@link HRegion} to return
2509    * @return {@link HRegion} for <code>regionName</code>
2510    * @throws NotServingRegionException
2511    */
2512   protected HRegion getRegion(final byte[] regionName)
2513       throws NotServingRegionException {
2514     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2515     return getRegionByEncodedName(encodedRegionName);
2516   }
2517 
2518   protected HRegion getRegionByEncodedName(String encodedRegionName)
2519     throws NotServingRegionException {
2520     HRegion region = this.onlineRegions.get(encodedRegionName);
2521     if (region == null) {
2522       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2523       if (moveInfo != null) {
2524         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2525       }
2526       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2527       if (isOpening != null && isOpening.booleanValue()) {
2528         throw new RegionOpeningException("Region is being opened: " + encodedRegionName);
2529       }
2530       throw new NotServingRegionException("Region is not online: " + encodedRegionName);
2531     }
2532     return region;
2533   }
2534 
2535   /*
2536    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
2537    * IOE if it isn't already.
2538    *
2539    * @param t Throwable
2540    *
2541    * @return Throwable converted to an IOE; methods can only let out IOEs.
2542    */
2543   protected Throwable cleanup(final Throwable t) {
2544     return cleanup(t, null);
2545   }
2546 
2547   /*
2548    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
2549    * IOE if it isn't already.
2550    *
2551    * @param t Throwable
2552    *
2553    * @param msg Message to log in error. Can be null.
2554    *
2555    * @return Throwable converted to an IOE; methods can only let out IOEs.
2556    */
2557   protected Throwable cleanup(final Throwable t, final String msg) {
2558     // Don't log as error if NSRE; NSRE is 'normal' operation.
2559     if (t instanceof NotServingRegionException) {
2560       LOG.debug("NotServingRegionException; " + t.getMessage());
2561       return t;
2562     }
2563     if (msg == null) {
2564       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2565     } else {
2566       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2567     }
2568     if (!checkOOME(t)) {
2569       checkFileSystem();
2570     }
2571     return t;
2572   }
2573 
2574   /*
2575    * @param t
2576    *
2577    * @return Make <code>t</code> an IOE if it isn't already.
2578    */
2579   protected IOException convertThrowableToIOE(final Throwable t) {
2580     return convertThrowableToIOE(t, null);
2581   }
2582 
2583   /*
2584    * @param t
2585    *
2586    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2587    *
2588    * @return Make <code>t</code> an IOE if it isn't already.
2589    */
2590   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2591     return (t instanceof IOException ? (IOException) t : msg == null
2592         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2593   }
2594 
2595   /*
2596    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
2597    *
2598    * @param e
2599    *
2600    * @return True if we OOME'd and are aborting.
2601    */
2602   public boolean checkOOME(final Throwable e) {
2603     boolean stop = false;
2604     try {
2605       if (e instanceof OutOfMemoryError
2606           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
2607           || (e.getMessage() != null && e.getMessage().contains(
2608               "java.lang.OutOfMemoryError"))) {
2609         stop = true;
2610         LOG.fatal(
2611           "Run out of memory; HRegionServer will abort itself immediately", e);
2612       }
2613     } finally {
2614       if (stop) {
2615         Runtime.getRuntime().halt(1);
2616       }
2617     }
2618     return stop;
2619   }
2620 
2621   /**
2622    * Checks to see if the file system is still accessible. If not, sets
2623    * abortRequested and stopRequested
2624    *
2625    * @return false if file system is not available
2626    */
2627   public boolean checkFileSystem() {
2628     if (this.fsOk && this.fs != null) {
2629       try {
2630         FSUtils.checkFileSystemAvailable(this.fs);
2631       } catch (IOException e) {
2632         abort("File System not available", e);
2633         this.fsOk = false;
2634       }
2635     }
2636     return this.fsOk;
2637   }
2638 
2639   protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
2640     long scannerId = -1;
2641     while (true) {
2642       scannerId = rand.nextLong();
2643       if (scannerId == -1) continue;
2644       String scannerName = String.valueOf(scannerId);
2645       RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
2646       if (existing == null) {
2647         this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
2648             new ScannerListener(scannerName));
2649         break;
2650       }
2651     }
2652     return scannerId;
2653   }
2654 
2655   /**
2656    * Generate a random positive long number
2657    *
2658    * @return a random positive long number
2659    */
2660   protected long nextLong() {
2661     long n = rand.nextLong();
2662     if (n == 0) {
2663       return nextLong();
2664     }
2665     if (n < 0) {
2666       n = -n;
2667     }
2668     return n;
2669   }
2670 
2671   // Start Client methods
2672 
2673   /**
2674    * Get data from a table.
2675    *
2676    * @param controller the RPC controller
2677    * @param request the get request
2678    * @throws ServiceException
2679    */
2680   @Override
2681   public GetResponse get(final RpcController controller,
2682       final GetRequest request) throws ServiceException {
2683     long before = EnvironmentEdgeManager.currentTimeMillis();
2684     try {
2685       requestCount.increment();
2686       HRegion region = getRegion(request.getRegion());
2687 
2688       GetResponse.Builder builder = GetResponse.newBuilder();
2689       ClientProtos.Get get = request.getGet();
2690       Boolean existence = null;
2691       Result r = null;
2692 
2693       if (request.getClosestRowBefore()) {
2694         if (get.getColumnCount() != 1) {
2695           throw new DoNotRetryIOException(
2696             "get ClosestRowBefore supports one and only one family now, not "
2697               + get.getColumnCount() + " families");
2698         }
2699         byte[] row = get.getRow().toByteArray();
2700         byte[] family = get.getColumn(0).getFamily().toByteArray();
2701         r = region.getClosestRowBefore(row, family);
2702       } else {
2703         Get clientGet = ProtobufUtil.toGet(get);
2704         if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
2705           existence = region.getCoprocessorHost().preExists(clientGet);
2706         }
2707         if (existence == null) {
2708           r = region.get(clientGet);
2709           if (request.getExistenceOnly()) {
2710             boolean exists = r != null && !r.isEmpty();
2711             if (region.getCoprocessorHost() != null) {
2712               exists = region.getCoprocessorHost().postExists(clientGet, exists);
2713             }
2714             existence = exists;
2715           }
2716         }
2717       }
2718       if (existence != null) {
2719         builder.setExists(existence.booleanValue());
2720       } else if (r != null) {
2721         builder.setResult(ProtobufUtil.toResult(r));
2722       }
2723       return builder.build();
2724     } catch (IOException ie) {
2725       throw new ServiceException(ie);
2726     } finally {
2727       metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2728     }
2729   }
2730 
2731   /**
2732    * Get multi data from a table.
2733    *
2734    * @param controller the RPC controller
2735    * @param request multi-the get request
2736    * @throws ServiceException
2737    */
2738   @Override
2739   public MultiGetResponse multiGet(final RpcController controller, final MultiGetRequest request)
2740       throws ServiceException {
2741     long before = EnvironmentEdgeManager.currentTimeMillis();
2742     try {
2743       requestCount.add(request.getGetCount());
2744       HRegion region = getRegion(request.getRegion());
2745       MultiGetResponse.Builder builder = MultiGetResponse.newBuilder();
2746       for (ClientProtos.Get get: request.getGetList())
2747       {
2748         Boolean existence = null;
2749         Result r = null;
2750         if (request.getClosestRowBefore()) {
2751           if (get.getColumnCount() != 1) {
2752             throw new DoNotRetryIOException(
2753               "get ClosestRowBefore supports one and only one family now, not "
2754                 + get.getColumnCount() + " families");
2755           }
2756           byte[] row = get.getRow().toByteArray();
2757           byte[] family = get.getColumn(0).getFamily().toByteArray();
2758           r = region.getClosestRowBefore(row, family);
2759         } else {
2760           Get clientGet = ProtobufUtil.toGet(get);
2761           if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
2762             existence = region.getCoprocessorHost().preExists(clientGet);
2763           }
2764           if (existence == null) {
2765             r = region.get(clientGet);
2766             if (request.getExistenceOnly()) {
2767               boolean exists = r != null && !r.isEmpty();
2768               if (region.getCoprocessorHost() != null) {
2769                 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2770               }
2771               existence = exists;
2772             }
2773           }
2774         }
2775         if (existence != null) {
2776           builder.addExists(existence.booleanValue());
2777         } else if (r != null) {
2778           builder.addResult(ProtobufUtil.toResult(r));
2779         }
2780       }
2781       return builder.build();
2782     } catch (IOException ie) {
2783       throw new ServiceException(ie);
2784     } finally {
2785       metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2786     }
2787   }
2788 
2789   /**
2790    * Mutate data in a table.
2791    *
2792    * @param rpcc the RPC controller
2793    * @param request the mutate request
2794    * @throws ServiceException
2795    */
2796   @Override
2797   public MutateResponse mutate(final RpcController rpcc,
2798       final MutateRequest request) throws ServiceException {
2799     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2800     // It is also the conduit via which we pass back data.
2801     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2802     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2803     // Clear scanner so we are not holding on to reference across call.
2804     controller.setCellScanner(null);
2805     try {
2806       requestCount.increment();
2807       HRegion region = getRegion(request.getRegion());
2808       MutateResponse.Builder builder = MutateResponse.newBuilder();
2809       MutationProto mutation = request.getMutation();
2810       if (!region.getRegionInfo().isMetaTable()) {
2811         cacheFlusher.reclaimMemStoreMemory();
2812       }
2813       Result r = null;
2814       Boolean processed = null;
2815       MutationType type = mutation.getMutateType();
2816       switch (type) {
2817       case APPEND:
2818         r = append(region, mutation, cellScanner);
2819         break;
2820       case INCREMENT:
2821         r = increment(region, mutation, cellScanner);
2822         break;
2823       case PUT:
2824         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2825         if (request.hasCondition()) {
2826           Condition condition = request.getCondition();
2827           byte[] row = condition.getRow().toByteArray();
2828           byte[] family = condition.getFamily().toByteArray();
2829           byte[] qualifier = condition.getQualifier().toByteArray();
2830           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2831           ByteArrayComparable comparator =
2832             ProtobufUtil.toComparator(condition.getComparator());
2833           if (region.getCoprocessorHost() != null) {
2834             processed = region.getCoprocessorHost().preCheckAndPut(
2835               row, family, qualifier, compareOp, comparator, put);
2836           }
2837           if (processed == null) {
2838             boolean result = region.checkAndMutate(row, family,
2839               qualifier, compareOp, comparator, put, true);
2840             if (region.getCoprocessorHost() != null) {
2841               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2842                 qualifier, compareOp, comparator, put, result);
2843             }
2844             processed = result;
2845           }
2846         } else {
2847           region.put(put);
2848           processed = Boolean.TRUE;
2849         }
2850         break;
2851       case DELETE:
2852         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2853         if (request.hasCondition()) {
2854           Condition condition = request.getCondition();
2855           byte[] row = condition.getRow().toByteArray();
2856           byte[] family = condition.getFamily().toByteArray();
2857           byte[] qualifier = condition.getQualifier().toByteArray();
2858           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2859           ByteArrayComparable comparator =
2860             ProtobufUtil.toComparator(condition.getComparator());
2861           if (region.getCoprocessorHost() != null) {
2862             processed = region.getCoprocessorHost().preCheckAndDelete(
2863               row, family, qualifier, compareOp, comparator, delete);
2864           }
2865           if (processed == null) {
2866             boolean result = region.checkAndMutate(row, family,
2867               qualifier, compareOp, comparator, delete, true);
2868             if (region.getCoprocessorHost() != null) {
2869               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2870                 qualifier, compareOp, comparator, delete, result);
2871             }
2872             processed = result;
2873           }
2874         } else {
2875           region.delete(delete);
2876           processed = Boolean.TRUE;
2877         }
2878         break;
2879         default:
2880           throw new DoNotRetryIOException(
2881             "Unsupported mutate type: " + type.name());
2882       }
2883       CellScannable cellsToReturn = null;
2884       if (processed != null) {
2885         builder.setProcessed(processed.booleanValue());
2886       } else if (r != null) {
2887         builder.setResult(ProtobufUtil.toResultNoData(r));
2888         cellsToReturn = r;
2889       }
2890       if (cellsToReturn != null) {
2891         controller.setCellScanner(cellsToReturn.cellScanner());
2892       }
2893       return builder.build();
2894     } catch (IOException ie) {
2895       checkFileSystem();
2896       throw new ServiceException(ie);
2897     }
2898   }
2899 
2900   //
2901   // remote scanner interface
2902   //
2903 
2904   /**
2905    * Scan data in a table.
2906    *
2907    * @param controller the RPC controller
2908    * @param request the scan request
2909    * @throws ServiceException
2910    */
2911   @Override
2912   public ScanResponse scan(final RpcController controller,
2913       final ScanRequest request) throws ServiceException {
2914     Leases.Lease lease = null;
2915     String scannerName = null;
2916     try {
2917       if (!request.hasScannerId() && !request.hasScan()) {
2918         throw new DoNotRetryIOException(
2919           "Missing required input: scannerId or scan");
2920       }
2921       long scannerId = -1;
2922       if (request.hasScannerId()) {
2923         scannerId = request.getScannerId();
2924         scannerName = String.valueOf(scannerId);
2925       }
2926       try {
2927         checkOpen();
2928       } catch (IOException e) {
2929         // If checkOpen failed, server not running or filesystem gone,
2930         // cancel this lease; filesystem is gone or we're closing or something.
2931         if (scannerName != null) {
2932           try {
2933             leases.cancelLease(scannerName);
2934           } catch (LeaseException le) {
2935             LOG.info("Server shutting down and client tried to access missing scanner " +
2936               scannerName);
2937           }
2938         }
2939         throw e;
2940       }
2941       requestCount.increment();
2942 
2943       try {
2944         int ttl = 0;
2945         HRegion region = null;
2946         RegionScanner scanner = null;
2947         RegionScannerHolder rsh = null;
2948         boolean moreResults = true;
2949         boolean closeScanner = false;
2950         Long resultsWireSize = null;
2951         ScanResponse.Builder builder = ScanResponse.newBuilder();
2952         if (request.hasCloseScanner()) {
2953           closeScanner = request.getCloseScanner();
2954         }
2955         int rows = 1;
2956         if (request.hasNumberOfRows()) {
2957           rows = request.getNumberOfRows();
2958         }
2959         if (request.hasScannerId()) {
2960           rsh = scanners.get(scannerName);
2961           if (rsh == null) {
2962             throw new UnknownScannerException(
2963               "Name: " + scannerName + ", already closed?");
2964           }
2965           scanner = rsh.s;
2966           region = getRegion(scanner.getRegionInfo().getRegionName());
2967         } else {
2968           region = getRegion(request.getRegion());
2969           ClientProtos.Scan protoScan = request.getScan();
2970           boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2971           Scan scan = ProtobufUtil.toScan(protoScan);
2972           // if the request doesn't set this, get the default region setting.
2973           if (!isLoadingCfsOnDemandSet) {
2974             scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2975           }
2976           byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
2977           resultsWireSize = (hasMetrics != null && Bytes.toBoolean(hasMetrics)) ? 0L : null;
2978           region.prepareScanner(scan);
2979           if (region.getCoprocessorHost() != null) {
2980             scanner = region.getCoprocessorHost().preScannerOpen(scan);
2981           }
2982           if (scanner == null) {
2983             scanner = region.getScanner(scan);
2984           }
2985           if (region.getCoprocessorHost() != null) {
2986             scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2987           }
2988           scannerId = addScanner(scanner);
2989           scannerName = String.valueOf(scannerId);
2990           ttl = this.scannerLeaseTimeoutPeriod;
2991         }
2992 
2993         if (rows > 0) {
2994           // if nextCallSeq does not match throw Exception straight away. This needs to be
2995           // performed even before checking of Lease.
2996           // See HBASE-5974
2997           if (request.hasNextCallSeq()) {
2998             if (rsh == null) {
2999               rsh = scanners.get(scannerName);
3000             }
3001             if (rsh != null) {
3002               if (request.getNextCallSeq() != rsh.nextCallSeq) {
3003                 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
3004                   + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
3005                   "; request=" + TextFormat.shortDebugString(request));
3006               }
3007               // Increment the nextCallSeq value which is the next expected from client.
3008               rsh.nextCallSeq++;
3009             }
3010           }
3011           try {
3012             // Remove lease while its being processed in server; protects against case
3013             // where processing of request takes > lease expiration time.
3014             lease = leases.removeLease(scannerName);
3015             List<Result> results = new ArrayList<Result>(rows);
3016             long currentScanResultSize = 0;
3017 
3018             boolean done = false;
3019             // Call coprocessor. Get region info from scanner.
3020             if (region != null && region.getCoprocessorHost() != null) {
3021               Boolean bypass = region.getCoprocessorHost().preScannerNext(
3022                 scanner, results, rows);
3023               if (!results.isEmpty()) {
3024                 for (Result r : results) {
3025                   if (maxScannerResultSize < Long.MAX_VALUE){
3026                     for (KeyValue kv : r.raw()) {
3027                       currentScanResultSize += kv.heapSize();
3028                     }
3029                   }
3030                 }
3031               }
3032               if (bypass != null && bypass.booleanValue()) {
3033                 done = true;
3034               }
3035             }
3036 
3037             if (!done) {
3038               long maxResultSize = scanner.getMaxResultSize();
3039               if (maxResultSize <= 0) {
3040                 maxResultSize = maxScannerResultSize;
3041               }
3042               List<KeyValue> values = new ArrayList<KeyValue>();
3043               MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
3044               region.startRegionOperation(Operation.SCAN);
3045               try {
3046                 int i = 0;
3047                 synchronized(scanner) {
3048                   for (; i < rows
3049                       && currentScanResultSize < maxResultSize; i++) {
3050                     // Collect values to be returned here
3051                     boolean moreRows = scanner.nextRaw(values);
3052                     if (!values.isEmpty()) {
3053                       if (maxScannerResultSize < Long.MAX_VALUE){
3054                         for (KeyValue kv : values) {
3055                           currentScanResultSize += kv.heapSize();
3056                         }
3057                       }
3058                       results.add(new Result(values));
3059                     }
3060                     if (!moreRows) {
3061                       break;
3062                     }
3063                     values.clear();
3064                   }
3065                 }
3066                 region.readRequestsCount.add(i);
3067               } finally {
3068                 region.closeRegionOperation();
3069               }
3070 
3071               // coprocessor postNext hook
3072               if (region != null && region.getCoprocessorHost() != null) {
3073                 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
3074               }
3075             }
3076 
3077             // If the scanner's filter - if any - is done with the scan
3078             // and wants to tell the client to stop the scan. This is done by passing
3079             // a null result, and setting moreResults to false.
3080             if (scanner.isFilterDone() && results.isEmpty()) {
3081               moreResults = false;
3082               results = null;
3083             } else {
3084               for (Result result: results) {
3085                 if (result != null) {
3086                   ClientProtos.Result pbResult = ProtobufUtil.toResult(result);
3087                   if (resultsWireSize != null) {
3088                     resultsWireSize += pbResult.getSerializedSize();
3089                   }
3090                   builder.addResult(pbResult);
3091                 }
3092               }
3093               if (resultsWireSize != null) {
3094                 builder.setResultSizeBytes(resultsWireSize.longValue());
3095               }
3096             }
3097           } finally {
3098             // We're done. On way out re-add the above removed lease.
3099             // Adding resets expiration time on lease.
3100             if (scanners.containsKey(scannerName)) {
3101               if (lease != null) leases.addLease(lease);
3102               ttl = this.scannerLeaseTimeoutPeriod;
3103             }
3104           }
3105         }
3106 
3107         if (!moreResults || closeScanner) {
3108           ttl = 0;
3109           moreResults = false;
3110           if (region != null && region.getCoprocessorHost() != null) {
3111             if (region.getCoprocessorHost().preScannerClose(scanner)) {
3112               return builder.build(); // bypass
3113             }
3114           }
3115           rsh = scanners.remove(scannerName);
3116           if (rsh != null) {
3117             scanner = rsh.s;
3118             scanner.close();
3119             leases.cancelLease(scannerName);
3120             if (region != null && region.getCoprocessorHost() != null) {
3121               region.getCoprocessorHost().postScannerClose(scanner);
3122             }
3123           }
3124         }
3125 
3126         if (ttl > 0) {
3127           builder.setTtl(ttl);
3128         }
3129         builder.setScannerId(scannerId);
3130         builder.setMoreResults(moreResults);
3131         return builder.build();
3132       } catch (Throwable t) {
3133         if (scannerName != null && t instanceof NotServingRegionException) {
3134           scanners.remove(scannerName);
3135         }
3136         throw convertThrowableToIOE(cleanup(t));
3137       }
3138     } catch (IOException ie) {
3139       throw new ServiceException(ie);
3140     }
3141   }
3142 
3143   /**
3144    * Atomically bulk load several HFiles into an open region
3145    * @return true if successful, false is failed but recoverably (no action)
3146    * @throws IOException if failed unrecoverably
3147    */
3148   @Override
3149   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
3150       final BulkLoadHFileRequest request) throws ServiceException {
3151     try {
3152       requestCount.increment();
3153       HRegion region = getRegion(request.getRegion());
3154       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
3155       for (FamilyPath familyPath: request.getFamilyPathList()) {
3156         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
3157           familyPath.getPath()));
3158       }
3159       boolean bypass = false;
3160       if (region.getCoprocessorHost() != null) {
3161         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
3162       }
3163       boolean loaded = false;
3164       if (!bypass) {
3165         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
3166       }
3167       if (region.getCoprocessorHost() != null) {
3168         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
3169       }
3170       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
3171       builder.setLoaded(loaded);
3172       return builder.build();
3173     } catch (IOException ie) {
3174       throw new ServiceException(ie);
3175     }
3176   }
3177 
3178   @Override
3179   public CoprocessorServiceResponse execService(final RpcController controller,
3180       final CoprocessorServiceRequest request) throws ServiceException {
3181     try {
3182       requestCount.increment();
3183       HRegion region = getRegion(request.getRegion());
3184       // ignore the passed in controller (from the serialized call)
3185       ServerRpcController execController = new ServerRpcController();
3186       Message result = region.execService(execController, request.getCall());
3187       if (execController.getFailedOn() != null) {
3188         throw execController.getFailedOn();
3189       }
3190       CoprocessorServiceResponse.Builder builder =
3191           CoprocessorServiceResponse.newBuilder();
3192       builder.setRegion(RequestConverter.buildRegionSpecifier(
3193           RegionSpecifierType.REGION_NAME, region.getRegionName()));
3194       builder.setValue(
3195           builder.getValueBuilder().setName(result.getClass().getName())
3196               .setValue(result.toByteString()));
3197       return builder.build();
3198     } catch (IOException ie) {
3199       throw new ServiceException(ie);
3200     }
3201   }
3202 
3203   /**
3204    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
3205    *
3206    * @param rpcc the RPC controller
3207    * @param request the multi request
3208    * @throws ServiceException
3209    */
3210   @Override
3211   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
3212   throws ServiceException {
3213     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
3214     // It is also the conduit via which we pass back data.
3215     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
3216     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
3217     // Clear scanner so we are not holding on to reference across call.
3218     controller.setCellScanner(null);
3219     List<CellScannable> cellsToReturn = null;
3220     try {
3221       HRegion region = getRegion(request.getRegion());
3222       MultiResponse.Builder builder = MultiResponse.newBuilder();
3223       List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
3224       // Do a bunch of mutations atomically.  Mutations are Puts and Deletes.  NOT Gets.
3225       if (request.hasAtomic() && request.getAtomic()) {
3226         // MultiAction is union type.  Has a Get or a Mutate.
3227         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3228           if (actionUnion.hasMutation()) {
3229             mutations.add(actionUnion.getMutation());
3230           } else {
3231             throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
3232           }
3233         }
3234         // TODO: We are not updating a metric here.  Should we up requestCount?
3235         if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
3236       } else {
3237         // Do a bunch of Actions.
3238         ActionResult.Builder resultBuilder = null;
3239         cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
3240         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3241           this.requestCount.increment();
3242           ClientProtos.Result result = null;
3243           try {
3244             if (actionUnion.hasGet()) {
3245               Get get = ProtobufUtil.toGet(actionUnion.getGet());
3246               Result r = region.get(get);
3247               if (r != null) {
3248                 // Get a result with no data.  The data will be carried alongside pbs, not as pbs.
3249                 result = ProtobufUtil.toResultNoData(r);
3250                 // Add the Result to controller so it gets serialized apart from pb.  Get
3251                 // Results could be big so good if they are not serialized as pb.
3252                 cellsToReturn.add(r);
3253               }
3254             } else if (actionUnion.hasMutation()) {
3255               MutationProto mutation = actionUnion.getMutation();
3256               MutationType type = mutation.getMutateType();
3257               if (type != MutationType.PUT && type != MutationType.DELETE) {
3258                 if (!mutations.isEmpty()) {
3259                   doBatchOp(builder, region, mutations, cellScanner);
3260                   mutations.clear();
3261                 } else if (!region.getRegionInfo().isMetaTable()) {
3262                   cacheFlusher.reclaimMemStoreMemory();
3263                 }
3264               }
3265               Result r = null;
3266               switch (type) {
3267               case APPEND:
3268                 r = append(region, mutation, cellScanner);
3269                 break;
3270               case INCREMENT:
3271                 r = increment(region, mutation, cellScanner);
3272                 break;
3273               case PUT:
3274               case DELETE:
3275                 mutations.add(mutation);
3276                 break;
3277               default:
3278                 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3279               }
3280               if (r != null) {
3281                 // Put the data into the cellsToReturn and the metadata about the result is all that
3282                 // we will pass back in the protobuf result.
3283                 result = ProtobufUtil.toResultNoData(r);
3284                 cellsToReturn.add(r);
3285               }
3286             } else {
3287               LOG.warn("Error: invalid action: " + actionUnion + ". "
3288                 + "it must be a Get, Mutate, or Exec.");
3289               throw new DoNotRetryIOException("Invalid action, "
3290                 + "it must be a Get, Mutate, or Exec.");
3291             }
3292             if (result != null) {
3293               if (resultBuilder == null) {
3294                 resultBuilder = ActionResult.newBuilder();
3295               } else {
3296                 resultBuilder.clear();
3297               }
3298               resultBuilder.setValue(result);
3299               builder.addResult(resultBuilder.build());
3300             }
3301           } catch (IOException ie) {
3302             builder.addResult(ResponseConverter.buildActionResult(ie));
3303           }
3304         }
3305         if (!mutations.isEmpty()) {
3306           doBatchOp(builder, region, mutations, cellScanner);
3307         }
3308       }
3309       // Load the controller with the Cells to return.
3310       if (cellsToReturn != null && !cellsToReturn.isEmpty()) {
3311         controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
3312       }
3313       return builder.build();
3314     } catch (IOException ie) {
3315       throw new ServiceException(ie);
3316     }
3317   }
3318 
3319 // End Client methods
3320 // Start Admin methods
3321 
3322   @Override
3323   @QosPriority(priority=HConstants.HIGH_QOS)
3324   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
3325       final GetRegionInfoRequest request) throws ServiceException {
3326     try {
3327       checkOpen();
3328       requestCount.increment();
3329       HRegion region = getRegion(request.getRegion());
3330       HRegionInfo info = region.getRegionInfo();
3331       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
3332       builder.setRegionInfo(HRegionInfo.convert(info));
3333       if (request.hasCompactionState() && request.getCompactionState()) {
3334         builder.setCompactionState(region.getCompactionState());
3335       }
3336       return builder.build();
3337     } catch (IOException ie) {
3338       throw new ServiceException(ie);
3339     }
3340   }
3341 
3342   @Override
3343   public GetStoreFileResponse getStoreFile(final RpcController controller,
3344       final GetStoreFileRequest request) throws ServiceException {
3345     try {
3346       HRegion region = getRegion(request.getRegion());
3347       requestCount.increment();
3348       Set<byte[]> columnFamilies;
3349       if (request.getFamilyCount() == 0) {
3350         columnFamilies = region.getStores().keySet();
3351       } else {
3352         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
3353         for (ByteString cf: request.getFamilyList()) {
3354           columnFamilies.add(cf.toByteArray());
3355         }
3356       }
3357       int nCF = columnFamilies.size();
3358       List<String>  fileList = region.getStoreFileList(
3359         columnFamilies.toArray(new byte[nCF][]));
3360       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
3361       builder.addAllStoreFile(fileList);
3362       return builder.build();
3363     } catch (IOException ie) {
3364       throw new ServiceException(ie);
3365     }
3366   }
3367 
3368   @Override
3369   @QosPriority(priority=HConstants.HIGH_QOS)
3370   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
3371       final GetOnlineRegionRequest request) throws ServiceException {
3372     try {
3373       checkOpen();
3374       requestCount.increment();
3375       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
3376       for (HRegion region: this.onlineRegions.values()) {
3377         list.add(region.getRegionInfo());
3378       }
3379       Collections.sort(list);
3380       return ResponseConverter.buildGetOnlineRegionResponse(list);
3381     } catch (IOException ie) {
3382       throw new ServiceException(ie);
3383     }
3384   }
3385 
3386 
3387   // Region open/close direct RPCs
3388 
3389   /**
3390    * Open asynchronously a region or a set of regions on the region server.
3391    *
3392    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
3393    *  before being called. As a consequence, this method should be called only from the master.
3394    * <p>
3395    * Different manages states for the region are:<ul>
3396    *  <li>region not opened: the region opening will start asynchronously.</li>
3397    *  <li>a close is already in progress: this is considered as an error.</li>
3398    *  <li>an open is already in progress: this new open request will be ignored. This is important
3399    *  because the Master can do multiple requests if it crashes.</li>
3400    *  <li>the region is already opened:  this new open request will be ignored./li>
3401    *  </ul>
3402    * </p>
3403    * <p>
3404    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
3405    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
3406    * errors are put in the response as FAILED_OPENING.
3407    * </p>
3408    * @param controller the RPC controller
3409    * @param request the request
3410    * @throws ServiceException
3411    */
3412   @Override
3413   @QosPriority(priority=HConstants.HIGH_QOS)
3414   public OpenRegionResponse openRegion(final RpcController controller,
3415       final OpenRegionRequest request) throws ServiceException {
3416     try {
3417       checkOpen();
3418     } catch (IOException ie) {
3419       throw new ServiceException(ie);
3420     }
3421     requestCount.increment();
3422     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
3423     final int regionCount = request.getOpenInfoCount();
3424     final Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(regionCount);
3425     final boolean isBulkAssign = regionCount > 1;
3426     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3427       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
3428 
3429       int versionOfOfflineNode = -1;
3430       if (regionOpenInfo.hasVersionOfOfflineNode()) {
3431         versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
3432       }
3433       HTableDescriptor htd;
3434       try {
3435         final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
3436         if (onlineRegion != null) {
3437           //Check if the region can actually be opened.
3438           if (onlineRegion.getCoprocessorHost() != null) {
3439             onlineRegion.getCoprocessorHost().preOpen();
3440           }
3441           // See HBASE-5094. Cross check with META if still this RS is owning
3442           // the region.
3443           Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
3444               this.catalogTracker, region.getRegionName());
3445           if (this.getServerName().equals(p.getSecond())) {
3446             LOG.warn("Attempted open of " + region.getEncodedName()
3447                 + " but already online on this server");
3448             builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
3449             continue;
3450           } else {
3451             LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
3452                 " but META does not have this server - continue opening.");
3453             removeFromOnlineRegions(onlineRegion, null);
3454           }
3455         }
3456         LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
3457             + this.serverNameFromMasterPOV);
3458         htd = htds.get(region.getTableNameAsString());
3459         if (htd == null) {
3460           htd = this.tableDescriptors.get(region.getTableName());
3461           htds.put(region.getTableNameAsString(), htd);
3462         }
3463 
3464         final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
3465             region.getEncodedNameAsBytes(), Boolean.TRUE);
3466 
3467         if (Boolean.FALSE.equals(previous)) {
3468           // There is a close in progress. We need to mark this open as failed in ZK.
3469           OpenRegionHandler.
3470               tryTransitionFromOfflineToFailedOpen(this, region, versionOfOfflineNode);
3471 
3472           throw new RegionAlreadyInTransitionException("Received OPEN for the region:" +
3473               region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
3474         }
3475 
3476         if (Boolean.TRUE.equals(previous)) {
3477           // An open is in progress. This is supported, but let's log this.
3478           LOG.info("Receiving OPEN for the region:" +
3479               region.getRegionNameAsString() + " , which we are already trying to OPEN" +
3480               " - ignoring this new request for this region.");
3481         }
3482 
3483         // We are opening this region. If it moves back and forth for whatever reason, we don't
3484         // want to keep returning the stale moved record while we are opening/if we close again.
3485         removeFromMovedRegions(region.getEncodedName());
3486 
3487         if (previous == null) {
3488           // check if the region to be opened is marked in recovering state in ZK
3489           if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
3490             this.recoveringRegions.put(region.getEncodedName(), null);
3491           }
3492           // If there is no action in progress, we can submit a specific handler.
3493           // Need to pass the expected version in the constructor.
3494           if (region.isMetaRegion()) {
3495             this.service.submit(new OpenMetaHandler(this, this, region, htd,
3496                 versionOfOfflineNode));
3497           } else {
3498             updateRegionFavoredNodesMapping(region.getEncodedName(),
3499                 regionOpenInfo.getFavoredNodesList());
3500             this.service.submit(new OpenRegionHandler(this, this, region, htd,
3501                 versionOfOfflineNode));
3502           }
3503         }
3504 
3505         builder.addOpeningState(RegionOpeningState.OPENED);
3506 
3507       } catch (KeeperException zooKeeperEx) {
3508         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
3509         throw new ServiceException(zooKeeperEx);
3510       } catch (IOException ie) {
3511         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
3512         if (isBulkAssign) {
3513           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
3514         } else {
3515           throw new ServiceException(ie);
3516         }
3517       }
3518     }
3519 
3520     return builder.build();
3521   }
3522 
3523   private void updateRegionFavoredNodesMapping(String encodedRegionName,
3524       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3525     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3526     // Refer to the comment on the declaration of regionFavoredNodesMap on why
3527     // it is a map of region name to InetSocketAddress[]
3528     for (int i = 0; i < favoredNodes.size(); i++) {
3529       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3530           favoredNodes.get(i).getPort());
3531     }
3532     regionFavoredNodesMap.put(encodedRegionName, addr);
3533   }
3534 
3535   /**
3536    * Return the favored nodes for a region given its encoded name. Look at the
3537    * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3538    * @param encodedRegionName
3539    * @return array of favored locations
3540    */
3541   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3542     return regionFavoredNodesMap.get(encodedRegionName);
3543   }
3544 
3545   /**
3546    * Close a region on the region server.
3547    *
3548    * @param controller the RPC controller
3549    * @param request the request
3550    * @throws ServiceException
3551    */
3552   @Override
3553   @QosPriority(priority=HConstants.HIGH_QOS)
3554   public CloseRegionResponse closeRegion(final RpcController controller,
3555       final CloseRegionRequest request) throws ServiceException {
3556     int versionOfClosingNode = -1;
3557     if (request.hasVersionOfClosingNode()) {
3558       versionOfClosingNode = request.getVersionOfClosingNode();
3559     }
3560     boolean zk = request.getTransitionInZK();
3561     final ServerName sn = (request.hasDestinationServer() ?
3562       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
3563 
3564     try {
3565       checkOpen();
3566       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3567 
3568       // Can be null if we're calling close on a region that's not online
3569       final HRegion region = this.getFromOnlineRegions(encodedRegionName);
3570       if ((region  != null) && (region .getCoprocessorHost() != null)) {
3571         region.getCoprocessorHost().preClose(false);
3572       }
3573 
3574       requestCount.increment();
3575       LOG.info("Received close region: " + encodedRegionName +
3576           "Transitioning in ZK: " + (zk ? "yes" : "no") +
3577           ". Version of ZK closing node:" + versionOfClosingNode +
3578         ". Destination server:" + sn);
3579 
3580       boolean closed = closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
3581       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
3582       return builder.build();
3583     } catch (IOException ie) {
3584       throw new ServiceException(ie);
3585     }
3586   }
3587 
3588   /**
3589    * Flush a region on the region server.
3590    *
3591    * @param controller the RPC controller
3592    * @param request the request
3593    * @throws ServiceException
3594    */
3595   @Override
3596   @QosPriority(priority=HConstants.HIGH_QOS)
3597   public FlushRegionResponse flushRegion(final RpcController controller,
3598       final FlushRegionRequest request) throws ServiceException {
3599     try {
3600       checkOpen();
3601       requestCount.increment();
3602       HRegion region = getRegion(request.getRegion());
3603       LOG.info("Flushing " + region.getRegionNameAsString());
3604       boolean shouldFlush = true;
3605       if (request.hasIfOlderThanTs()) {
3606         shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
3607       }
3608       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
3609       if (shouldFlush) {
3610         builder.setFlushed(region.flushcache());
3611       }
3612       builder.setLastFlushTime(region.getLastFlushTime());
3613       return builder.build();
3614     } catch (IOException ie) {
3615       throw new ServiceException(ie);
3616     }
3617   }
3618 
3619   /**
3620    * Split a region on the region server.
3621    *
3622    * @param controller the RPC controller
3623    * @param request the request
3624    * @throws ServiceException
3625    */
3626   @Override
3627   @QosPriority(priority=HConstants.HIGH_QOS)
3628   public SplitRegionResponse splitRegion(final RpcController controller,
3629       final SplitRegionRequest request) throws ServiceException {
3630     try {
3631       checkOpen();
3632       requestCount.increment();
3633       HRegion region = getRegion(request.getRegion());
3634       region.startRegionOperation(Operation.SPLIT_REGION);
3635       LOG.info("Splitting " + region.getRegionNameAsString());
3636       region.flushcache();
3637       byte[] splitPoint = null;
3638       if (request.hasSplitPoint()) {
3639         splitPoint = request.getSplitPoint().toByteArray();
3640       }
3641       region.forceSplit(splitPoint);
3642       compactSplitThread.requestSplit(region, region.checkSplit());
3643       return SplitRegionResponse.newBuilder().build();
3644     } catch (IOException ie) {
3645       throw new ServiceException(ie);
3646     }
3647   }
3648 
3649   /**
3650    * Merge regions on the region server.
3651    *
3652    * @param controller the RPC controller
3653    * @param request the request
3654    * @return merge regions response
3655    * @throws ServiceException
3656    */
3657   @Override
3658   @QosPriority(priority = HConstants.HIGH_QOS)
3659   public MergeRegionsResponse mergeRegions(final RpcController controller,
3660       final MergeRegionsRequest request) throws ServiceException {
3661     try {
3662       checkOpen();
3663       requestCount.increment();
3664       HRegion regionA = getRegion(request.getRegionA());
3665       HRegion regionB = getRegion(request.getRegionB());
3666       boolean forcible = request.getForcible();
3667       regionA.startRegionOperation(Operation.MERGE_REGION);
3668       regionB.startRegionOperation(Operation.MERGE_REGION);
3669       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
3670           + ",forcible=" + forcible);
3671       regionA.flushcache();
3672       regionB.flushcache();
3673       compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
3674       return MergeRegionsResponse.newBuilder().build();
3675     } catch (IOException ie) {
3676       throw new ServiceException(ie);
3677     }
3678   }
3679 
3680   /**
3681    * Compact a region on the region server.
3682    *
3683    * @param controller the RPC controller
3684    * @param request the request
3685    * @throws ServiceException
3686    */
3687   @Override
3688   @QosPriority(priority=HConstants.HIGH_QOS)
3689   public CompactRegionResponse compactRegion(final RpcController controller,
3690       final CompactRegionRequest request) throws ServiceException {
3691     try {
3692       checkOpen();
3693       requestCount.increment();
3694       HRegion region = getRegion(request.getRegion());
3695       LOG.info("Compacting " + region.getRegionNameAsString());
3696       boolean major = false;
3697       byte [] family = null;
3698       Store store = null;
3699       if (request.hasFamily()) {
3700         family = request.getFamily().toByteArray();
3701         store = region.getStore(family);
3702         if (store == null) {
3703           throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
3704             " does not exist in region " + region.getRegionNameAsString()));
3705         }
3706       }
3707       if (request.hasMajor()) {
3708         major = request.getMajor();
3709       }
3710       if (major) {
3711         if (family != null) {
3712           store.triggerMajorCompaction();
3713         } else {
3714           region.triggerMajorCompaction();
3715         }
3716       }
3717 
3718       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
3719       LOG.trace("User-triggered compaction requested for region " +
3720         region.getRegionNameAsString() + familyLogMsg);
3721       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
3722       if(family != null) {
3723         compactSplitThread.requestCompaction(region, store, log,
3724           Store.PRIORITY_USER, null);
3725       } else {
3726         compactSplitThread.requestCompaction(region, log,
3727           Store.PRIORITY_USER, null);
3728       }
3729       return CompactRegionResponse.newBuilder().build();
3730     } catch (IOException ie) {
3731       throw new ServiceException(ie);
3732     }
3733   }
3734 
3735   /**
3736    * Replicate WAL entries on the region server.
3737    *
3738    * @param controller the RPC controller
3739    * @param request the request
3740    * @throws ServiceException
3741    */
3742   @Override
3743   @QosPriority(priority=HConstants.REPLICATION_QOS)
3744   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
3745       final ReplicateWALEntryRequest request) throws ServiceException {
3746     try {
3747       if (replicationSinkHandler != null) {
3748         checkOpen();
3749         requestCount.increment();
3750         HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList());
3751         if (entries != null && entries.length > 0) {
3752           replicationSinkHandler.replicateLogEntries(entries);
3753         }
3754       }
3755       return ReplicateWALEntryResponse.newBuilder().build();
3756     } catch (IOException ie) {
3757       throw new ServiceException(ie);
3758     }
3759   }
3760 
3761   /**
3762    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
3763    * that the given mutations will be durable on the receiving RS if this method returns without any
3764    * exception.
3765    * @param rpcc the RPC controller
3766    * @param request the request
3767    * @throws ServiceException
3768    */
3769   @Override
3770   @QosPriority(priority = HConstants.REPLAY_QOS)
3771   public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
3772       throws ServiceException {
3773     long before = EnvironmentEdgeManager.currentTimeMillis();  
3774     PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
3775     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
3776     // Clear scanner so we are not holding on to reference across call.
3777     controller.setCellScanner(null);
3778     try {
3779       checkOpen();
3780       HRegion region = getRegion(request.getRegion());
3781       MultiResponse.Builder builder = MultiResponse.newBuilder();
3782       List<MutationProto> mutates = new ArrayList<MutationProto>();
3783       for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3784         if (actionUnion.hasMutation()) {
3785           MutationProto mutate = actionUnion.getMutation();
3786           MutationType type = mutate.getMutateType();
3787           switch (type) {
3788           case PUT:
3789           case DELETE:
3790             mutates.add(mutate);
3791             break;
3792           default:
3793             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3794           }
3795         } else {
3796           LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
3797           throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
3798         }
3799       }
3800       if (!mutates.isEmpty()) {
3801         doBatchOp(builder, region, mutates, cellScanner, true);
3802       }
3803       return builder.build();
3804     } catch (IOException ie) {
3805       throw new ServiceException(ie);
3806     } finally {
3807       metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); 
3808     }
3809   }
3810 
3811   /**
3812    * Roll the WAL writer of the region server.
3813    * @param controller the RPC controller
3814    * @param request the request
3815    * @throws ServiceException
3816    */
3817   @Override
3818   public RollWALWriterResponse rollWALWriter(final RpcController controller,
3819       final RollWALWriterRequest request) throws ServiceException {
3820     try {
3821       requestCount.increment();
3822       HLog wal = this.getWAL();
3823       byte[][] regionsToFlush = wal.rollWriter(true);
3824       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
3825       if (regionsToFlush != null) {
3826         for (byte[] region: regionsToFlush) {
3827           builder.addRegionToFlush(ByteString.copyFrom(region));
3828         }
3829       }
3830       return builder.build();
3831     } catch (IOException ie) {
3832       throw new ServiceException(ie);
3833     }
3834   }
3835 
3836   /**
3837    * Stop the region server.
3838    *
3839    * @param controller the RPC controller
3840    * @param request the request
3841    * @throws ServiceException
3842    */
3843   @Override
3844   public StopServerResponse stopServer(final RpcController controller,
3845       final StopServerRequest request) throws ServiceException {
3846     requestCount.increment();
3847     String reason = request.getReason();
3848     stop(reason);
3849     return StopServerResponse.newBuilder().build();
3850   }
3851 
3852   /**
3853    * Get some information of the region server.
3854    *
3855    * @param controller the RPC controller
3856    * @param request the request
3857    * @throws ServiceException
3858    */
3859   @Override
3860   public GetServerInfoResponse getServerInfo(final RpcController controller,
3861       final GetServerInfoRequest request) throws ServiceException {
3862     ServerName serverName = getServerName();
3863     requestCount.increment();
3864     return ResponseConverter.buildGetServerInfoResponse(serverName, webuiport);
3865   }
3866 
3867 // End Admin methods
3868 
3869   /**
3870    * Find the HRegion based on a region specifier
3871    *
3872    * @param regionSpecifier the region specifier
3873    * @return the corresponding region
3874    * @throws IOException if the specifier is not null,
3875    *    but failed to find the region
3876    */
3877   protected HRegion getRegion(
3878       final RegionSpecifier regionSpecifier) throws IOException {
3879     return getRegionByEncodedName(
3880         ProtobufUtil.getRegionEncodedName(regionSpecifier));
3881   }
3882 
3883   /**
3884    * Execute an append mutation.
3885    *
3886    * @param region
3887    * @param m
3888    * @param cellScanner
3889    * @return result to return to client if default operation should be
3890    * bypassed as indicated by RegionObserver, null otherwise
3891    * @throws IOException
3892    */
3893   protected Result append(final HRegion region,
3894       final MutationProto m, final CellScanner cellScanner) throws IOException {
3895     long before = EnvironmentEdgeManager.currentTimeMillis();
3896     Append append = ProtobufUtil.toAppend(m, cellScanner);
3897     Result r = null;
3898     if (region.getCoprocessorHost() != null) {
3899       r = region.getCoprocessorHost().preAppend(append);
3900     }
3901     if (r == null) {
3902       r = region.append(append);
3903       if (region.getCoprocessorHost() != null) {
3904         region.getCoprocessorHost().postAppend(append, r);
3905       }
3906     }
3907     metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
3908     return r;
3909   }
3910 
3911   /**
3912    * Execute an increment mutation.
3913    *
3914    * @param region
3915    * @param mutation
3916    * @return the Result
3917    * @throws IOException
3918    */
3919   protected Result increment(final HRegion region, final MutationProto mutation,
3920       final CellScanner cells)
3921   throws IOException {
3922     long before = EnvironmentEdgeManager.currentTimeMillis();
3923     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
3924     Result r = null;
3925     if (region.getCoprocessorHost() != null) {
3926       r = region.getCoprocessorHost().preIncrement(increment);
3927     }
3928     if (r == null) {
3929       r = region.increment(increment);
3930       if (region.getCoprocessorHost() != null) {
3931         r = region.getCoprocessorHost().postIncrement(increment, r);
3932       }
3933     }
3934     metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
3935     return r;
3936   }
3937 
3938   /**
3939    * Execute a list of Put/Delete mutations.
3940    */
3941   protected void doBatchOp(final MultiResponse.Builder builder,
3942       final HRegion region, final List<MutationProto> mutates, final CellScanner cells) {
3943     doBatchOp(builder, region, mutates, cells, false);
3944   }
3945   
3946   /**
3947    * Execute a list of Put/Delete mutations.
3948    *
3949    * @param builder
3950    * @param region
3951    * @param mutations
3952    */
3953   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
3954       final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
3955     @SuppressWarnings("unchecked")
3956     Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
3957     long before = EnvironmentEdgeManager.currentTimeMillis();
3958     boolean batchContainsPuts = false, batchContainsDelete = false;
3959     try {
3960       ActionResult.Builder resultBuilder = ActionResult.newBuilder();
3961       resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
3962       ActionResult result = resultBuilder.build();
3963       int i = 0;
3964       for (MutationProto m : mutations) {
3965         Mutation mutation;
3966         if (m.getMutateType() == MutationType.PUT) {
3967           mutation = ProtobufUtil.toPut(m, cells);
3968           batchContainsPuts = true;
3969         } else {
3970           mutation = ProtobufUtil.toDelete(m, cells);
3971           batchContainsDelete = true;
3972         }
3973         mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
3974         builder.addResult(result);
3975       }
3976 
3977       requestCount.add(mutations.size());
3978       if (!region.getRegionInfo().isMetaTable()) {
3979         cacheFlusher.reclaimMemStoreMemory();
3980       }
3981 
3982       OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
3983       for (i = 0; i < codes.length; i++) {
3984         switch (codes[i].getOperationStatusCode()) {
3985           case BAD_FAMILY:
3986             result = ResponseConverter.buildActionResult(
3987                 new NoSuchColumnFamilyException(codes[i].getExceptionMsg()));
3988             builder.setResult(i, result);
3989             break;
3990 
3991           case SANITY_CHECK_FAILURE:
3992             result = ResponseConverter.buildActionResult(
3993                 new FailedSanityCheckException(codes[i].getExceptionMsg()));
3994             builder.setResult(i, result);
3995             break;
3996 
3997           default:
3998             result = ResponseConverter.buildActionResult(
3999                 new DoNotRetryIOException(codes[i].getExceptionMsg()));
4000             builder.setResult(i, result);
4001             break;
4002 
4003           case SUCCESS:
4004             break;
4005         }
4006       }
4007     } catch (IOException ie) {
4008       ActionResult result = ResponseConverter.buildActionResult(ie);
4009       for (int i = 0; i < mutations.size(); i++) {
4010         builder.setResult(i, result);
4011       }
4012     }
4013     long after = EnvironmentEdgeManager.currentTimeMillis();
4014     if (batchContainsPuts) {
4015       metricsRegionServer.updatePut(after - before);
4016     }
4017     if (batchContainsDelete) {
4018       metricsRegionServer.updateDelete(after - before);
4019     }
4020   }
4021 
4022   /**
4023    * Mutate a list of rows atomically.
4024    *
4025    * @param region
4026    * @param mutations
4027  * @param cellScanner if non-null, the mutation data -- the Cell content.
4028    * @throws IOException
4029    */
4030   protected void mutateRows(final HRegion region, final List<MutationProto> mutations,
4031       final CellScanner cellScanner)
4032   throws IOException {
4033     MutationProto firstMutate = mutations.get(0);
4034     if (!region.getRegionInfo().isMetaTable()) {
4035       cacheFlusher.reclaimMemStoreMemory();
4036     }
4037     byte [] row = firstMutate.getRow().toByteArray();
4038     RowMutations rm = new RowMutations(row);
4039     for (MutationProto mutate: mutations) {
4040       MutationType type = mutate.getMutateType();
4041       switch (mutate.getMutateType()) {
4042       case PUT:
4043         rm.add(ProtobufUtil.toPut(mutate, cellScanner));
4044         break;
4045       case DELETE:
4046         rm.add(ProtobufUtil.toDelete(mutate, cellScanner));
4047         break;
4048         default:
4049           throw new DoNotRetryIOException(
4050             "mutate supports atomic put and/or delete, not "
4051               + type.name());
4052       }
4053     }
4054     region.mutateRow(rm);
4055   }
4056 
4057   private static class MovedRegionInfo {
4058     private final ServerName serverName;
4059     private final long seqNum;
4060     private final long ts;
4061 
4062     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
4063       this.serverName = serverName;
4064       this.seqNum = closeSeqNum;
4065       ts = EnvironmentEdgeManager.currentTimeMillis();
4066      }
4067 
4068     public ServerName getServerName() {
4069       return serverName;
4070     }
4071 
4072     public long getSeqNum() {
4073       return seqNum;
4074     }
4075 
4076     public long getMoveTime() {
4077       return ts;
4078     }
4079   }
4080 
  // This map contains all the regions that we closed for a move.
  // We also record the time of the move, as we don't want to keep information that is too old.
4083   protected Map<String, MovedRegionInfo> movedRegions =
4084       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
4085 
  // We need a timeout. Without one there is a risk of giving out wrong information, which
  // would double the number of network calls instead of reducing them.
4088   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
4089 
4090   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
4091     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
4092       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
4093       return;
4094     }
4095     LOG.info("Adding moved region record: " + encodedName + " to "
4096         + destination.getServerName() + ":" + destination.getPort()
4097         + " as of " + closeSeqNum);
4098     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
4099   }
4100 
4101   private void removeFromMovedRegions(String encodedName) {
4102     movedRegions.remove(encodedName);
4103   }
4104 
4105   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
4106     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
4107 
4108     long now = EnvironmentEdgeManager.currentTimeMillis();
4109     if (dest != null) {
4110       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
4111         return dest;
4112       } else {
4113         movedRegions.remove(encodedRegionName);
4114       }
4115     }
4116 
4117     return null;
4118   }
4119 
4120   /**
4121    * Remove the expired entries from the moved regions list.
4122    */
4123   protected void cleanMovedRegions() {
4124     final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
4125     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
4126 
4127     while (it.hasNext()){
4128       Map.Entry<String, MovedRegionInfo> e = it.next();
4129       if (e.getValue().getMoveTime() < cutOff) {
4130         it.remove();
4131       }
4132     }
4133   }
4134 
4135   /**
4136    * Creates a Chore thread to clean the moved region cache.
4137    */
4138   protected static class MovedRegionsCleaner extends Chore implements Stoppable {
4139     private HRegionServer regionServer;
4140     Stoppable stoppable;
4141 
4142     private MovedRegionsCleaner(
4143       HRegionServer regionServer, Stoppable stoppable){
4144       super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
4145       this.regionServer = regionServer;
4146       this.stoppable = stoppable;
4147     }
4148 
4149     static MovedRegionsCleaner createAndStart(HRegionServer rs){
4150       Stoppable stoppable = new Stoppable() {
4151         private volatile boolean isStopped = false;
4152         @Override public void stop(String why) { isStopped = true;}
4153         @Override public boolean isStopped() {return isStopped;}
4154       };
4155 
4156       return new MovedRegionsCleaner(rs, stoppable);
4157     }
4158 
4159     @Override
4160     protected void chore() {
4161       regionServer.cleanMovedRegions();
4162     }
4163 
4164     @Override
4165     public void stop(String why) {
4166       stoppable.stop(why);
4167     }
4168 
4169     @Override
4170     public boolean isStopped() {
4171       return stoppable.isStopped();
4172     }
4173   }
4174 
4175   private String getMyEphemeralNodePath() {
4176     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
4177   }
4178 
4179   /**
4180    * Holder class which holds the RegionScanner and nextCallSeq together.
4181    */
4182   private static class RegionScannerHolder {
4183     private RegionScanner s;
4184     private long nextCallSeq = 0L;
4185 
4186     public RegionScannerHolder(RegionScanner s) {
4187       this.s = s;
4188     }
4189   }
4190 
4191   private boolean isHealthCheckerConfigured() {
4192     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
4193     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
4194   }
4195 
4196   /**
4197    * @return the underlying {@link CompactSplitThread} for the servers
4198    */
4199   public CompactSplitThread getCompactSplitThread() {
4200     return this.compactSplitThread;
4201   }
4202 
4203   /**
4204    * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
4205    * and set watcher as well.
4206    * @param regionEncodedName region encode name
4207    * @return true when /hbase/recovering-regions/<current region encoded name> exists
4208    * @throws KeeperException
4209    */
4210   private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
4211     boolean result = false;
4212     String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
4213 
4214     byte[] node = ZKUtil.getDataAndWatch(this.zooKeeper, nodePath);
4215     if (node != null) {
4216       result = true;
4217     }
4218 
4219     return result;
4220   }
4221 
4222   /**
4223    * A helper function to store the last flushed sequence Id with the previous failed RS for a
4224    * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed
4225    * sequence id is only valid for each RS, we associate the Id with corresponding failed RS.
4226    * @throws KeeperException
4227    * @throws IOException
4228    */
4229   private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
4230       IOException {
4231     if (!r.isRecovering()) {
4232       // return immdiately for non-recovering regions
4233       return;
4234     }
4235 
4236     HRegionInfo region = r.getRegionInfo();
4237     ZooKeeperWatcher zkw = getZooKeeper();
4238     String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
4239     long minSeqIdForLogReplay = r.getMinSeqIdForLogReplay();
4240     long lastRecordedFlushedSequenceId = -1;
4241     String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
4242       region.getEncodedName());
4243     // recovering-region level
4244     byte[] data = ZKUtil.getData(zkw, nodePath);
4245     if (data != null) {
4246       lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
4247     }
4248     if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
4249       ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4250     }
4251     if (previousRSName != null) {
4252       // one level deeper for failed RS
4253       nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
4254       ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4255       LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " 
4256           + previousRSName);
4257     } else {
4258       LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
4259     }
4260   }
4261 
4262   /**
4263    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
4264    * @param encodedRegionName
4265    * @return
4266    * @throws IOException
4267    * @throws KeeperException
4268    */
4269   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
4270     String result = null;
4271     long maxZxid = 0;
4272     ZooKeeperWatcher zkw = this.getZooKeeper();
4273     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
4274     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
4275     if (failedServers == null || failedServers.isEmpty()) {
4276       return result;
4277     }
4278     for (String failedServer : failedServers) {
4279       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
4280       Stat stat = new Stat();
4281       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
4282       if (maxZxid < stat.getCzxid()) {
4283         maxZxid = stat.getCzxid();
4284         result = failedServer;
4285       }
4286     }
4287     return result;
4288   }
4289 }