View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.net.BindException;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.NavigableMap;
35  import java.util.Set;
36  import java.util.TreeSet;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.Cell;
44  import org.apache.hadoop.hbase.CellScannable;
45  import org.apache.hadoop.hbase.CellScanner;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.DroppedSnapshotException;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.MetaTableAccessor;
54  import org.apache.hadoop.hbase.MultiActionResultTooLarge;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.ServerName;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.UnknownScannerException;
59  import org.apache.hadoop.hbase.classification.InterfaceAudience;
60  import org.apache.hadoop.hbase.client.Append;
61  import org.apache.hadoop.hbase.client.ConnectionUtils;
62  import org.apache.hadoop.hbase.client.Delete;
63  import org.apache.hadoop.hbase.client.Durability;
64  import org.apache.hadoop.hbase.client.Get;
65  import org.apache.hadoop.hbase.client.Increment;
66  import org.apache.hadoop.hbase.client.Mutation;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.client.RowMutations;
71  import org.apache.hadoop.hbase.client.Scan;
72  import org.apache.hadoop.hbase.client.VersionInfoUtil;
73  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
74  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
75  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
76  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
77  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
78  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
79  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
80  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
81  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
82  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
83  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
84  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
85  import org.apache.hadoop.hbase.ipc.PriorityFunction;
86  import org.apache.hadoop.hbase.ipc.QosPriority;
87  import org.apache.hadoop.hbase.ipc.RpcCallContext;
88  import org.apache.hadoop.hbase.ipc.RpcServer;
89  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
90  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
91  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
92  import org.apache.hadoop.hbase.ipc.ServerRpcController;
93  import org.apache.hadoop.hbase.master.MasterRpcServices;
94  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
95  import org.apache.hadoop.hbase.protobuf.RequestConverter;
96  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
156 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
157 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
158 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
159 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.OperationQuota;
166 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
167 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
168 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
169 import org.apache.hadoop.hbase.regionserver.Region.Operation;
170 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
171 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
172 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
173 import org.apache.hadoop.hbase.wal.WAL;
174 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
175 import org.apache.hadoop.hbase.security.User;
176 import org.apache.hadoop.hbase.util.Bytes;
177 import org.apache.hadoop.hbase.util.Counter;
178 import org.apache.hadoop.hbase.util.DNS;
179 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
180 import org.apache.hadoop.hbase.util.Pair;
181 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
182 import org.apache.hadoop.hbase.util.Strings;
183 import org.apache.hadoop.hbase.wal.WALKey;
184 import org.apache.hadoop.hbase.wal.WALSplitter;
185 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
186 import org.apache.zookeeper.KeeperException;
187 
188 import com.google.common.annotations.VisibleForTesting;
189 import com.google.protobuf.ByteString;
190 import com.google.protobuf.Message;
191 import com.google.protobuf.RpcController;
192 import com.google.protobuf.ServiceException;
193 import com.google.protobuf.TextFormat;
194 
195 /**
196  * Implements the regionserver RPC services.
197  */
198 @InterfaceAudience.Private
199 @SuppressWarnings("deprecation")
200 public class RSRpcServices implements HBaseRPCErrorHandler,
201     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
202     ConfigurationObserver {
  protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  // Request counter. (Includes requests that are not serviced by regions.)
  final Counter requestCount = new Counter();
  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Address this RPC server is bound to.
  final InetSocketAddress isa;

  // The region server that owns these RPC services.
  private final HRegionServer regionServer;
  // Upper bound (bytes) on the size of a single scan response.
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates unique ids for client scanners; keys of the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  // Open client scanners, keyed by scanner name (stringified scanner id).
  private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
    new ConcurrentHashMap<String, RegionScannerHolder>();

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;
251 
252   /**
253    * Holder class which holds the RegionScanner and nextCallSeq together.
254    */
255   private static class RegionScannerHolder {
256     private AtomicLong nextCallSeq = new AtomicLong(0);
257     private RegionScanner s;
258     private Region r;
259 
260     public RegionScannerHolder(RegionScanner s, Region r) {
261       this.s = s;
262       this.r = r;
263     }
264 
265     private long getNextCallSeq() {
266       return nextCallSeq.get();
267     }
268 
269     private void incNextCallSeq() {
270       nextCallSeq.incrementAndGet();
271     }
272 
273     private void rollbackNextCallSeq() {
274       nextCallSeq.decrementAndGet();
275     }
276   }
277 
278   /**
279    * Instantiated as a scanner lease. If the lease times out, the scanner is
280    * closed
281    */
282   private class ScannerListener implements LeaseListener {
283     private final String scannerName;
284 
285     ScannerListener(final String n) {
286       this.scannerName = n;
287     }
288 
289     @Override
290     public void leaseExpired() {
291       RegionScannerHolder rsh = scanners.remove(this.scannerName);
292       if (rsh != null) {
293         RegionScanner s = rsh.s;
294         LOG.info("Scanner " + this.scannerName + " lease expired on region "
295           + s.getRegionInfo().getRegionNameAsString());
296         Region region = null;
297         try {
298           region = regionServer.getRegion(s.getRegionInfo().getRegionName());
299           if (region != null && region.getCoprocessorHost() != null) {
300             region.getCoprocessorHost().preScannerClose(s);
301           }
302         } catch (IOException e) {
303           LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
304         } finally {
305           try {
306             s.close();
307             if (region != null && region.getCoprocessorHost() != null) {
308               region.getCoprocessorHost().postScannerClose(s);
309             }
310           } catch (IOException e) {
311             LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
312           }
313         }
314       } else {
315         LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
316           " scanner found, hence no chance to close that related scanner!");
317       }
318     }
319   }
320 
321   private static ResultOrException getResultOrException(
322       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
323     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
324   }
325 
326   private static ResultOrException getResultOrException(final Exception e, final int index) {
327     return getResultOrException(ResponseConverter.buildActionResult(e), index);
328   }
329 
330   private static ResultOrException getResultOrException(
331       final ResultOrException.Builder builder, final int index) {
332     return builder.setIndex(index).build();
333   }
334 
335   /**
336    * Starts the nonce operation for a mutation, if needed.
337    * @param mutation Mutation.
338    * @param nonceGroup Nonce group from the request.
339    * @returns Nonce used (can be NO_NONCE).
340    */
341   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
342       throws IOException, OperationConflictException {
343     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
344     boolean canProceed = false;
345     try {
346       canProceed = regionServer.nonceManager.startOperation(
347         nonceGroup, mutation.getNonce(), regionServer);
348     } catch (InterruptedException ex) {
349       throw new InterruptedIOException("Nonce start operation interrupted");
350     }
351     if (!canProceed) {
352       // TODO: instead, we could convert append/increment to get w/mvcc
353       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
354         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
355         + "] may have already completed";
356       throw new OperationConflictException(message);
357     }
358     return mutation.getNonce();
359   }
360 
361   /**
362    * Ends nonce operation for a mutation, if needed.
363    * @param mutation Mutation.
364    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
365    * @param success Whether the operation for this nonce has succeeded.
366    */
367   private void endNonceOperation(final MutationProto mutation,
368       long nonceGroup, boolean success) {
369     if (regionServer.nonceManager != null && mutation.hasNonce()) {
370       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
371     }
372   }
373 
374   /**
375    * @return True if current call supports cellblocks
376    */
377   private boolean isClientCellBlockSupport() {
378     RpcCallContext context = RpcServer.getCurrentCall();
379     return context != null && context.isClientCellBlockSupported();
380   }
381 
382   private void addResult(final MutateResponse.Builder builder,
383       final Result result, final PayloadCarryingRpcController rpcc) {
384     if (result == null) return;
385     if (isClientCellBlockSupport()) {
386       builder.setResult(ProtobufUtil.toResultNoData(result));
387       rpcc.setCellScanner(result.cellScanner());
388     } else {
389       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
390       builder.setResult(pbr);
391     }
392   }
393 
394   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
395       final RpcController controller, boolean isDefaultRegion) {
396     builder.setStale(!isDefaultRegion);
397     if (results == null || results.isEmpty()) return;
398     if (isClientCellBlockSupport()) {
399       for (Result res : results) {
400         builder.addCellsPerResult(res.size());
401         builder.addPartialFlagPerResult(res.isPartial());
402       }
403       ((PayloadCarryingRpcController)controller).
404         setCellScanner(CellUtil.createCellScanner(results));
405     } else {
406       for (Result res: results) {
407         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
408         builder.addResults(pbr);
409       }
410     }
411   }
412 
413   /**
414    * Mutate a list of rows atomically.
415    *
416    * @param region
417    * @param actions
418    * @param cellScanner if non-null, the mutation data -- the Cell content.
419    * @throws IOException
420    */
421   private ClientProtos.RegionLoadStats mutateRows(final Region region,
422       final List<ClientProtos.Action> actions,
423       final CellScanner cellScanner) throws IOException {
424     int countOfCompleteMutation = 0;
425     try {
426       if (!region.getRegionInfo().isMetaTable()) {
427         regionServer.cacheFlusher.reclaimMemStoreMemory();
428       }
429       RowMutations rm = null;
430       for (ClientProtos.Action action: actions) {
431         if (action.hasGet()) {
432           throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
433             action.getGet());
434         }
435         MutationType type = action.getMutation().getMutateType();
436         if (rm == null) {
437           rm = new RowMutations(action.getMutation().getRow().toByteArray());
438         }
439         switch (type) {
440           case PUT:
441             Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
442             ++countOfCompleteMutation;
443             rm.add(put);
444             break;
445           case DELETE:
446             Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
447             ++countOfCompleteMutation;
448             rm.add(delete);
449             break;
450           default:
451             throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
452         }
453       }
454       region.mutateRow(rm);
455       return ((HRegion)region).getRegionStats();
456     } finally {
457       // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
458       // even if the malformed cells are not skipped.
459       for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
460         skipCellsForMutation(actions.get(i), cellScanner);
461       }
462     }
463   }
464 
  /**
   * Atomically checks a single cell value against the given comparator and, if the
   * condition holds, applies the supplied list of put/delete actions to that row.
   *
   * @param region the region containing the row
   * @param actions the put/delete actions to apply; a Get action is rejected
   * @param cellScanner if non-null, the mutation data -- the Cell content.
   * @param row the row to check and mutate
   * @param family column family of the checked cell
   * @param qualifier column qualifier of the checked cell
   * @param compareOp comparison operator applied to the checked cell
   * @param comparator comparator providing the expected value
   * @return true if the condition matched and the mutations were applied
   * @throws IOException if any action is not a put/delete or the mutation fails
   */
  private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
      CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaTable()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      RowMutations rm = null;
      for (ClientProtos.Action action: actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        if (rm == null) {
          rm = new RowMutations(action.getMutation().getRow().toByteArray());
        }
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            rm.add(put);
            break;
          case DELETE:
            Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            rm.add(delete);
            break;
          default:
            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
        }
      }
      return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, true);
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
519 
520   /**
521    * Execute an append mutation.
522    *
523    * @param region
524    * @param m
525    * @param cellScanner
526    * @return result to return to client if default operation should be
527    * bypassed as indicated by RegionObserver, null otherwise
528    * @throws IOException
529    */
530   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
531       final CellScanner cellScanner, long nonceGroup) throws IOException {
532     long before = EnvironmentEdgeManager.currentTime();
533     Append append = ProtobufUtil.toAppend(m, cellScanner);
534     quota.addMutation(append);
535     Result r = null;
536     if (region.getCoprocessorHost() != null) {
537       r = region.getCoprocessorHost().preAppend(append);
538     }
539     if (r == null) {
540       long nonce = startNonceOperation(m, nonceGroup);
541       boolean success = false;
542       try {
543         r = region.append(append, nonceGroup, nonce);
544         success = true;
545       } finally {
546         endNonceOperation(m, nonceGroup, success);
547       }
548       if (region.getCoprocessorHost() != null) {
549         region.getCoprocessorHost().postAppend(append, r);
550       }
551     }
552     if (regionServer.metricsRegionServer != null) {
553       regionServer.metricsRegionServer.updateAppend(
554         EnvironmentEdgeManager.currentTime() - before);
555     }
556     return r;
557   }
558 
559   /**
560    * Execute an increment mutation.
561    *
562    * @param region
563    * @param mutation
564    * @return the Result
565    * @throws IOException
566    */
567   private Result increment(final Region region, final OperationQuota quota,
568       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
569     long before = EnvironmentEdgeManager.currentTime();
570     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
571     quota.addMutation(increment);
572     Result r = null;
573     if (region.getCoprocessorHost() != null) {
574       r = region.getCoprocessorHost().preIncrement(increment);
575     }
576     if (r == null) {
577       long nonce = startNonceOperation(mutation, nonceGroup);
578       boolean success = false;
579       try {
580         r = region.increment(increment, nonceGroup, nonce);
581         success = true;
582       } finally {
583         endNonceOperation(mutation, nonceGroup, success);
584       }
585       if (region.getCoprocessorHost() != null) {
586         r = region.getCoprocessorHost().postIncrement(increment, r);
587       }
588     }
589     if (regionServer.metricsRegionServer != null) {
590       regionServer.metricsRegionServer.updateIncrement(
591         EnvironmentEdgeManager.currentTime() - before);
592     }
593     return r;
594   }
595 
596   /**
597    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
598    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
599    * @param region
600    * @param actions
601    * @param cellScanner
602    * @param builder
603    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
604    * method returns as a 'result'.
605    * @return Return the <code>cellScanner</code> passed
606    */
607   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
608       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
609       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
610     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
611     // one at a time, we instead pass them in batch.  Be aware that the corresponding
612     // ResultOrException instance that matches each Put or Delete is then added down in the
613     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
614     List<ClientProtos.Action> mutations = null;
615     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
616     RpcCallContext context = RpcServer.getCurrentCall();
617     IOException sizeIOE = null;
618     Object lastBlock = null;
619     for (ClientProtos.Action action : actions.getActionList()) {
620       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
621       try {
622         Result r = null;
623 
624         if (context != null
625             && context.isRetryImmediatelySupported()
626             && (context.getResponseCellSize() > maxQuotaResultSize
627               || context.getResponseBlockSize() > maxQuotaResultSize)) {
628 
629           // We're storing the exception since the exception and reason string won't
630           // change after the response size limit is reached.
631           if (sizeIOE == null ) {
632             // We don't need the stack un-winding do don't throw the exception.
633             // Throwing will kill the JVM's JIT.
634             //
635             // Instead just create the exception and then store it.
636             sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
637                 + " CellSize: " + context.getResponseCellSize()
638                 + " BlockSize: " + context.getResponseBlockSize());
639 
640             // Only report the exception once since there's only one request that
641             // caused the exception. Otherwise this number will dominate the exceptions count.
642             rpcServer.getMetrics().exception(sizeIOE);
643           }
644 
645           // Now that there's an exception is known to be created
646           // use it for the response.
647           //
648           // This will create a copy in the builder.
649           resultOrExceptionBuilder = ResultOrException.newBuilder().
650               setException(ResponseConverter.buildException(sizeIOE));
651           resultOrExceptionBuilder.setIndex(action.getIndex());
652           builder.addResultOrException(resultOrExceptionBuilder.build());
653           skipCellsForMutation(action, cellScanner);
654           continue;
655         }
656         if (action.hasGet()) {
657           long before = EnvironmentEdgeManager.currentTime();
658           try {
659             Get get = ProtobufUtil.toGet(action.getGet());
660             r = region.get(get);
661           } finally {
662             if (regionServer.metricsRegionServer != null) {
663               regionServer.metricsRegionServer.updateGet(
664                 EnvironmentEdgeManager.currentTime() - before);
665             }
666           }
667         } else if (action.hasServiceCall()) {
668           resultOrExceptionBuilder = ResultOrException.newBuilder();
669           try {
670             Message result = execServiceOnRegion(region, action.getServiceCall());
671             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
672                 ClientProtos.CoprocessorServiceResult.newBuilder();
673             resultOrExceptionBuilder.setServiceResult(
674                 serviceResultBuilder.setValue(
675                   serviceResultBuilder.getValueBuilder()
676                     .setName(result.getClass().getName())
677                     .setValue(result.toByteString())));
678           } catch (IOException ioe) {
679             rpcServer.getMetrics().exception(ioe);
680             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
681           }
682         } else if (action.hasMutation()) {
683           MutationType type = action.getMutation().getMutateType();
684           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
685               !mutations.isEmpty()) {
686             // Flush out any Puts or Deletes already collected.
687             doBatchOp(builder, region, quota, mutations, cellScanner);
688             mutations.clear();
689           }
690           switch (type) {
691           case APPEND:
692             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
693             break;
694           case INCREMENT:
695             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
696             break;
697           case PUT:
698           case DELETE:
699             // Collect the individual mutations and apply in a batch
700             if (mutations == null) {
701               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
702             }
703             mutations.add(action);
704             break;
705           default:
706             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
707           }
708         } else {
709           throw new HBaseIOException("Unexpected Action type");
710         }
711         if (r != null) {
712           ClientProtos.Result pbResult = null;
713           if (isClientCellBlockSupport()) {
714             pbResult = ProtobufUtil.toResultNoData(r);
715             //  Hard to guess the size here.  Just make a rough guess.
716             if (cellsToReturn == null) {
717               cellsToReturn = new ArrayList<CellScannable>();
718             }
719             cellsToReturn.add(r);
720           } else {
721             pbResult = ProtobufUtil.toResult(r);
722           }
723           lastBlock = addSize(context, r, lastBlock);
724           resultOrExceptionBuilder =
725             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
726         }
727         // Could get to here and there was no result and no exception.  Presumes we added
728         // a Put or Delete to the collecting Mutations List for adding later.  In this
729         // case the corresponding ResultOrException instance for the Put or Delete will be added
730         // down in the doBatchOp method call rather than up here.
731       } catch (IOException ie) {
732         rpcServer.getMetrics().exception(ie);
733         resultOrExceptionBuilder = ResultOrException.newBuilder().
734           setException(ResponseConverter.buildException(ie));
735       }
736       if (resultOrExceptionBuilder != null) {
737         // Propagate index.
738         resultOrExceptionBuilder.setIndex(action.getIndex());
739         builder.addResultOrException(resultOrExceptionBuilder.build());
740       }
741     }
742     // Finish up any outstanding mutations
743     if (mutations != null && !mutations.isEmpty()) {
744       doBatchOp(builder, region, quota, mutations, cellScanner);
745     }
746     return cellsToReturn;
747   }
748 
749   /**
750    * Execute a list of Put/Delete mutations.
751    *
752    * @param builder
753    * @param region
754    * @param mutations
755    */
756   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
757       final OperationQuota quota,
758       final List<ClientProtos.Action> mutations, final CellScanner cells) {
759     Mutation[] mArray = new Mutation[mutations.size()];
760     long before = EnvironmentEdgeManager.currentTime();
761     boolean batchContainsPuts = false, batchContainsDelete = false;
762     try {
763       int i = 0;
764       for (ClientProtos.Action action: mutations) {
765         MutationProto m = action.getMutation();
766         Mutation mutation;
767         if (m.getMutateType() == MutationType.PUT) {
768           mutation = ProtobufUtil.toPut(m, cells);
769           batchContainsPuts = true;
770         } else {
771           mutation = ProtobufUtil.toDelete(m, cells);
772           batchContainsDelete = true;
773         }
774         mArray[i++] = mutation;
775         quota.addMutation(mutation);
776       }
777 
778       if (!region.getRegionInfo().isMetaTable()) {
779         regionServer.cacheFlusher.reclaimMemStoreMemory();
780       }
781 
782       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
783         HConstants.NO_NONCE);
784       for (i = 0; i < codes.length; i++) {
785         int index = mutations.get(i).getIndex();
786         Exception e = null;
787         switch (codes[i].getOperationStatusCode()) {
788           case BAD_FAMILY:
789             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
790             builder.addResultOrException(getResultOrException(e, index));
791             break;
792 
793           case SANITY_CHECK_FAILURE:
794             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
795             builder.addResultOrException(getResultOrException(e, index));
796             break;
797 
798           default:
799             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
800             builder.addResultOrException(getResultOrException(e, index));
801             break;
802 
803           case SUCCESS:
804             builder.addResultOrException(getResultOrException(
805                 ClientProtos.Result.getDefaultInstance(), index,
806                 ((HRegion) region).getRegionStats()));
807             break;
808         }
809       }
810     } catch (IOException ie) {
811       int processedMutationIndex = 0;
812       for (Action mutation : mutations) {
813         // The non-null mArray[i] means the cell scanner has been read.
814         if (mArray[processedMutationIndex++] == null) {
815           skipCellsForMutation(mutation, cells);
816         }
817         builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
818       }
819     }
820     if (regionServer.metricsRegionServer != null) {
821       long after = EnvironmentEdgeManager.currentTime();
822       if (batchContainsPuts) {
823         regionServer.metricsRegionServer.updatePut(after - before);
824       }
825       if (batchContainsDelete) {
826         regionServer.metricsRegionServer.updateDelete(after - before);
827       }
828     }
829   }
830 
831   /**
832    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
833    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
834    * @param region
835    * @param mutations
836    * @param replaySeqId
837    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
838    *         exceptionMessage if any
839    * @throws IOException
840    */
841   private OperationStatus [] doReplayBatchOp(final Region region,
842       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
843     long before = EnvironmentEdgeManager.currentTime();
844     boolean batchContainsPuts = false, batchContainsDelete = false;
845     try {
846       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
847         WALSplitter.MutationReplay m = it.next();
848 
849         if (m.type == MutationType.PUT) {
850           batchContainsPuts = true;
851         } else {
852           batchContainsDelete = true;
853         }
854 
855         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
856         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
857         if (metaCells != null && !metaCells.isEmpty()) {
858           for (Cell metaCell : metaCells) {
859             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
860             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
861             HRegion hRegion = (HRegion)region;
862             if (compactionDesc != null) {
863               // replay the compaction. Remove the files from stores only if we are the primary
864               // region replica (thus own the files)
865               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
866                 replaySeqId);
867               continue;
868             }
869             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
870             if (flushDesc != null && !isDefaultReplica) {
871               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
872               continue;
873             }
874             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
875             if (regionEvent != null && !isDefaultReplica) {
876               hRegion.replayWALRegionEventMarker(regionEvent);
877               continue;
878             }
879             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
880             if (bulkLoadEvent != null) {
881               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
882               continue;
883             }
884           }
885           it.remove();
886         }
887       }
888       requestCount.add(mutations.size());
889       if (!region.getRegionInfo().isMetaTable()) {
890         regionServer.cacheFlusher.reclaimMemStoreMemory();
891       }
892       return region.batchReplay(mutations.toArray(
893         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
894     } finally {
895       if (regionServer.metricsRegionServer != null) {
896         long after = EnvironmentEdgeManager.currentTime();
897           if (batchContainsPuts) {
898           regionServer.metricsRegionServer.updatePut(after - before);
899         }
900         if (batchContainsDelete) {
901           regionServer.metricsRegionServer.updateDelete(after - before);
902         }
903       }
904     }
905   }
906 
907   private void closeAllScanners() {
908     // Close any outstanding scanners. Means they'll get an UnknownScanner
909     // exception next time they come in.
910     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
911       try {
912         e.getValue().s.close();
913       } catch (IOException ioe) {
914         LOG.warn("Closing scanner " + e.getKey(), ioe);
915       }
916     }
917   }
918 
  /**
   * Builds the RPC services front-end for the given server: instantiates the
   * configured RPC scheduler factory, resolves hostname and bind address (master
   * vs. region server settings), creates the RpcServer, and caches scanner/RPC
   * timeout configuration.
   *
   * @param rs the server these RPC services belong to
   * @throws IOException if the bind address fails to resolve, the port is already
   *           in use, or the listener channel is closed
   */
  public RSRpcServices(HRegionServer rs) throws IOException {
    regionServer = rs;

    // The scheduler factory is pluggable via configuration; reflection failures
    // are surfaced as IllegalArgumentException (bad configuration value).
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
    } catch (InstantiationException e) {
      throw new IllegalArgumentException(e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    // Master and region server read different hostname/port/bind-address keys;
    // the subclass check decides which set applies.
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    String name = rs.getProcessName() + "/" + initialIsa.toString();
    // Set how many times to retry talking to another server over HConnection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    try {
      rpcServer = new RpcServer(rs, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          rs.conf,
          rpcSchedulerFactory.create(rs.conf, this, rs));
      rpcServer.setRsRpcServices(this);
    } catch (BindException be) {
      // Point the operator at the config key that controls the conflicting port.
      String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
          HConstants.REGIONSERVER_PORT;
      throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
          "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }

    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);
  }
993 
994   @Override
995   public void onConfigurationChange(Configuration newConf) {
996     if (rpcServer instanceof ConfigurationObserver) {
997       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
998     }
999   }
1000 
  /** Creates the priority function for this server's requests; subclasses may override. */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
1004 
1005   public static String getHostname(Configuration conf, boolean isMaster)
1006       throws UnknownHostException {
1007     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
1008       HRegionServer.RS_HOSTNAME_KEY);
1009     if (hostname == null || hostname.isEmpty()) {
1010       String masterOrRS = isMaster ? "master" : "regionserver";
1011       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
1012         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
1013         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
1014     } else {
1015       LOG.info("hostname is configured to be " + hostname);
1016       return hostname;
1017     }
1018   }
1019 
1020   RegionScanner getScanner(long scannerId) {
1021     String scannerIdString = Long.toString(scannerId);
1022     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1023     if (scannerHolder != null) {
1024       return scannerHolder.s;
1025     }
1026     return null;
1027   }
1028 
1029   public String getScanDetailsWithId(long scannerId) {
1030     RegionScanner scanner = getScanner(scannerId);
1031     if (scanner == null) {
1032       return null;
1033     }
1034     StringBuilder builder = new StringBuilder();
1035     builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1036     builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1037     return builder.toString();
1038   }
1039 
1040   /**
1041    * Get the vtime associated with the scanner.
1042    * Currently the vtime is the number of "next" calls.
1043    */
1044   long getScannerVirtualTime(long scannerId) {
1045     String scannerIdString = Long.toString(scannerId);
1046     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1047     if (scannerHolder != null) {
1048       return scannerHolder.getNextCallSeq();
1049     }
1050     return 0L;
1051   }
1052 
1053   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1054     long scannerId = this.scannerIdGenerator.generateNewScannerId();
1055     String scannerName = String.valueOf(scannerId);
1056 
1057     RegionScannerHolder existing =
1058       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1059     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1060 
1061     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1062         new ScannerListener(scannerName));
1063     return scannerId;
1064   }
1065 
1066   /**
1067    * Method to account for the size of retained cells and retained data blocks.
1068    * @return an object that represents the last referenced block from this response.
1069    */
1070   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1071     if (context != null && !r.isEmpty()) {
1072       for (Cell c : r.rawCells()) {
1073         context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1074         // We're using the last block being the same as the current block as
1075         // a proxy for pointing to a new block. This won't be exact.
1076         // If there are multiple gets that bounce back and forth
1077         // Then it's possible that this will over count the size of
1078         // referenced blocks. However it's better to over count and
1079         // use two RPC's than to OOME the RegionServer.
1080         byte[] valueArray = c.getValueArray();
1081         if (valueArray != lastBlock) {
1082           context.incrementResponseBlockSize(valueArray.length);
1083           lastBlock = valueArray;
1084         }
1085       }
1086     }
1087     return lastBlock;
1088   }
1089 
1090 
1091   /**
1092    * Find the HRegion based on a region specifier
1093    *
1094    * @param regionSpecifier the region specifier
1095    * @return the corresponding region
1096    * @throws IOException if the specifier is not null,
1097    *    but failed to find the region
1098    */
1099   Region getRegion(
1100       final RegionSpecifier regionSpecifier) throws IOException {
1101     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1102         ProtobufUtil.getRegionEncodedName(regionSpecifier));
1103   }
1104 
  /** @return the priority function used to rank requests (exposed for tests). */
  @VisibleForTesting
  public PriorityFunction getPriority() {
    return priority;
  }
1109 
  /** @return the region server's configuration (exposed for tests). */
  @VisibleForTesting
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }
1114 
  /** @return the region server's quota manager. */
  private RegionServerQuotaManager getQuotaManager() {
    return regionServer.getRegionServerQuotaManager();
  }
1118 
  /** Starts serving: the scanner id generator is created before the RPC server accepts calls. */
  void start() {
    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
    rpcServer.start();
  }
1123 
  /** Stops serving: closes all open scanners, then shuts down the RPC server. */
  void stop() {
    closeAllScanners();
    rpcServer.stop();
  }
1128 
1129   /**
1130    * Called to verify that this server is up and running.
1131    *
1132    * @throws IOException
1133    */
1134   protected void checkOpen() throws IOException {
1135     if (regionServer.isAborted()) {
1136       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1137     }
1138     if (regionServer.isStopped()) {
1139       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1140     }
1141     if (!regionServer.fsOk) {
1142       throw new RegionServerStoppedException("File system not available");
1143     }
1144     if (!regionServer.isOnline()) {
1145       throw new ServerNotRunningYetException("Server is not running yet");
1146     }
1147   }
1148 
1149   /**
1150    * @return list of blocking services and their security info classes that this server supports
1151    */
1152   protected List<BlockingServiceAndInterface> getServices() {
1153     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1154     bssi.add(new BlockingServiceAndInterface(
1155       ClientService.newReflectiveBlockingService(this),
1156       ClientService.BlockingInterface.class));
1157     bssi.add(new BlockingServiceAndInterface(
1158       AdminService.newReflectiveBlockingService(this),
1159       AdminService.BlockingInterface.class));
1160     return bssi;
1161   }
1162 
  /** @return the address this server answers on, with the final port bound by the RPC server. */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }
1166 
  /** Delegates computing an incoming request's priority to the priority function. */
  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }
1171 
  /** Delegates computing an incoming request's deadline to the priority function. */
  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }
1176 
1177   /*
1178    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1179    *
1180    * @param e
1181    *
1182    * @return True if we OOME'd and are aborting.
1183    */
1184   @Override
1185   public boolean checkOOME(final Throwable e) {
1186     return exitIfOOME(e);
1187   }
1188 
1189   public static boolean exitIfOOME(final Throwable e ){
1190     boolean stop = false;
1191     try {
1192       if (e instanceof OutOfMemoryError
1193           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1194           || (e.getMessage() != null && e.getMessage().contains(
1195               "java.lang.OutOfMemoryError"))) {
1196         stop = true;
1197         LOG.fatal("Run out of memory; " + RSRpcServices.class.getSimpleName()
1198           + " will abort itself immediately", e);
1199       }
1200     } finally {
1201       if (stop) {
1202         Runtime.getRuntime().halt(1);
1203       }
1204     }
1205     return stop;
1206   }
1207 
1208   /**
1209    * Close a region on the region server.
1210    *
1211    * @param controller the RPC controller
1212    * @param request the request
1213    * @throws ServiceException
1214    */
1215   @Override
1216   @QosPriority(priority=HConstants.ADMIN_QOS)
1217   public CloseRegionResponse closeRegion(final RpcController controller,
1218       final CloseRegionRequest request) throws ServiceException {
1219     final ServerName sn = (request.hasDestinationServer() ?
1220       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1221 
1222     try {
1223       checkOpen();
1224       if (request.hasServerStartCode()) {
1225         // check that we are the same server that this RPC is intended for.
1226         long serverStartCode = request.getServerStartCode();
1227         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1228           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1229               "different server with startCode: " + serverStartCode + ", this server is: "
1230               + regionServer.serverName));
1231         }
1232       }
1233       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1234 
1235       requestCount.increment();
1236       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1237       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1238         .getCloseRegionCoordination().parseFromProtoRequest(request);
1239 
1240       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1241       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1242       return builder.build();
1243     } catch (IOException ie) {
1244       throw new ServiceException(ie);
1245     }
1246   }
1247 
1248   /**
1249    * Compact a region on the region server.
1250    *
1251    * @param controller the RPC controller
1252    * @param request the request
1253    * @throws ServiceException
1254    */
1255   @Override
1256   @QosPriority(priority=HConstants.ADMIN_QOS)
1257   public CompactRegionResponse compactRegion(final RpcController controller,
1258       final CompactRegionRequest request) throws ServiceException {
1259     try {
1260       checkOpen();
1261       requestCount.increment();
1262       Region region = getRegion(request.getRegion());
1263       region.startRegionOperation(Operation.COMPACT_REGION);
1264       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1265       boolean major = false;
1266       byte [] family = null;
1267       Store store = null;
1268       if (request.hasFamily()) {
1269         family = request.getFamily().toByteArray();
1270         store = region.getStore(family);
1271         if (store == null) {
1272           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1273             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1274         }
1275       }
1276       if (request.hasMajor()) {
1277         major = request.getMajor();
1278       }
1279       if (major) {
1280         if (family != null) {
1281           store.triggerMajorCompaction();
1282         } else {
1283           region.triggerMajorCompaction();
1284         }
1285       }
1286 
1287       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1288       if (LOG.isTraceEnabled()) {
1289         LOG.trace("User-triggered compaction requested for region "
1290           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1291       }
1292       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1293       if(family != null) {
1294         regionServer.compactSplitThread.requestCompaction(region, store, log,
1295           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1296       } else {
1297         regionServer.compactSplitThread.requestCompaction(region, log,
1298           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1299       }
1300       return CompactRegionResponse.newBuilder().build();
1301     } catch (IOException ie) {
1302       throw new ServiceException(ie);
1303     }
1304   }
1305 
1306   /**
1307    * Flush a region on the region server.
1308    *
1309    * @param controller the RPC controller
1310    * @param request the request
1311    * @throws ServiceException
1312    */
1313   @Override
1314   @QosPriority(priority=HConstants.ADMIN_QOS)
1315   public FlushRegionResponse flushRegion(final RpcController controller,
1316       final FlushRegionRequest request) throws ServiceException {
1317     try {
1318       checkOpen();
1319       requestCount.increment();
1320       Region region = getRegion(request.getRegion());
1321       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1322       boolean shouldFlush = true;
1323       if (request.hasIfOlderThanTs()) {
1324         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1325       }
1326       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1327       if (shouldFlush) {
1328         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1329             request.getWriteFlushWalMarker() : false;
1330         long startTime = EnvironmentEdgeManager.currentTime();
1331         // Go behind the curtain so we can manage writing of the flush WAL marker
1332         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1333             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1334         if (flushResult.isFlushSucceeded()) {
1335           long endTime = EnvironmentEdgeManager.currentTime();
1336           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1337         }
1338         boolean compactionNeeded = flushResult.isCompactionNeeded();
1339         if (compactionNeeded) {
1340           regionServer.compactSplitThread.requestSystemCompaction(region,
1341             "Compaction through user triggered flush");
1342         }
1343         builder.setFlushed(flushResult.isFlushSucceeded());
1344         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1345       }
1346       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1347       return builder.build();
1348     } catch (DroppedSnapshotException ex) {
1349       // Cache flush can fail in a few places. If it fails in a critical
1350       // section, we get a DroppedSnapshotException and a replay of wal
1351       // is required. Currently the only way to do this is a restart of
1352       // the server.
1353       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1354       throw new ServiceException(ex);
1355     } catch (IOException ie) {
1356       throw new ServiceException(ie);
1357     }
1358   }
1359 
1360   @Override
1361   @QosPriority(priority=HConstants.ADMIN_QOS)
1362   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1363       final GetOnlineRegionRequest request) throws ServiceException {
1364     try {
1365       checkOpen();
1366       requestCount.increment();
1367       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1368       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1369       for (Region region: onlineRegions.values()) {
1370         list.add(region.getRegionInfo());
1371       }
1372       Collections.sort(list);
1373       return ResponseConverter.buildGetOnlineRegionResponse(list);
1374     } catch (IOException ie) {
1375       throw new ServiceException(ie);
1376     }
1377   }
1378 
  /**
   * Get the HRegionInfo of a region, plus its recovery flag and, when asked for,
   * its compaction state.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
      final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());
      HRegionInfo info = region.getRegionInfo();
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(HRegionInfo.convert(info));
      // Compaction state is only computed when the client explicitly asks for it.
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(region.getCompactionState());
      }
      builder.setIsRecovering(region.isRecovering());
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1399 
1400   /**
1401    * Get some information of the region server.
1402    *
1403    * @param controller the RPC controller
1404    * @param request the request
1405    * @throws ServiceException
1406    */
1407   @Override
1408   @QosPriority(priority=HConstants.ADMIN_QOS)
1409   public GetServerInfoResponse getServerInfo(final RpcController controller,
1410       final GetServerInfoRequest request) throws ServiceException {
1411     try {
1412       checkOpen();
1413     } catch (IOException ie) {
1414       throw new ServiceException(ie);
1415     }
1416     requestCount.increment();
1417     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1418     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1419   }
1420 
1421   @Override
1422   @QosPriority(priority=HConstants.ADMIN_QOS)
1423   public GetStoreFileResponse getStoreFile(final RpcController controller,
1424       final GetStoreFileRequest request) throws ServiceException {
1425     try {
1426       checkOpen();
1427       Region region = getRegion(request.getRegion());
1428       requestCount.increment();
1429       Set<byte[]> columnFamilies;
1430       if (request.getFamilyCount() == 0) {
1431         columnFamilies = region.getTableDesc().getFamiliesKeys();
1432       } else {
1433         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1434         for (ByteString cf: request.getFamilyList()) {
1435           columnFamilies.add(cf.toByteArray());
1436         }
1437       }
1438       int nCF = columnFamilies.size();
1439       List<String>  fileList = region.getStoreFileList(
1440         columnFamilies.toArray(new byte[nCF][]));
1441       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1442       builder.addAllStoreFile(fileList);
1443       return builder.build();
1444     } catch (IOException ie) {
1445       throw new ServiceException(ie);
1446     }
1447   }
1448 
1449   /**
1450    * Merge regions on the region server.
1451    *
1452    * @param controller the RPC controller
1453    * @param request the request
1454    * @return merge regions response
1455    * @throws ServiceException
1456    */
1457   @Override
1458   @QosPriority(priority = HConstants.ADMIN_QOS)
1459   public MergeRegionsResponse mergeRegions(final RpcController controller,
1460       final MergeRegionsRequest request) throws ServiceException {
1461     try {
1462       checkOpen();
1463       requestCount.increment();
1464       Region regionA = getRegion(request.getRegionA());
1465       Region regionB = getRegion(request.getRegionB());
1466       boolean forcible = request.getForcible();
1467       long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1468       regionA.startRegionOperation(Operation.MERGE_REGION);
1469       regionB.startRegionOperation(Operation.MERGE_REGION);
1470       if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1471           regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1472         throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1473       }
1474       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
1475           + ",forcible=" + forcible);
1476       long startTime = EnvironmentEdgeManager.currentTime();
1477       FlushResult flushResult = regionA.flush(true);
1478       if (flushResult.isFlushSucceeded()) {
1479         long endTime = EnvironmentEdgeManager.currentTime();
1480         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1481       }
1482       startTime = EnvironmentEdgeManager.currentTime();
1483       flushResult = regionB.flush(true);
1484       if (flushResult.isFlushSucceeded()) {
1485         long endTime = EnvironmentEdgeManager.currentTime();
1486         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1487       }
1488       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1489           masterSystemTime, RpcServer.getRequestUser());
1490       return MergeRegionsResponse.newBuilder().build();
1491     } catch (DroppedSnapshotException ex) {
1492       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1493       throw new ServiceException(ex);
1494     } catch (IOException ie) {
1495       throw new ServiceException(ie);
1496     }
1497   }
1498 
1499   /**
1500    * Open asynchronously a region or a set of regions on the region server.
1501    *
1502    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1503    *  before being called. As a consequence, this method should be called only from the master.
1504    * <p>
1505    * Different manages states for the region are:
1506    * </p><ul>
1507    *  <li>region not opened: the region opening will start asynchronously.</li>
1508    *  <li>a close is already in progress: this is considered as an error.</li>
1509    *  <li>an open is already in progress: this new open request will be ignored. This is important
1510    *  because the Master can do multiple requests if it crashes.</li>
1511    *  <li>the region is already opened:  this new open request will be ignored.</li>
1512    *  </ul>
1513    * <p>
1514    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1515    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1516    * errors are put in the response as FAILED_OPENING.
1517    * </p>
1518    * @param controller the RPC controller
1519    * @param request the request
1520    * @throws ServiceException
1521    */
1522   @Override
1523   @QosPriority(priority=HConstants.ADMIN_QOS)
1524   public OpenRegionResponse openRegion(final RpcController controller,
1525       final OpenRegionRequest request) throws ServiceException {
1526     requestCount.increment();
1527     if (request.hasServerStartCode()) {
1528       // check that we are the same server that this RPC is intended for.
1529       long serverStartCode = request.getServerStartCode();
1530       if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1531         throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1532             "different server with startCode: " + serverStartCode + ", this server is: "
1533             + regionServer.serverName));
1534       }
1535     }
1536 
1537     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1538     final int regionCount = request.getOpenInfoCount();
1539     final Map<TableName, HTableDescriptor> htds =
1540         new HashMap<TableName, HTableDescriptor>(regionCount);
1541     final boolean isBulkAssign = regionCount > 1;
1542     try {
1543       checkOpen();
1544     } catch (IOException ie) {
1545       TableName tableName = null;
1546       if (regionCount == 1) {
1547         RegionInfo ri = request.getOpenInfo(0).getRegion();
1548         if (ri != null) {
1549           tableName = ProtobufUtil.toTableName(ri.getTableName());
1550         }
1551       }
1552       if (!TableName.META_TABLE_NAME.equals(tableName)) {
1553         throw new ServiceException(ie);
1554       }
1555       // We are assigning meta, wait a little for regionserver to finish initialization.
1556       int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1557         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1558       long endTime = System.currentTimeMillis() + timeout;
1559       synchronized (regionServer.online) {
1560         try {
1561           while (System.currentTimeMillis() <= endTime
1562               && !regionServer.isStopped() && !regionServer.isOnline()) {
1563             regionServer.online.wait(regionServer.msgInterval);
1564           }
1565           checkOpen();
1566         } catch (InterruptedException t) {
1567           Thread.currentThread().interrupt();
1568           throw new ServiceException(t);
1569         } catch (IOException e) {
1570           throw new ServiceException(e);
1571         }
1572       }
1573     }
1574 
1575     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1576 
1577     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1578       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1579       OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1580         getOpenRegionCoordination();
1581       OpenRegionCoordination.OpenRegionDetails ord =
1582         coordination.parseFromProtoRequest(regionOpenInfo);
1583 
1584       HTableDescriptor htd;
1585       try {
1586         final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1587         if (onlineRegion != null) {
1588           //Check if the region can actually be opened.
1589           if (onlineRegion.getCoprocessorHost() != null) {
1590             onlineRegion.getCoprocessorHost().preOpen();
1591           }
1592           // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
1593           // the region.
1594           Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1595             regionServer.getConnection(), region.getRegionName());
1596           if (regionServer.serverName.equals(p.getSecond())) {
1597             Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1598             // Map regionsInTransitionInRSOnly has an entry for a region only if the region
1599             // is in transition on this RS, so here closing can be null. If not null, it can
1600             // be true or false. True means the region is opening on this RS; while false
1601             // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
1602             // not in transition any more, or still transition to open.
1603             if (!Boolean.FALSE.equals(closing)
1604                 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1605               LOG.warn("Attempted open of " + region.getEncodedName()
1606                 + " but already online on this server");
1607               builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1608               continue;
1609             }
1610           } else {
1611             LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1612               + " but hbase:meta does not have this server - continue opening.");
1613             regionServer.removeFromOnlineRegions(onlineRegion, null);
1614           }
1615         }
1616         LOG.info("Open " + region.getRegionNameAsString());
1617         htd = htds.get(region.getTable());
1618         if (htd == null) {
1619           htd = regionServer.tableDescriptors.get(region.getTable());
1620           if (htd == null) {
1621             throw new IOException("missing table descriptor for " + region.getEncodedName());
1622           }
1623           htds.put(region.getTable(), htd);
1624         }
1625 
1626         final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1627           region.getEncodedNameAsBytes(), Boolean.TRUE);
1628 
1629         if (Boolean.FALSE.equals(previous)) {
1630           // There is a close in progress. We need to mark this open as failed in ZK.
1631 
1632           coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1633 
1634           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1635             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1636         }
1637 
1638         if (Boolean.TRUE.equals(previous)) {
1639           // An open is in progress. This is supported, but let's log this.
1640           LOG.info("Receiving OPEN for the region:" +
1641             region.getRegionNameAsString() + " , which we are already trying to OPEN"
1642               + " - ignoring this new request for this region.");
1643         }
1644 
1645         // We are opening this region. If it moves back and forth for whatever reason, we don't
1646         // want to keep returning the stale moved record while we are opening/if we close again.
1647         regionServer.removeFromMovedRegions(region.getEncodedName());
1648 
1649         if (previous == null) {
1650           // check if the region to be opened is marked in recovering state in ZK
1651           if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1652               region.getEncodedName())) {
1653             // Check if current region open is for distributedLogReplay. This check is to support
1654             // rolling restart/upgrade where we want to Master/RS see same configuration
1655             if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1656                   || regionOpenInfo.getOpenForDistributedLogReplay()) {
1657               regionServer.recoveringRegions.put(region.getEncodedName(), null);
1658             } else {
1659               // Remove stale recovery region from ZK when we open region not for recovering which
1660               // could happen when turn distributedLogReplay off from on.
1661               List<String> tmpRegions = new ArrayList<String>();
1662               tmpRegions.add(region.getEncodedName());
1663               ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1664                 tmpRegions);
1665             }
1666           }
1667           // If there is no action in progress, we can submit a specific handler.
1668           // Need to pass the expected version in the constructor.
1669           if (region.isMetaRegion()) {
1670             regionServer.service.submit(new OpenMetaHandler(
1671               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1672           } else {
1673             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1674               regionOpenInfo.getFavoredNodesList());
1675             regionServer.service.submit(new OpenRegionHandler(
1676               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1677           }
1678         }
1679 
1680         builder.addOpeningState(RegionOpeningState.OPENED);
1681 
1682       } catch (KeeperException zooKeeperEx) {
1683         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1684         throw new ServiceException(zooKeeperEx);
1685       } catch (IOException ie) {
1686         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1687         if (isBulkAssign) {
1688           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1689         } else {
1690           throw new ServiceException(ie);
1691         }
1692       }
1693     }
1694     return builder.build();
1695   }
1696 
1697   /**
1698    *  Wamrmup a region on this server.
1699    *
1700    * This method should only be called by Master. It synchrnously opens the region and
1701    * closes the region bringing the most important pages in cache.
1702    * <p>
1703    *
1704    * @param controller the RPC controller
1705    * @param request the request
1706    * @throws ServiceException
1707    */
1708   @Override
1709   public WarmupRegionResponse warmupRegion(final RpcController controller,
1710       final WarmupRegionRequest request) throws ServiceException {
1711 
1712     RegionInfo regionInfo = request.getRegionInfo();
1713     final HRegionInfo region = HRegionInfo.convert(regionInfo);
1714     HTableDescriptor htd;
1715     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1716 
1717     try {
1718       checkOpen();
1719       String encodedName = region.getEncodedName();
1720       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1721       final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1722 
1723       if (onlineRegion != null) {
1724         LOG.info("Region already online. Skipping warming up " + region);
1725         return response;
1726       }
1727 
1728       if (LOG.isDebugEnabled()) {
1729         LOG.debug("Warming up Region " + region.getRegionNameAsString());
1730       }
1731 
1732       htd = regionServer.tableDescriptors.get(region.getTable());
1733 
1734       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1735         LOG.info("Region is in transition. Skipping warmup " + region);
1736         return response;
1737       }
1738 
1739       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1740           regionServer.getConfiguration(), regionServer, null);
1741 
1742     } catch (IOException ie) {
1743       LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1744       throw new ServiceException(ie);
1745     }
1746 
1747     return response;
1748   }
1749 
1750   /**
1751    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1752    * that the given mutations will be durable on the receiving RS if this method returns without any
1753    * exception.
1754    * @param controller the RPC controller
1755    * @param request the request
1756    * @throws ServiceException
1757    */
1758   @Override
1759   @QosPriority(priority = HConstants.REPLAY_QOS)
1760   public ReplicateWALEntryResponse replay(final RpcController controller,
1761       final ReplicateWALEntryRequest request) throws ServiceException {
1762     long before = EnvironmentEdgeManager.currentTime();
1763     CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1764     try {
1765       checkOpen();
1766       List<WALEntry> entries = request.getEntryList();
1767       if (entries == null || entries.isEmpty()) {
1768         // empty input
1769         return ReplicateWALEntryResponse.newBuilder().build();
1770       }
1771       ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1772       Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1773       RegionCoprocessorHost coprocessorHost =
1774           ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1775             ? region.getCoprocessorHost()
1776             : null; // do not invoke coprocessors if this is a secondary region replica
1777       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1778 
1779       // Skip adding the edits to WAL if this is a secondary region replica
1780       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1781       Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1782 
1783       for (WALEntry entry : entries) {
1784         if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1785           throw new NotServingRegionException("Replay request contains entries from multiple " +
1786               "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1787               + entry.getKey().getEncodedRegionName());
1788         }
1789         if (regionServer.nonceManager != null && isPrimary) {
1790           long nonceGroup = entry.getKey().hasNonceGroup()
1791             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1792           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1793           regionServer.nonceManager.reportOperationFromWal(
1794               nonceGroup,
1795               nonce,
1796               entry.getKey().getWriteTime());
1797         }
1798         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1799           new Pair<WALKey, WALEdit>();
1800         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1801           cells, walEntry, durability);
1802         if (coprocessorHost != null) {
1803           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1804           // KeyValue.
1805           if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1806             walEntry.getSecond())) {
1807             // if bypass this log entry, ignore it ...
1808             continue;
1809           }
1810           walEntries.add(walEntry);
1811         }
1812         if(edits!=null && !edits.isEmpty()) {
1813           long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1814             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1815           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1816           // check if it's a partial success
1817           for (int i = 0; result != null && i < result.length; i++) {
1818             if (result[i] != OperationStatus.SUCCESS) {
1819               throw new IOException(result[i].getExceptionMsg());
1820             }
1821           }
1822         }
1823       }
1824 
1825       //sync wal at the end because ASYNC_WAL is used above
1826       WAL wal = getWAL(region);
1827       if (wal != null) {
1828         wal.sync();
1829       }
1830 
1831       if (coprocessorHost != null) {
1832         for (Pair<WALKey, WALEdit> entry : walEntries) {
1833           coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1834             entry.getSecond());
1835         }
1836       }
1837       return ReplicateWALEntryResponse.newBuilder().build();
1838     } catch (IOException ie) {
1839       throw new ServiceException(ie);
1840     } finally {
1841       if (regionServer.metricsRegionServer != null) {
1842         regionServer.metricsRegionServer.updateReplay(
1843           EnvironmentEdgeManager.currentTime() - before);
1844       }
1845     }
1846   }
1847 
1848   WAL getWAL(Region region) {
1849     return ((HRegion)region).getWAL();
1850   }
1851 
1852   /**
1853    * Replicate WAL entries on the region server.
1854    *
1855    * @param controller the RPC controller
1856    * @param request the request
1857    * @throws ServiceException
1858    */
1859   @Override
1860   @QosPriority(priority=HConstants.REPLICATION_QOS)
1861   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1862       final ReplicateWALEntryRequest request) throws ServiceException {
1863     try {
1864       checkOpen();
1865       if (regionServer.replicationSinkHandler != null) {
1866         requestCount.increment();
1867         List<WALEntry> entries = request.getEntryList();
1868         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1869         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1870         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1871         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1872         return ReplicateWALEntryResponse.newBuilder().build();
1873       } else {
1874         throw new ServiceException("Replication services are not initialized yet");
1875       }
1876     } catch (IOException ie) {
1877       throw new ServiceException(ie);
1878     }
1879   }
1880 
1881   /**
1882    * Roll the WAL writer of the region server.
1883    * @param controller the RPC controller
1884    * @param request the request
1885    * @throws ServiceException
1886    */
1887   @Override
1888   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1889       final RollWALWriterRequest request) throws ServiceException {
1890     try {
1891       checkOpen();
1892       requestCount.increment();
1893       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1894       regionServer.walRoller.requestRollAll();
1895       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1896       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1897       return builder.build();
1898     } catch (IOException ie) {
1899       throw new ServiceException(ie);
1900     }
1901   }
1902 
1903   /**
1904    * Split a region on the region server.
1905    *
1906    * @param controller the RPC controller
1907    * @param request the request
1908    * @throws ServiceException
1909    */
1910   @Override
1911   @QosPriority(priority=HConstants.ADMIN_QOS)
1912   public SplitRegionResponse splitRegion(final RpcController controller,
1913       final SplitRegionRequest request) throws ServiceException {
1914     try {
1915       checkOpen();
1916       requestCount.increment();
1917       Region region = getRegion(request.getRegion());
1918       region.startRegionOperation(Operation.SPLIT_REGION);
1919       if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1920         throw new IOException("Can't split replicas directly. "
1921             + "Replicas are auto-split when their primary is split.");
1922       }
1923       LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1924       long startTime = EnvironmentEdgeManager.currentTime();
1925       FlushResult flushResult = region.flush(true);
1926       if (flushResult.isFlushSucceeded()) {
1927         long endTime = EnvironmentEdgeManager.currentTime();
1928         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1929       }
1930       byte[] splitPoint = null;
1931       if (request.hasSplitPoint()) {
1932         splitPoint = request.getSplitPoint().toByteArray();
1933       }
1934       ((HRegion)region).forceSplit(splitPoint);
1935       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1936         RpcServer.getRequestUser());
1937       return SplitRegionResponse.newBuilder().build();
1938     } catch (DroppedSnapshotException ex) {
1939       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1940       throw new ServiceException(ex);
1941     } catch (IOException ie) {
1942       throw new ServiceException(ie);
1943     }
1944   }
1945 
1946   /**
1947    * Stop the region server.
1948    *
1949    * @param controller the RPC controller
1950    * @param request the request
1951    * @throws ServiceException
1952    */
1953   @Override
1954   @QosPriority(priority=HConstants.ADMIN_QOS)
1955   public StopServerResponse stopServer(final RpcController controller,
1956       final StopServerRequest request) throws ServiceException {
1957     requestCount.increment();
1958     String reason = request.getReason();
1959     regionServer.stop(reason);
1960     return StopServerResponse.newBuilder().build();
1961   }
1962 
1963   @Override
1964   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1965       UpdateFavoredNodesRequest request) throws ServiceException {
1966     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1967     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1968     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1969       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1970       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1971         regionUpdateInfo.getFavoredNodesList());
1972     }
1973     respBuilder.setResponse(openInfoList.size());
1974     return respBuilder.build();
1975   }
1976 
1977   /**
1978    * Atomically bulk load several HFiles into an open region
1979    * @return true if successful, false is failed but recoverably (no action)
1980    * @throws ServiceException if failed unrecoverably
1981    */
1982   @Override
1983   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1984       final BulkLoadHFileRequest request) throws ServiceException {
1985     try {
1986       checkOpen();
1987       requestCount.increment();
1988       Region region = getRegion(request.getRegion());
1989       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1990       for (FamilyPath familyPath: request.getFamilyPathList()) {
1991         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1992           familyPath.getPath()));
1993       }
1994       boolean bypass = false;
1995       if (region.getCoprocessorHost() != null) {
1996         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1997       }
1998       boolean loaded = false;
1999       if (!bypass) {
2000         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
2001       }
2002       if (region.getCoprocessorHost() != null) {
2003         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
2004       }
2005       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2006       builder.setLoaded(loaded);
2007       return builder.build();
2008     } catch (IOException ie) {
2009       throw new ServiceException(ie);
2010     }
2011   }
2012 
2013   @Override
2014   public CoprocessorServiceResponse execService(final RpcController controller,
2015       final CoprocessorServiceRequest request) throws ServiceException {
2016     try {
2017       checkOpen();
2018       requestCount.increment();
2019       Region region = getRegion(request.getRegion());
2020       Message result = execServiceOnRegion(region, request.getCall());
2021       CoprocessorServiceResponse.Builder builder =
2022         CoprocessorServiceResponse.newBuilder();
2023       builder.setRegion(RequestConverter.buildRegionSpecifier(
2024         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2025       builder.setValue(
2026         builder.getValueBuilder().setName(result.getClass().getName())
2027           .setValue(result.toByteString()));
2028       return builder.build();
2029     } catch (IOException ie) {
2030       throw new ServiceException(ie);
2031     }
2032   }
2033 
2034   private Message execServiceOnRegion(Region region,
2035       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2036     // ignore the passed in controller (from the serialized call)
2037     ServerRpcController execController = new ServerRpcController();
2038     return region.execService(execController, serviceCall);
2039   }
2040 
2041   /**
2042    * Get data from a table.
2043    *
2044    * @param controller the RPC controller
2045    * @param request the get request
2046    * @throws ServiceException
2047    */
2048   @Override
2049   public GetResponse get(final RpcController controller,
2050       final GetRequest request) throws ServiceException {
2051     long before = EnvironmentEdgeManager.currentTime();
2052     OperationQuota quota = null;
2053     try {
2054       checkOpen();
2055       requestCount.increment();
2056       Region region = getRegion(request.getRegion());
2057 
2058       GetResponse.Builder builder = GetResponse.newBuilder();
2059       ClientProtos.Get get = request.getGet();
2060       Boolean existence = null;
2061       Result r = null;
2062       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2063 
2064       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2065         if (get.getColumnCount() != 1) {
2066           throw new DoNotRetryIOException(
2067             "get ClosestRowBefore supports one and only one family now, not "
2068               + get.getColumnCount() + " families");
2069         }
2070         byte[] row = get.getRow().toByteArray();
2071         byte[] family = get.getColumn(0).getFamily().toByteArray();
2072         r = region.getClosestRowBefore(row, family);
2073       } else {
2074         Get clientGet = ProtobufUtil.toGet(get);
2075         if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2076           existence = region.getCoprocessorHost().preExists(clientGet);
2077         }
2078         if (existence == null) {
2079           r = region.get(clientGet);
2080           if (get.getExistenceOnly()) {
2081             boolean exists = r.getExists();
2082             if (region.getCoprocessorHost() != null) {
2083               exists = region.getCoprocessorHost().postExists(clientGet, exists);
2084             }
2085             existence = exists;
2086           }
2087         }
2088       }
2089       if (existence != null){
2090         ClientProtos.Result pbr =
2091             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2092         builder.setResult(pbr);
2093       } else  if (r != null) {
2094         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2095         builder.setResult(pbr);
2096       }
2097       //r.cells is null when an table.exists(get) call
2098       if (r != null && r.rawCells() != null) {
2099         quota.addGetResult(r);
2100       }
2101       return builder.build();
2102     } catch (IOException ie) {
2103       throw new ServiceException(ie);
2104     } finally {
2105       if (regionServer.metricsRegionServer != null) {
2106         regionServer.metricsRegionServer.updateGet(
2107           EnvironmentEdgeManager.currentTime() - before);
2108       }
2109       if (quota != null) {
2110         quota.close();
2111       }
2112     }
2113   }
2114 
  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * <p>
   * A MultiRequest groups Actions per region (RegionAction). Each RegionAction
   * produces exactly one RegionActionResult in the response, in request order,
   * even when its region could not be found. Cell data travels outside the
   * protobuf payload via the PayloadCarryingRpcController's CellScanner.
   *
   * @param rpcc the RPC controller
   * @param request the multi request
   * @throws ServiceException only when the server itself is not open; per-region
   *   failures are reported inside the individual RegionActionResults
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
    // Fail the whole call fast if this server is stopping or aborting.
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
    if (controller != null) {
      // Detach the scanner so we are not holding the reference across the call.
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    // Outcome of an atomic RegionAction (checkAndRowMutate / mutateRows), if any.
    Boolean processed = null;

    for (RegionAction regionAction : request.getRegionActionList()) {
      this.requestCount.add(regionAction.getActionCount());
      OperationQuota quota;
      Region region;
      // Builder is reused across iterations; reset it for this RegionAction.
      regionActionResultBuilder.clear();
      try {
        region = getRegion(regionAction.getRegion());
        quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        // All Mutations in this RegionAction not executed as we can not see the Region online here
        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
        // corresponding to these Mutations.
        skipCellsForMutations(regionAction.getActionList(), cellScanner);
        continue;  // For this region it's a failure.
      }

      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          if (request.hasCondition()) {
            // Conditional atomic row mutation (checkAndRowMutate path).
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator =
                ProtobufUtil.toComparator(condition.getComparator());
            processed = checkAndRowMutate(region, regionAction.getActionList(),
                  cellScanner, row, family, qualifier, compareOp, comparator);
          } else {
            ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
                cellScanner);
            // add the stats to the request
            if(stats != null) {
              responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
                  .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
            }
            processed = Boolean.TRUE;
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup);
      }
      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      quota.close();
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }
    if (processed != null) {
      responseBuilder.setProcessed(processed);
    }
    return responseBuilder.build();
  }
2212 
2213   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2214     if (cellScanner == null) {
2215       return;
2216     }
2217     for (Action action : actions) {
2218       skipCellsForMutation(action, cellScanner);
2219     }
2220   }
2221 
2222   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2223     if (cellScanner == null) {
2224       return;
2225     }
2226     try {
2227       if (action.hasMutation()) {
2228         MutationProto m = action.getMutation();
2229         if (m.hasAssociatedCellCount()) {
2230           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2231             cellScanner.advance();
2232           }
2233         }
2234       }
2235     } catch (IOException e) {
2236       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2237       // marked as failed as we could not see the Region here. At client side the top level
2238       // RegionAction exception will be considered first.
2239       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2240     }
2241   }
2242 
2243   /**
2244    * Mutate data in a table.
2245    *
2246    * @param rpcc the RPC controller
2247    * @param request the mutate request
2248    * @throws ServiceException
2249    */
2250   @Override
2251   public MutateResponse mutate(final RpcController rpcc,
2252       final MutateRequest request) throws ServiceException {
2253     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2254     // It is also the conduit via which we pass back data.
2255     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2256     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2257     OperationQuota quota = null;
2258     // Clear scanner so we are not holding on to reference across call.
2259     if (controller != null) {
2260       controller.setCellScanner(null);
2261     }
2262     try {
2263       checkOpen();
2264       requestCount.increment();
2265       Region region = getRegion(request.getRegion());
2266       MutateResponse.Builder builder = MutateResponse.newBuilder();
2267       MutationProto mutation = request.getMutation();
2268       if (!region.getRegionInfo().isMetaTable()) {
2269         regionServer.cacheFlusher.reclaimMemStoreMemory();
2270       }
2271       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2272       Result r = null;
2273       Boolean processed = null;
2274       MutationType type = mutation.getMutateType();
2275       long mutationSize = 0;
2276       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2277       switch (type) {
2278       case APPEND:
2279         // TODO: this doesn't actually check anything.
2280         r = append(region, quota, mutation, cellScanner, nonceGroup);
2281         break;
2282       case INCREMENT:
2283         // TODO: this doesn't actually check anything.
2284         r = increment(region, quota, mutation, cellScanner, nonceGroup);
2285         break;
2286       case PUT:
2287         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2288         quota.addMutation(put);
2289         if (request.hasCondition()) {
2290           Condition condition = request.getCondition();
2291           byte[] row = condition.getRow().toByteArray();
2292           byte[] family = condition.getFamily().toByteArray();
2293           byte[] qualifier = condition.getQualifier().toByteArray();
2294           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2295           ByteArrayComparable comparator =
2296             ProtobufUtil.toComparator(condition.getComparator());
2297           if (region.getCoprocessorHost() != null) {
2298             processed = region.getCoprocessorHost().preCheckAndPut(
2299               row, family, qualifier, compareOp, comparator, put);
2300           }
2301           if (processed == null) {
2302             boolean result = region.checkAndMutate(row, family,
2303               qualifier, compareOp, comparator, put, true);
2304             if (region.getCoprocessorHost() != null) {
2305               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2306                 qualifier, compareOp, comparator, put, result);
2307             }
2308             processed = result;
2309           }
2310         } else {
2311           region.put(put);
2312           processed = Boolean.TRUE;
2313         }
2314         break;
2315       case DELETE:
2316         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2317         quota.addMutation(delete);
2318         if (request.hasCondition()) {
2319           Condition condition = request.getCondition();
2320           byte[] row = condition.getRow().toByteArray();
2321           byte[] family = condition.getFamily().toByteArray();
2322           byte[] qualifier = condition.getQualifier().toByteArray();
2323           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2324           ByteArrayComparable comparator =
2325             ProtobufUtil.toComparator(condition.getComparator());
2326           if (region.getCoprocessorHost() != null) {
2327             processed = region.getCoprocessorHost().preCheckAndDelete(
2328               row, family, qualifier, compareOp, comparator, delete);
2329           }
2330           if (processed == null) {
2331             boolean result = region.checkAndMutate(row, family,
2332               qualifier, compareOp, comparator, delete, true);
2333             if (region.getCoprocessorHost() != null) {
2334               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2335                 qualifier, compareOp, comparator, delete, result);
2336             }
2337             processed = result;
2338           }
2339         } else {
2340           region.delete(delete);
2341           processed = Boolean.TRUE;
2342         }
2343         break;
2344       default:
2345           throw new DoNotRetryIOException(
2346             "Unsupported mutate type: " + type.name());
2347       }
2348       if (processed != null) builder.setProcessed(processed.booleanValue());
2349       addResult(builder, r, controller);
2350       return builder.build();
2351     } catch (IOException ie) {
2352       regionServer.checkFileSystem();
2353       throw new ServiceException(ie);
2354     } finally {
2355       if (quota != null) {
2356         quota.close();
2357       }
2358     }
2359   }
2360 
2361   /**
2362    * Scan data in a table.
2363    *
2364    * @param controller the RPC controller
2365    * @param request the scan request
2366    * @throws ServiceException
2367    */
2368   @Override
2369   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2370   throws ServiceException {
2371     OperationQuota quota = null;
2372     Leases.Lease lease = null;
2373     String scannerName = null;
2374     try {
2375       if (!request.hasScannerId() && !request.hasScan()) {
2376         throw new DoNotRetryIOException(
2377           "Missing required input: scannerId or scan");
2378       }
2379       long scannerId = -1;
2380       if (request.hasScannerId()) {
2381         scannerId = request.getScannerId();
2382         scannerName = String.valueOf(scannerId);
2383       }
2384       try {
2385         checkOpen();
2386       } catch (IOException e) {
2387         // If checkOpen failed, server not running or filesystem gone,
2388         // cancel this lease; filesystem is gone or we're closing or something.
2389         if (scannerName != null) {
2390           LOG.debug("Server shutting down and client tried to access missing scanner "
2391             + scannerName);
2392           if (regionServer.leases != null) {
2393             try {
2394               regionServer.leases.cancelLease(scannerName);
2395             } catch (LeaseException le) {
2396               // No problem, ignore
2397               if (LOG.isTraceEnabled()) {
2398                 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2399               }
2400              }
2401           }
2402         }
2403         throw e;
2404       }
2405       requestCount.increment();
2406 
2407       int ttl = 0;
2408       Region region = null;
2409       RegionScanner scanner = null;
2410       RegionScannerHolder rsh = null;
2411       boolean moreResults = true;
2412       boolean closeScanner = false;
2413       boolean isSmallScan = false;
2414       RpcCallContext context = RpcServer.getCurrentCall();
2415       Object lastBlock = null;
2416 
2417       ScanResponse.Builder builder = ScanResponse.newBuilder();
2418       if (request.hasCloseScanner()) {
2419         closeScanner = request.getCloseScanner();
2420       }
2421       int rows = closeScanner ? 0 : 1;
2422       if (request.hasNumberOfRows()) {
2423         rows = request.getNumberOfRows();
2424       }
2425       if (request.hasScannerId()) {
2426         rsh = scanners.get(scannerName);
2427         if (rsh == null) {
2428           LOG.warn("Client tried to access missing scanner " + scannerName);
2429           throw new UnknownScannerException(
2430             "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2431                 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2432                 + "long wait between consecutive client checkins, c) Server may be closing down, "
2433                 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2434                 + "possible fix would be increasing the value of"
2435                 + "'hbase.client.scanner.timeout.period' configuration.");
2436         }
2437         scanner = rsh.s;
2438         HRegionInfo hri = scanner.getRegionInfo();
2439         region = regionServer.getRegion(hri.getRegionName());
2440         if (region != rsh.r) { // Yes, should be the same instance
2441           throw new NotServingRegionException("Region was re-opened after the scanner"
2442             + scannerName + " was created: " + hri.getRegionNameAsString());
2443         }
2444       } else {
2445         region = getRegion(request.getRegion());
2446         ClientProtos.Scan protoScan = request.getScan();
2447         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2448         Scan scan = ProtobufUtil.toScan(protoScan);
2449         // if the request doesn't set this, get the default region setting.
2450         if (!isLoadingCfsOnDemandSet) {
2451           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2452         }
2453 
2454         isSmallScan = scan.isSmall();
2455         if (!scan.hasFamilies()) {
2456           // Adding all families to scanner
2457           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2458             scan.addFamily(family);
2459           }
2460         }
2461 
2462         if (region.getCoprocessorHost() != null) {
2463           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2464         }
2465         if (scanner == null) {
2466           scanner = region.getScanner(scan);
2467         }
2468         if (region.getCoprocessorHost() != null) {
2469           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2470         }
2471         scannerId = addScanner(scanner, region);
2472         scannerName = String.valueOf(scannerId);
2473         ttl = this.scannerLeaseTimeoutPeriod;
2474       }
2475       if (request.hasRenew() && request.getRenew()) {
2476         rsh = scanners.get(scannerName);
2477         lease = regionServer.leases.removeLease(scannerName);
2478         if (lease != null && rsh != null) {
2479           regionServer.leases.addLease(lease);
2480           // Increment the nextCallSeq value which is the next expected from client.
2481           rsh.incNextCallSeq();
2482         }
2483         return builder.build();
2484       }
2485 
2486       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2487       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2488       if (rows > 0) {
2489         // if nextCallSeq does not match throw Exception straight away. This needs to be
2490         // performed even before checking of Lease.
2491         // See HBASE-5974
2492         if (request.hasNextCallSeq()) {
2493           if (rsh == null) {
2494             rsh = scanners.get(scannerName);
2495           }
2496           if (rsh != null) {
2497             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2498               throw new OutOfOrderScannerNextException(
2499                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2500                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2501                 "; request=" + TextFormat.shortDebugString(request));
2502             }
2503             // Increment the nextCallSeq value which is the next expected from client.
2504             rsh.incNextCallSeq();
2505           }
2506         }
2507         try {
2508           // Remove lease while its being processed in server; protects against case
2509           // where processing of request takes > lease expiration time.
2510           lease = regionServer.leases.removeLease(scannerName);
2511           List<Result> results = new ArrayList<Result>();
2512 
2513           boolean done = false;
2514           // Call coprocessor. Get region info from scanner.
2515           if (region != null && region.getCoprocessorHost() != null) {
2516             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2517               scanner, results, rows);
2518             if (!results.isEmpty()) {
2519               for (Result r : results) {
2520                 lastBlock = addSize(context, r, lastBlock);
2521               }
2522             }
2523             if (bypass != null && bypass.booleanValue()) {
2524               done = true;
2525             }
2526           }
2527 
2528           if (!done) {
2529             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2530             if (maxResultSize <= 0) {
2531               maxResultSize = maxQuotaResultSize;
2532             }
2533             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2534             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2535             // arbitrary 32. TODO: keep record of general size of results being returned.
2536             List<Cell> values = new ArrayList<Cell>(32);
2537             region.startRegionOperation(Operation.SCAN);
2538             try {
2539               int i = 0;
2540               synchronized(scanner) {
2541                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2542                 boolean clientHandlesPartials =
2543                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2544                 boolean clientHandlesHeartbeats =
2545                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2546 
2547                 // On the server side we must ensure that the correct ordering of partial results is
2548                 // returned to the client to allow them to properly reconstruct the partial results.
2549                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2550                 // correct ordering of partial results and so we prevent partial results from being
2551                 // formed.
2552                 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2553                 boolean allowPartialResults =
2554                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2555                 boolean moreRows = false;
2556 
2557                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2558                 // certain time threshold on the server. When the time threshold is exceeded, the
2559                 // server stops the scan and sends back whatever Results it has accumulated within
2560                 // that time period (may be empty). Since heartbeat messages have the potential to
2561                 // create partial Results (in the event that the timeout occurs in the middle of a
2562                 // row), we must only generate heartbeat messages when the client can handle both
2563                 // heartbeats AND partials
2564                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2565 
2566                 // Default value of timeLimit is negative to indicate no timeLimit should be
2567                 // enforced.
2568                 long timeLimit = -1;
2569 
2570                 // Set the time limit to be half of the more restrictive timeout value (one of the
2571                 // timeout values must be positive). In the event that both values are positive, the
2572                 // more restrictive of the two is used to calculate the limit.
2573                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2574                   long timeLimitDelta;
2575                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2576                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2577                   } else {
2578                     timeLimitDelta =
2579                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2580                   }
2581                   // Use half of whichever timeout value was more restrictive... But don't allow
2582                   // the time limit to be less than the allowable minimum (could cause an
2583                   // immediatate timeout before scanning any data).
2584                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2585                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2586                 }
2587 
2588                 final LimitScope sizeScope =
2589                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2590                 final LimitScope timeScope =
2591                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2592 
2593                 boolean trackMetrics =
2594                     request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2595 
2596                 // Configure with limits for this RPC. Set keep progress true since size progress
2597                 // towards size limit should be kept between calls to nextRaw
2598                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2599                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2600                 contextBuilder.setBatchLimit(scanner.getBatch());
2601                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2602                 contextBuilder.setTrackMetrics(trackMetrics);
2603                 ScannerContext scannerContext = contextBuilder.build();
2604 
2605                 boolean limitReached = false;
2606                 while (i < rows) {
2607                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2608                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2609                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2610                   // reset the batch progress between nextRaw invocations since we don't want the
2611                   // batch progress from previous calls to affect future calls
2612                   scannerContext.setBatchProgress(0);
2613 
2614                   // Collect values to be returned here
2615                   moreRows = scanner.nextRaw(values, scannerContext);
2616 
2617                   if (!values.isEmpty()) {
2618                     final boolean partial = scannerContext.partialResultFormed();
2619                     Result r = Result.create(values, null, stale, partial);
2620                     lastBlock = addSize(context, r, lastBlock);
2621                     results.add(r);
2622                     i++;
2623                   }
2624 
2625                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2626                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2627                   boolean rowLimitReached = i >= rows;
2628                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2629 
2630                   if (limitReached || !moreRows) {
2631                     if (LOG.isTraceEnabled()) {
2632                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2633                           + moreRows + " scannerContext: " + scannerContext);
2634                     }
2635                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2636                     // there are more values to be read server side. If there aren't more values,
2637                     // marking it as a heartbeat is wasteful because the client will need to issue
2638                     // another ScanRequest only to realize that they already have all the values
2639                     if (moreRows) {
2640                       // Heartbeat messages occur when the time limit has been reached.
2641                       builder.setHeartbeatMessage(timeLimitReached);
2642                     }
2643                     break;
2644                   }
2645                   values.clear();
2646                 }
2647 
2648                 if (limitReached || moreRows) {
2649                   // We stopped prematurely
2650                   builder.setMoreResultsInRegion(true);
2651                 } else {
2652                   // We didn't get a single batch
2653                   builder.setMoreResultsInRegion(false);
2654                 }
2655 
2656                 // Check to see if the client requested that we track metrics server side. If the
2657                 // client requested metrics, retrieve the metrics from the scanner context.
2658                 if (trackMetrics) {
2659                   Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2660                   ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2661                   NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2662 
2663                   for (Entry<String, Long> entry : metrics.entrySet()) {
2664                     pairBuilder.setName(entry.getKey());
2665                     pairBuilder.setValue(entry.getValue());
2666                     metricBuilder.addMetrics(pairBuilder.build());
2667                   }
2668 
2669                   builder.setScanMetrics(metricBuilder.build());
2670                 }
2671               }
2672               region.updateReadRequestsCount(i);
2673               long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2674               region.getMetrics().updateScanNext(responseCellSize);
2675               if (regionServer.metricsRegionServer != null) {
2676                 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2677               }
2678             } finally {
2679               region.closeRegionOperation();
2680             }
2681 
2682             // coprocessor postNext hook
2683             if (region != null && region.getCoprocessorHost() != null) {
2684               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2685             }
2686           }
2687 
2688           quota.addScanResult(results);
2689 
2690           // If the scanner's filter - if any - is done with the scan
2691           // and wants to tell the client to stop the scan. This is done by passing
2692           // a null result, and setting moreResults to false.
2693           if (scanner.isFilterDone() && results.isEmpty()) {
2694             moreResults = false;
2695             results = null;
2696           } else {
2697             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2698           }
2699         } catch (IOException e) {
2700           // The scanner state might be left in a dirty state, so we will tell the Client to
2701           // fail this RPC and close the scanner while opening up another one from the start of
2702           // row that the client has last seen.
2703           closeScanner(region, scanner, scannerName);
2704 
2705           // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
2706           // used in two different semantics.
2707           // (1) The first is to close the client scanner and bubble up the exception all the way
2708           // to the application. This is preferred when the exception is really un-recoverable
2709           // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
2710           // bucket usually.
2711           // (2) Second semantics is to close the current region scanner only, but continue the
2712           // client scanner by overriding the exception. This is usually UnknownScannerException,
2713           // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
2714           // application-level ClientScanner has to continue without bubbling up the exception to
2715           // the client. See ClientScanner code to see how it deals with these special exceptions.
2716           if (e instanceof DoNotRetryIOException) {
2717             throw e;
2718           }
2719 
2720           // If it is a FileNotFoundException, wrap as a
2721           // DoNotRetryIOException. This can avoid the retry in ClientScanner.
2722           if (e instanceof FileNotFoundException) {
2723             throw new DoNotRetryIOException(e);
2724           }
2725 
2726           // We closed the scanner already. Instead of throwing the IOException, and client
2727           // retrying with the same scannerId only to get USE on the next RPC, we directly throw
2728           // a special exception to save an RPC.
2729           if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2730             // 1.4.0+ clients know how to handle
2731             throw new ScannerResetException("Scanner is closed on the server-side", e);
2732           } else {
2733             // older clients do not know about SRE. Just throw USE, which they will handle
2734             throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2735                 + " scanner state for clients older than 1.3.", e);
2736           }
2737         } finally {
2738           // We're done. On way out re-add the above removed lease.
2739           // Adding resets expiration time on lease.
2740           if (scanners.containsKey(scannerName)) {
2741             if (lease != null) regionServer.leases.addLease(lease);
2742             ttl = this.scannerLeaseTimeoutPeriod;
2743           }
2744         }
2745       }
2746 
2747       if (!moreResults || closeScanner) {
2748         ttl = 0;
2749         moreResults = false;
2750         closeScanner(region, scanner, scannerName);
2751       }
2752 
2753       if (ttl > 0) {
2754         builder.setTtl(ttl);
2755       }
2756       builder.setScannerId(scannerId);
2757       builder.setMoreResults(moreResults);
2758       return builder.build();
2759     } catch (IOException ie) {
2760       if (scannerName != null && ie instanceof NotServingRegionException) {
2761         RegionScannerHolder rsh = scanners.remove(scannerName);
2762         if (rsh != null) {
2763           try {
2764             RegionScanner scanner = rsh.s;
2765             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2766             scanner.close();
2767             regionServer.leases.cancelLease(scannerName);
2768           } catch (IOException e) {
2769            LOG.warn("Getting exception closing " + scannerName, e);
2770           }
2771         }
2772       }
2773       throw new ServiceException(ie);
2774     } finally {
2775       if (quota != null) {
2776         quota.close();
2777       }
2778     }
2779   }
2780 
2781   private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2782       throws IOException {
2783     if (region != null && region.getCoprocessorHost() != null) {
2784       if (region.getCoprocessorHost().preScannerClose(scanner)) {
2785         return true; // bypass
2786       }
2787     }
2788     RegionScannerHolder rsh = scanners.remove(scannerName);
2789     if (rsh != null) {
2790       scanner = rsh.s;
2791       scanner.close();
2792       try {
2793         regionServer.leases.cancelLease(scannerName);
2794       } catch (LeaseException le) {
2795         // No problem, ignore
2796         if (LOG.isTraceEnabled()) {
2797           LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2798         }
2799       }
2800       if (region != null && region.getCoprocessorHost() != null) {
2801         region.getCoprocessorHost().postScannerClose(scanner);
2802       }
2803     }
2804     return false;
2805   }
2806 
2807   @Override
2808   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2809       CoprocessorServiceRequest request) throws ServiceException {
2810     return regionServer.execRegionServerService(controller, request);
2811   }
2812 
2813   @Override
2814   public UpdateConfigurationResponse updateConfiguration(
2815       RpcController controller, UpdateConfigurationRequest request)
2816       throws ServiceException {
2817     try {
2818       this.regionServer.updateConfiguration();
2819     } catch (Exception e) {
2820       throw new ServiceException(e);
2821     }
2822     return UpdateConfigurationResponse.getDefaultInstance();
2823   }
2824 }