View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.BindException;
24  import java.net.InetSocketAddress;
25  import java.net.UnknownHostException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.NavigableMap;
34  import java.util.Set;
35  import java.util.TreeSet;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.CellScannable;
44  import org.apache.hadoop.hbase.CellScanner;
45  import org.apache.hadoop.hbase.CellUtil;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.DroppedSnapshotException;
48  import org.apache.hadoop.hbase.HBaseIOException;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.MultiActionResultTooLarge;
54  import org.apache.hadoop.hbase.NotServingRegionException;
55  import org.apache.hadoop.hbase.ServerName;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.UnknownScannerException;
58  import org.apache.hadoop.hbase.classification.InterfaceAudience;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.ConnectionUtils;
61  import org.apache.hadoop.hbase.client.Delete;
62  import org.apache.hadoop.hbase.client.Durability;
63  import org.apache.hadoop.hbase.client.Get;
64  import org.apache.hadoop.hbase.client.Increment;
65  import org.apache.hadoop.hbase.client.Mutation;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.client.RowMutations;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.client.VersionInfoUtil;
72  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
73  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
75  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
76  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
77  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
78  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
79  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
80  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
81  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
82  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
83  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
84  import org.apache.hadoop.hbase.ipc.PriorityFunction;
85  import org.apache.hadoop.hbase.ipc.QosPriority;
86  import org.apache.hadoop.hbase.ipc.RpcCallContext;
87  import org.apache.hadoop.hbase.ipc.RpcServer;
88  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
89  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
90  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
91  import org.apache.hadoop.hbase.ipc.ServerRpcController;
92  import org.apache.hadoop.hbase.master.MasterRpcServices;
93  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
94  import org.apache.hadoop.hbase.protobuf.RequestConverter;
95  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
156 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
157 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
158 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
159 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
160 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
164 import org.apache.hadoop.hbase.quotas.OperationQuota;
165 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
166 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
167 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
168 import org.apache.hadoop.hbase.regionserver.Region.Operation;
169 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
170 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
171 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
172 import org.apache.hadoop.hbase.wal.WAL;
173 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
174 import org.apache.hadoop.hbase.security.User;
175 import org.apache.hadoop.hbase.util.Bytes;
176 import org.apache.hadoop.hbase.util.Counter;
177 import org.apache.hadoop.hbase.util.DNS;
178 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
179 import org.apache.hadoop.hbase.util.Pair;
180 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
181 import org.apache.hadoop.hbase.util.Strings;
182 import org.apache.hadoop.hbase.wal.WALKey;
183 import org.apache.hadoop.hbase.wal.WALSplitter;
184 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
185 import org.apache.zookeeper.KeeperException;
186 
187 import com.google.common.annotations.VisibleForTesting;
188 import com.google.protobuf.ByteString;
189 import com.google.protobuf.Message;
190 import com.google.protobuf.RpcController;
191 import com.google.protobuf.ServiceException;
192 import com.google.protobuf.TextFormat;
193 
194 /**
195  * Implements the regionserver RPC services.
196  */
197 @InterfaceAudience.Private
198 @SuppressWarnings("deprecation")
199 public class RSRpcServices implements HBaseRPCErrorHandler,
200     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
201     ConfigurationObserver {
  protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  // Request counter. (Includes requests that are not serviced by regions.)
  final Counter requestCount = new Counter();
  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // NOTE(review): presumably the address the RPC server is bound to -- the
  // assignment is outside this chunk; confirm in the constructor.
  final InetSocketAddress isa;

  // The region server these services run on behalf of.
  private final HRegionServer regionServer;
  // Upper bound applied when sizing scan/multi responses (see its use as a
  // cap together with the quota's read-available in doNonAtomicRegionMutation).
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Source of unique ids for newly opened scanners.
  private final AtomicLong scannerIdGen = new AtomicLong(0L);
  // All currently open client scanners, keyed by scanner name; entries are
  // removed when the scanner is closed or its lease expires.
  private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
    new ConcurrentHashMap<String, RegionScannerHolder>();

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;
250 
251   /**
252    * Holder class which holds the RegionScanner and nextCallSeq together.
253    */
254   private static class RegionScannerHolder {
255     private AtomicLong nextCallSeq = new AtomicLong(0);
256     private RegionScanner s;
257     private Region r;
258 
259     public RegionScannerHolder(RegionScanner s, Region r) {
260       this.s = s;
261       this.r = r;
262     }
263 
264     private long getNextCallSeq() {
265       return nextCallSeq.get();
266     }
267 
268     private void incNextCallSeq() {
269       nextCallSeq.incrementAndGet();
270     }
271 
272     private void rollbackNextCallSeq() {
273       nextCallSeq.decrementAndGet();
274     }
275   }
276 
  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is
   * closed
   */
  private class ScannerListener implements LeaseListener {
    // Key of the scanner in the outer scanners map.
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    /**
     * Invoked when the scanner's lease expires: removes the scanner from the
     * tracking map and closes it, firing the region's pre/post scanner-close
     * coprocessor hooks when the region is still resolvable.
     */
    @Override
    public void leaseExpired() {
      // Remove first so concurrent scan RPCs can no longer look up the expired scanner.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh != null) {
        RegionScanner s = rsh.s;
        LOG.info("Scanner " + this.scannerName + " lease expired on region "
          + s.getRegionInfo().getRegionNameAsString());
        Region region = null;
        try {
          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().preScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
        } finally {
          // Close unconditionally, even if the region lookup or pre-hook failed above.
          try {
            s.close();
            if (region != null && region.getCoprocessorHost() != null) {
              region.getCoprocessorHost().postScannerClose(s);
            }
          } catch (IOException e) {
            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
          }
        }
      } else {
        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
          " scanner found, hence no chance to close that related scanner!");
      }
    }
  }
319 
320   private static ResultOrException getResultOrException(
321       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
322     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
323   }
324 
325   private static ResultOrException getResultOrException(final Exception e, final int index) {
326     return getResultOrException(ResponseConverter.buildActionResult(e), index);
327   }
328 
329   private static ResultOrException getResultOrException(
330       final ResultOrException.Builder builder, final int index) {
331     return builder.setIndex(index).build();
332   }
333 
334   /**
335    * Starts the nonce operation for a mutation, if needed.
336    * @param mutation Mutation.
337    * @param nonceGroup Nonce group from the request.
338    * @returns Nonce used (can be NO_NONCE).
339    */
340   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
341       throws IOException, OperationConflictException {
342     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
343     boolean canProceed = false;
344     try {
345       canProceed = regionServer.nonceManager.startOperation(
346         nonceGroup, mutation.getNonce(), regionServer);
347     } catch (InterruptedException ex) {
348       throw new InterruptedIOException("Nonce start operation interrupted");
349     }
350     if (!canProceed) {
351       // TODO: instead, we could convert append/increment to get w/mvcc
352       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
353         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
354         + "] may have already completed";
355       throw new OperationConflictException(message);
356     }
357     return mutation.getNonce();
358   }
359 
360   /**
361    * Ends nonce operation for a mutation, if needed.
362    * @param mutation Mutation.
363    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
364    * @param success Whether the operation for this nonce has succeeded.
365    */
366   private void endNonceOperation(final MutationProto mutation,
367       long nonceGroup, boolean success) {
368     if (regionServer.nonceManager != null && mutation.hasNonce()) {
369       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
370     }
371   }
372 
373   /**
374    * @return True if current call supports cellblocks
375    */
376   private boolean isClientCellBlockSupport() {
377     RpcCallContext context = RpcServer.getCurrentCall();
378     return context != null && context.isClientCellBlockSupported();
379   }
380 
381   private void addResult(final MutateResponse.Builder builder,
382       final Result result, final PayloadCarryingRpcController rpcc) {
383     if (result == null) return;
384     if (isClientCellBlockSupport()) {
385       builder.setResult(ProtobufUtil.toResultNoData(result));
386       rpcc.setCellScanner(result.cellScanner());
387     } else {
388       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
389       builder.setResult(pbr);
390     }
391   }
392 
393   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
394       final RpcController controller, boolean isDefaultRegion) {
395     builder.setStale(!isDefaultRegion);
396     if (results == null || results.isEmpty()) return;
397     if (isClientCellBlockSupport()) {
398       for (Result res : results) {
399         builder.addCellsPerResult(res.size());
400         builder.addPartialFlagPerResult(res.isPartial());
401       }
402       ((PayloadCarryingRpcController)controller).
403         setCellScanner(CellUtil.createCellScanner(results));
404     } else {
405       for (Result res: results) {
406         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
407         builder.addResults(pbr);
408       }
409     }
410   }
411 
412   /**
413    * Mutate a list of rows atomically.
414    *
415    * @param region
416    * @param actions
417    * @param cellScanner if non-null, the mutation data -- the Cell content.
418    * @throws IOException
419    */
420   private ClientProtos.RegionLoadStats mutateRows(final Region region,
421       final List<ClientProtos.Action> actions,
422       final CellScanner cellScanner) throws IOException {
423     if (!region.getRegionInfo().isMetaTable()) {
424       regionServer.cacheFlusher.reclaimMemStoreMemory();
425     }
426     RowMutations rm = null;
427     for (ClientProtos.Action action: actions) {
428       if (action.hasGet()) {
429         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
430           action.getGet());
431       }
432       MutationType type = action.getMutation().getMutateType();
433       if (rm == null) {
434         rm = new RowMutations(action.getMutation().getRow().toByteArray());
435       }
436       switch (type) {
437         case PUT:
438           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
439           break;
440         case DELETE:
441           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
442           break;
443         default:
444           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
445       }
446     }
447     region.mutateRow(rm);
448     return ((HRegion)region).getRegionStats();
449   }
450 
451   /**
452    * Mutate a list of rows atomically.
453    *
454    * @param region
455    * @param actions
456    * @param cellScanner if non-null, the mutation data -- the Cell content.
457    * @param row
458    * @param family
459    * @param qualifier
460    * @param compareOp
461    * @param comparator @throws IOException
462    */
463   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
464       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
465       CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
466     if (!region.getRegionInfo().isMetaTable()) {
467       regionServer.cacheFlusher.reclaimMemStoreMemory();
468     }
469     RowMutations rm = null;
470     for (ClientProtos.Action action: actions) {
471       if (action.hasGet()) {
472         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
473             action.getGet());
474       }
475       MutationType type = action.getMutation().getMutateType();
476       if (rm == null) {
477         rm = new RowMutations(action.getMutation().getRow().toByteArray());
478       }
479       switch (type) {
480         case PUT:
481           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
482           break;
483         case DELETE:
484           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
485           break;
486         default:
487           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
488       }
489     }
490     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
491   }
492 
493   /**
494    * Execute an append mutation.
495    *
496    * @param region
497    * @param m
498    * @param cellScanner
499    * @return result to return to client if default operation should be
500    * bypassed as indicated by RegionObserver, null otherwise
501    * @throws IOException
502    */
503   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
504       final CellScanner cellScanner, long nonceGroup) throws IOException {
505     long before = EnvironmentEdgeManager.currentTime();
506     Append append = ProtobufUtil.toAppend(m, cellScanner);
507     quota.addMutation(append);
508     Result r = null;
509     if (region.getCoprocessorHost() != null) {
510       r = region.getCoprocessorHost().preAppend(append);
511     }
512     if (r == null) {
513       long nonce = startNonceOperation(m, nonceGroup);
514       boolean success = false;
515       try {
516         r = region.append(append, nonceGroup, nonce);
517         success = true;
518       } finally {
519         endNonceOperation(m, nonceGroup, success);
520       }
521       if (region.getCoprocessorHost() != null) {
522         region.getCoprocessorHost().postAppend(append, r);
523       }
524     }
525     if (regionServer.metricsRegionServer != null) {
526       regionServer.metricsRegionServer.updateAppend(
527         EnvironmentEdgeManager.currentTime() - before);
528     }
529     return r;
530   }
531 
532   /**
533    * Execute an increment mutation.
534    *
535    * @param region
536    * @param mutation
537    * @return the Result
538    * @throws IOException
539    */
540   private Result increment(final Region region, final OperationQuota quota,
541       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
542     long before = EnvironmentEdgeManager.currentTime();
543     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
544     quota.addMutation(increment);
545     Result r = null;
546     if (region.getCoprocessorHost() != null) {
547       r = region.getCoprocessorHost().preIncrement(increment);
548     }
549     if (r == null) {
550       long nonce = startNonceOperation(mutation, nonceGroup);
551       boolean success = false;
552       try {
553         r = region.increment(increment, nonceGroup, nonce);
554         success = true;
555       } finally {
556         endNonceOperation(mutation, nonceGroup, success);
557       }
558       if (region.getCoprocessorHost() != null) {
559         r = region.getCoprocessorHost().postIncrement(increment, r);
560       }
561     }
562     if (regionServer.metricsRegionServer != null) {
563       regionServer.metricsRegionServer.updateIncrement(
564         EnvironmentEdgeManager.currentTime() - before);
565     }
566     return r;
567   }
568 
569   /**
570    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
571    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
572    * @param region
573    * @param actions
574    * @param cellScanner
575    * @param builder
576    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
577    * method returns as a 'result'.
578    * @return Return the <code>cellScanner</code> passed
579    */
580   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
581       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
582       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
583     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
584     // one at a time, we instead pass them in batch.  Be aware that the corresponding
585     // ResultOrException instance that matches each Put or Delete is then added down in the
586     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
587     List<ClientProtos.Action> mutations = null;
588     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
589     RpcCallContext context = RpcServer.getCurrentCall();
590     IOException sizeIOE = null;
591     Object lastBlock = null;
592     for (ClientProtos.Action action : actions.getActionList()) {
593       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
594       try {
595         Result r = null;
596 
597         if (context != null
598             && context.isRetryImmediatelySupported()
599             && (context.getResponseCellSize() > maxQuotaResultSize
600               || context.getResponseBlockSize() > maxQuotaResultSize)) {
601 
602           // We're storing the exception since the exception and reason string won't
603           // change after the response size limit is reached.
604           if (sizeIOE == null ) {
605             // We don't need the stack un-winding do don't throw the exception.
606             // Throwing will kill the JVM's JIT.
607             //
608             // Instead just create the exception and then store it.
609             sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
610                 + " CellSize: " + context.getResponseCellSize()
611                 + " BlockSize: " + context.getResponseBlockSize());
612 
613             // Only report the exception once since there's only one request that
614             // caused the exception. Otherwise this number will dominate the exceptions count.
615             rpcServer.getMetrics().exception(sizeIOE);
616           }
617 
618           // Now that there's an exception is known to be created
619           // use it for the response.
620           //
621           // This will create a copy in the builder.
622           resultOrExceptionBuilder = ResultOrException.newBuilder().
623               setException(ResponseConverter.buildException(sizeIOE));
624           resultOrExceptionBuilder.setIndex(action.getIndex());
625           builder.addResultOrException(resultOrExceptionBuilder.build());
626           if (cellScanner != null) {
627             skipCellsForMutation(action, cellScanner);
628           }
629           continue;
630         }
631         if (action.hasGet()) {
632           long before = EnvironmentEdgeManager.currentTime();
633           try {
634             Get get = ProtobufUtil.toGet(action.getGet());
635             r = region.get(get);
636           } finally {
637             if (regionServer.metricsRegionServer != null) {
638               regionServer.metricsRegionServer.updateGet(
639                 EnvironmentEdgeManager.currentTime() - before);
640             }
641           }
642         } else if (action.hasServiceCall()) {
643           resultOrExceptionBuilder = ResultOrException.newBuilder();
644           try {
645             Message result = execServiceOnRegion(region, action.getServiceCall());
646             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
647                 ClientProtos.CoprocessorServiceResult.newBuilder();
648             resultOrExceptionBuilder.setServiceResult(
649                 serviceResultBuilder.setValue(
650                   serviceResultBuilder.getValueBuilder()
651                     .setName(result.getClass().getName())
652                     .setValue(result.toByteString())));
653           } catch (IOException ioe) {
654             rpcServer.getMetrics().exception(ioe);
655             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
656           }
657         } else if (action.hasMutation()) {
658           MutationType type = action.getMutation().getMutateType();
659           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
660               !mutations.isEmpty()) {
661             // Flush out any Puts or Deletes already collected.
662             doBatchOp(builder, region, quota, mutations, cellScanner);
663             mutations.clear();
664           }
665           switch (type) {
666           case APPEND:
667             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
668             break;
669           case INCREMENT:
670             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
671             break;
672           case PUT:
673           case DELETE:
674             // Collect the individual mutations and apply in a batch
675             if (mutations == null) {
676               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
677             }
678             mutations.add(action);
679             break;
680           default:
681             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
682           }
683         } else {
684           throw new HBaseIOException("Unexpected Action type");
685         }
686         if (r != null) {
687           ClientProtos.Result pbResult = null;
688           if (isClientCellBlockSupport()) {
689             pbResult = ProtobufUtil.toResultNoData(r);
690             //  Hard to guess the size here.  Just make a rough guess.
691             if (cellsToReturn == null) {
692               cellsToReturn = new ArrayList<CellScannable>();
693             }
694             cellsToReturn.add(r);
695           } else {
696             pbResult = ProtobufUtil.toResult(r);
697           }
698           lastBlock = addSize(context, r, lastBlock);
699           resultOrExceptionBuilder =
700             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
701         }
702         // Could get to here and there was no result and no exception.  Presumes we added
703         // a Put or Delete to the collecting Mutations List for adding later.  In this
704         // case the corresponding ResultOrException instance for the Put or Delete will be added
705         // down in the doBatchOp method call rather than up here.
706       } catch (IOException ie) {
707         rpcServer.getMetrics().exception(ie);
708         resultOrExceptionBuilder = ResultOrException.newBuilder().
709           setException(ResponseConverter.buildException(ie));
710       }
711       if (resultOrExceptionBuilder != null) {
712         // Propagate index.
713         resultOrExceptionBuilder.setIndex(action.getIndex());
714         builder.addResultOrException(resultOrExceptionBuilder.build());
715       }
716     }
717     // Finish up any outstanding mutations
718     if (mutations != null && !mutations.isEmpty()) {
719       doBatchOp(builder, region, quota, mutations, cellScanner);
720     }
721     return cellsToReturn;
722   }
723 
724   /**
725    * Execute a list of Put/Delete mutations.
726    *
727    * @param builder
728    * @param region
729    * @param mutations
730    */
731   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
732       final OperationQuota quota,
733       final List<ClientProtos.Action> mutations, final CellScanner cells) {
734     Mutation[] mArray = new Mutation[mutations.size()];
735     long before = EnvironmentEdgeManager.currentTime();
736     boolean batchContainsPuts = false, batchContainsDelete = false;
737     try {
738       int i = 0;
739       for (ClientProtos.Action action: mutations) {
740         MutationProto m = action.getMutation();
741         Mutation mutation;
742         if (m.getMutateType() == MutationType.PUT) {
743           mutation = ProtobufUtil.toPut(m, cells);
744           batchContainsPuts = true;
745         } else {
746           mutation = ProtobufUtil.toDelete(m, cells);
747           batchContainsDelete = true;
748         }
749         mArray[i++] = mutation;
750         quota.addMutation(mutation);
751       }
752 
753       if (!region.getRegionInfo().isMetaTable()) {
754         regionServer.cacheFlusher.reclaimMemStoreMemory();
755       }
756 
757       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
758         HConstants.NO_NONCE);
759       for (i = 0; i < codes.length; i++) {
760         int index = mutations.get(i).getIndex();
761         Exception e = null;
762         switch (codes[i].getOperationStatusCode()) {
763           case BAD_FAMILY:
764             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
765             builder.addResultOrException(getResultOrException(e, index));
766             break;
767 
768           case SANITY_CHECK_FAILURE:
769             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
770             builder.addResultOrException(getResultOrException(e, index));
771             break;
772 
773           default:
774             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
775             builder.addResultOrException(getResultOrException(e, index));
776             break;
777 
778           case SUCCESS:
779             builder.addResultOrException(getResultOrException(
780                 ClientProtos.Result.getDefaultInstance(), index,
781                 ((HRegion) region).getRegionStats()));
782             break;
783         }
784       }
785     } catch (IOException ie) {
786       for (int i = 0; i < mutations.size(); i++) {
787         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
788       }
789     }
790     if (regionServer.metricsRegionServer != null) {
791       long after = EnvironmentEdgeManager.currentTime();
792       if (batchContainsPuts) {
793         regionServer.metricsRegionServer.updatePut(after - before);
794       }
795       if (batchContainsDelete) {
796         regionServer.metricsRegionServer.updateDelete(after - before);
797       }
798     }
799   }
800 
  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * <p>
   * Entries whose family map contains WAL meta-edits (METAFAMILY cells carrying compaction,
   * flush, region-event, or bulk-load markers) are replayed through the corresponding HRegion
   * marker hooks and removed from {@code mutations} before the remaining edits are applied
   * via {@link Region#batchReplay}.
   * @param region region to replay into
   * @param mutations WAL entries to replay; meta-marker entries are removed in place
   * @param replaySeqId sequence id to replay the batch under
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @throws IOException
   */
  private OperationStatus [] doReplayBatchOp(final Region region,
      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
        WALSplitter.MutationReplay m = it.next();

        if (m.type == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = (HRegion)region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            // Flush and region-event markers are only replayed on non-default (secondary) replicas.
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            // Bulk-load markers are replayed regardless of replica id.
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          // Meta-edit entry handled above; drop it so batchReplay only sees data edits.
          it.remove();
        }
      }
      requestCount.add(mutations.size());
      if (!region.getRegionInfo().isMetaTable()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(
        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
    } finally {
      // Update latency metrics even when batchReplay throws.
      if (regionServer.metricsRegionServer != null) {
        long after = EnvironmentEdgeManager.currentTime();
        if (batchContainsPuts) {
          regionServer.metricsRegionServer.updatePut(after - before);
        }
        if (batchContainsDelete) {
          regionServer.metricsRegionServer.updateDelete(after - before);
        }
      }
    }
  }
876 
877   private void closeAllScanners() {
878     // Close any outstanding scanners. Means they'll get an UnknownScanner
879     // exception next time they come in.
880     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
881       try {
882         e.getValue().s.close();
883       } catch (IOException ioe) {
884         LOG.warn("Closing scanner " + e.getKey(), ioe);
885       }
886     }
887   }
888 
  /**
   * Construct the RPC service layer for the given server.
   * <p>
   * Builds the RPC scheduler from configuration, resolves the hostname and bind address
   * (master vs. regionserver config keys, depending on the concrete subclass), creates the
   * RpcServer, and reads scanner/RPC timeout settings from configuration.
   *
   * @param rs the server whose requests this instance will service
   * @throws IOException if the address fails to resolve or the port cannot be bound
   */
  public RSRpcServices(HRegionServer rs) throws IOException {
    regionServer = rs;

    // The scheduler factory class is pluggable via configuration; instantiation
    // failure is a configuration error, hence IllegalArgumentException.
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
    } catch (InstantiationException e) {
      throw new IllegalArgumentException(e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    // MasterRpcServices reuses this constructor, so choose master vs. regionserver keys here.
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    String name = rs.getProcessName() + "/" + initialIsa.toString();
    // Set how many times to retry talking to another server over HConnection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    try {
      rpcServer = new RpcServer(rs, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          rs.conf,
          rpcSchedulerFactory.create(rs.conf, this, rs));
    } catch (BindException be) {
      // Point the operator at the config property that controls the colliding port.
      String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
          HConstants.REGIONSERVER_PORT;
      throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
          "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }

    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);
  }
962 
963   @Override
964   public void onConfigurationChange(Configuration newConf) {
965     if (rpcServer instanceof ConfigurationObserver) {
966       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
967     }
968   }
969 
970   protected PriorityFunction createPriority() {
971     return new AnnotationReadingPriorityFunction(this);
972   }
973 
974   public static String getHostname(Configuration conf, boolean isMaster)
975       throws UnknownHostException {
976     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
977       HRegionServer.RS_HOSTNAME_KEY);
978     if (hostname == null || hostname.isEmpty()) {
979       String masterOrRS = isMaster ? "master" : "regionserver";
980       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
981         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
982         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
983     } else {
984       LOG.info("hostname is configured to be " + hostname);
985       return hostname;
986     }
987   }
988 
989   RegionScanner getScanner(long scannerId) {
990     String scannerIdString = Long.toString(scannerId);
991     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
992     if (scannerHolder != null) {
993       return scannerHolder.s;
994     }
995     return null;
996   }
997 
998   /**
999    * Get the vtime associated with the scanner.
1000    * Currently the vtime is the number of "next" calls.
1001    */
1002   long getScannerVirtualTime(long scannerId) {
1003     String scannerIdString = Long.toString(scannerId);
1004     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1005     if (scannerHolder != null) {
1006       return scannerHolder.getNextCallSeq();
1007     }
1008     return 0L;
1009   }
1010 
1011   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1012     long scannerId = this.scannerIdGen.incrementAndGet();
1013     String scannerName = String.valueOf(scannerId);
1014 
1015     RegionScannerHolder existing =
1016       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1017     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1018 
1019     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1020         new ScannerListener(scannerName));
1021     return scannerId;
1022   }
1023 
1024   /**
1025    * Method to account for the size of retained cells and retained data blocks.
1026    * @return an object that represents the last referenced block from this response.
1027    */
1028   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1029     if (context != null && !r.isEmpty()) {
1030       for (Cell c : r.rawCells()) {
1031         context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1032         // We're using the last block being the same as the current block as
1033         // a proxy for pointing to a new block. This won't be exact.
1034         // If there are multiple gets that bounce back and forth
1035         // Then it's possible that this will over count the size of
1036         // referenced blocks. However it's better to over count and
1037         // use two RPC's than to OOME the RegionServer.
1038         byte[] valueArray = c.getValueArray();
1039         if (valueArray != lastBlock) {
1040           context.incrementResponseBlockSize(valueArray.length);
1041           lastBlock = valueArray;
1042         }
1043       }
1044     }
1045     return lastBlock;
1046   }
1047 
1048 
1049   /**
1050    * Find the HRegion based on a region specifier
1051    *
1052    * @param regionSpecifier the region specifier
1053    * @return the corresponding region
1054    * @throws IOException if the specifier is not null,
1055    *    but failed to find the region
1056    */
1057   Region getRegion(
1058       final RegionSpecifier regionSpecifier) throws IOException {
1059     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1060         ProtobufUtil.getRegionEncodedName(regionSpecifier));
1061   }
1062 
1063   @VisibleForTesting
1064   public PriorityFunction getPriority() {
1065     return priority;
1066   }
1067 
1068   @VisibleForTesting
1069   public Configuration getConfiguration() {
1070     return regionServer.getConfiguration();
1071   }
1072 
1073   private RegionServerQuotaManager getQuotaManager() {
1074     return regionServer.getRegionServerQuotaManager();
1075   }
1076 
1077   void start() {
1078     rpcServer.start();
1079   }
1080 
1081   void stop() {
1082     closeAllScanners();
1083     rpcServer.stop();
1084   }
1085 
  /**
   * Called to verify that this server is up and running.
   * Checks run in a fixed order: aborted, stopped, filesystem down, not yet online.
   *
   * @throws IOException if any of the liveness checks fails
   */
  protected void checkOpen() throws IOException {
    if (regionServer.isAborted()) {
      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
    }
    if (regionServer.isStopped()) {
      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
    }
    // fsOk is a health flag maintained by the hosting regionserver.
    if (!regionServer.fsOk) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!regionServer.isOnline()) {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }
1105 
1106   /**
1107    * @return list of blocking services and their security info classes that this server supports
1108    */
1109   protected List<BlockingServiceAndInterface> getServices() {
1110     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1111     bssi.add(new BlockingServiceAndInterface(
1112       ClientService.newReflectiveBlockingService(this),
1113       ClientService.BlockingInterface.class));
1114     bssi.add(new BlockingServiceAndInterface(
1115       AdminService.newReflectiveBlockingService(this),
1116       AdminService.BlockingInterface.class));
1117     return bssi;
1118   }
1119 
1120   public InetSocketAddress getSocketAddress() {
1121     return isa;
1122   }
1123 
1124   @Override
1125   public int getPriority(RequestHeader header, Message param, User user) {
1126     return priority.getPriority(header, param, user);
1127   }
1128 
1129   @Override
1130   public long getDeadline(RequestHeader header, Message param) {
1131     return priority.getDeadline(header, param);
1132   }
1133 
1134   /*
1135    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1136    *
1137    * @param e
1138    *
1139    * @return True if we OOME'd and are aborting.
1140    */
1141   @Override
1142   public boolean checkOOME(final Throwable e) {
1143     boolean stop = false;
1144     try {
1145       if (e instanceof OutOfMemoryError
1146           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1147           || (e.getMessage() != null && e.getMessage().contains(
1148               "java.lang.OutOfMemoryError"))) {
1149         stop = true;
1150         LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1151           + " will abort itself immediately", e);
1152       }
1153     } finally {
1154       if (stop) {
1155         Runtime.getRuntime().halt(1);
1156       }
1157     }
1158     return stop;
1159   }
1160 
1161   /**
1162    * Close a region on the region server.
1163    *
1164    * @param controller the RPC controller
1165    * @param request the request
1166    * @throws ServiceException
1167    */
1168   @Override
1169   @QosPriority(priority=HConstants.ADMIN_QOS)
1170   public CloseRegionResponse closeRegion(final RpcController controller,
1171       final CloseRegionRequest request) throws ServiceException {
1172     final ServerName sn = (request.hasDestinationServer() ?
1173       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1174 
1175     try {
1176       checkOpen();
1177       if (request.hasServerStartCode()) {
1178         // check that we are the same server that this RPC is intended for.
1179         long serverStartCode = request.getServerStartCode();
1180         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1181           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1182               "different server with startCode: " + serverStartCode + ", this server is: "
1183               + regionServer.serverName));
1184         }
1185       }
1186       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1187 
1188       requestCount.increment();
1189       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1190       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1191         .getCloseRegionCoordination().parseFromProtoRequest(request);
1192 
1193       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1194       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1195       return builder.build();
1196     } catch (IOException ie) {
1197       throw new ServiceException(ie);
1198     }
1199   }
1200 
  /**
   * Compact a region on the region server.
   * A user-triggered request: optionally restricted to one column family and/or
   * forced to a major compaction, then queued on the compact/split thread.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
      final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());
      // NOTE(review): no matching closeRegionOperation() is visible in this method —
      // confirm the operation is closed elsewhere (e.g. by the compaction machinery).
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = false;
      byte [] family = null;
      Store store = null;
      if (request.hasFamily()) {
        family = request.getFamily().toByteArray();
        store = region.getStore(family);
        if (store == null) {
          // ServiceException is not an IOException, so this bypasses the catch below.
          throw new ServiceException(new IOException("column family " + Bytes.toString(family)
            + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
        }
      }
      if (request.hasMajor()) {
        major = request.getMajor();
      }
      if (major) {
        // Mark the store (or whole region) so the queued compaction runs as major.
        if (family != null) {
          store.triggerMajorCompaction();
        } else {
          region.triggerMajorCompaction();
        }
      }

      String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
      if (LOG.isTraceEnabled()) {
        LOG.trace("User-triggered compaction requested for region "
          + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
      }
      String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
      // Queue at user priority; single-store request when a family was named.
      if(family != null) {
        regionServer.compactSplitThread.requestCompaction(region, store, log,
          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
      } else {
        regionServer.compactSplitThread.requestCompaction(region, log,
          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1258 
1259   /**
1260    * Flush a region on the region server.
1261    *
1262    * @param controller the RPC controller
1263    * @param request the request
1264    * @throws ServiceException
1265    */
1266   @Override
1267   @QosPriority(priority=HConstants.ADMIN_QOS)
1268   public FlushRegionResponse flushRegion(final RpcController controller,
1269       final FlushRegionRequest request) throws ServiceException {
1270     try {
1271       checkOpen();
1272       requestCount.increment();
1273       Region region = getRegion(request.getRegion());
1274       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1275       boolean shouldFlush = true;
1276       if (request.hasIfOlderThanTs()) {
1277         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1278       }
1279       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1280       if (shouldFlush) {
1281         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1282             request.getWriteFlushWalMarker() : false;
1283         long startTime = EnvironmentEdgeManager.currentTime();
1284         // Go behind the curtain so we can manage writing of the flush WAL marker
1285         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1286             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1287         if (flushResult.isFlushSucceeded()) {
1288           long endTime = EnvironmentEdgeManager.currentTime();
1289           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1290         }
1291         boolean compactionNeeded = flushResult.isCompactionNeeded();
1292         if (compactionNeeded) {
1293           regionServer.compactSplitThread.requestSystemCompaction(region,
1294             "Compaction through user triggered flush");
1295         }
1296         builder.setFlushed(flushResult.isFlushSucceeded());
1297         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1298       }
1299       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1300       return builder.build();
1301     } catch (DroppedSnapshotException ex) {
1302       // Cache flush can fail in a few places. If it fails in a critical
1303       // section, we get a DroppedSnapshotException and a replay of wal
1304       // is required. Currently the only way to do this is a restart of
1305       // the server.
1306       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1307       throw new ServiceException(ex);
1308     } catch (IOException ie) {
1309       throw new ServiceException(ie);
1310     }
1311   }
1312 
1313   @Override
1314   @QosPriority(priority=HConstants.ADMIN_QOS)
1315   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1316       final GetOnlineRegionRequest request) throws ServiceException {
1317     try {
1318       checkOpen();
1319       requestCount.increment();
1320       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1321       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1322       for (Region region: onlineRegions.values()) {
1323         list.add(region.getRegionInfo());
1324       }
1325       Collections.sort(list);
1326       return ResponseConverter.buildGetOnlineRegionResponse(list);
1327     } catch (IOException ie) {
1328       throw new ServiceException(ie);
1329     }
1330   }
1331 
1332   @Override
1333   @QosPriority(priority=HConstants.ADMIN_QOS)
1334   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1335       final GetRegionInfoRequest request) throws ServiceException {
1336     try {
1337       checkOpen();
1338       requestCount.increment();
1339       Region region = getRegion(request.getRegion());
1340       HRegionInfo info = region.getRegionInfo();
1341       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1342       builder.setRegionInfo(HRegionInfo.convert(info));
1343       if (request.hasCompactionState() && request.getCompactionState()) {
1344         builder.setCompactionState(region.getCompactionState());
1345       }
1346       builder.setIsRecovering(region.isRecovering());
1347       return builder.build();
1348     } catch (IOException ie) {
1349       throw new ServiceException(ie);
1350     }
1351   }
1352 
1353   /**
1354    * Get some information of the region server.
1355    *
1356    * @param controller the RPC controller
1357    * @param request the request
1358    * @throws ServiceException
1359    */
1360   @Override
1361   @QosPriority(priority=HConstants.ADMIN_QOS)
1362   public GetServerInfoResponse getServerInfo(final RpcController controller,
1363       final GetServerInfoRequest request) throws ServiceException {
1364     try {
1365       checkOpen();
1366     } catch (IOException ie) {
1367       throw new ServiceException(ie);
1368     }
1369     requestCount.increment();
1370     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1371     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1372   }
1373 
1374   @Override
1375   @QosPriority(priority=HConstants.ADMIN_QOS)
1376   public GetStoreFileResponse getStoreFile(final RpcController controller,
1377       final GetStoreFileRequest request) throws ServiceException {
1378     try {
1379       checkOpen();
1380       Region region = getRegion(request.getRegion());
1381       requestCount.increment();
1382       Set<byte[]> columnFamilies;
1383       if (request.getFamilyCount() == 0) {
1384         columnFamilies = region.getTableDesc().getFamiliesKeys();
1385       } else {
1386         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1387         for (ByteString cf: request.getFamilyList()) {
1388           columnFamilies.add(cf.toByteArray());
1389         }
1390       }
1391       int nCF = columnFamilies.size();
1392       List<String>  fileList = region.getStoreFileList(
1393         columnFamilies.toArray(new byte[nCF][]));
1394       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1395       builder.addAllStoreFile(fileList);
1396       return builder.build();
1397     } catch (IOException ie) {
1398       throw new ServiceException(ie);
1399     }
1400   }
1401 
  /**
   * Merge regions on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @return merge regions response
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public MergeRegionsResponse mergeRegions(final RpcController controller,
      final MergeRegionsRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region regionA = getRegion(request.getRegionA());
      Region regionB = getRegion(request.getRegionB());
      boolean forcible = request.getForcible();
      // -1 signals that the master did not send its current time.
      long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
      regionA.startRegionOperation(Operation.MERGE_REGION);
      regionB.startRegionOperation(Operation.MERGE_REGION);
      // Only default (primary) replicas may be merged.
      // NOTE(review): when this check throws, the two startRegionOperation() calls above
      // are not paired with closeRegionOperation() on this path — confirm the region
      // operation locks are released elsewhere, or whether the check should precede them.
      if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
          regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
        throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
      }
      LOG.info("Receiving merging request for  " + regionA + ", " + regionB
          + ",forcible=" + forcible);
      // Flush both regions first so the merge works from on-disk files; record the
      // flush latency in the server metrics when the flush succeeds.
      long startTime = EnvironmentEdgeManager.currentTime();
      FlushResult flushResult = regionA.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      startTime = EnvironmentEdgeManager.currentTime();
      flushResult = regionB.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      // The merge itself runs asynchronously on the compact/split thread.
      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
          masterSystemTime, RpcServer.getRequestUser());
      return MergeRegionsResponse.newBuilder().build();
    } catch (DroppedSnapshotException ex) {
      // A failed memstore flush means WAL replay is required; the server must go down.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1451 
  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   * before being called. As a consequence, this method should be called only from the master.
   * <p>
   * The different managed states for the region are:
   * </p><ul>
   *  <li>region not opened: the region opening will start asynchronously.</li>
   *  <li>a close is already in progress: this is considered as an error.</li>
   *  <li>an open is already in progress: this new open request will be ignored. This is important
   *  because the Master can do multiple requests if it crashes.</li>
   *  <li>the region is already opened: this new open request will be ignored.</li>
   *  </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
      final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    if (request.hasServerStartCode()) {
      // check that we are the same server that this RPC is intended for.
      long serverStartCode = request.getServerStartCode();
      if (regionServer.serverName.getStartcode() !=  serverStartCode) {
        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
            "different server with startCode: " + serverStartCode + ", this server is: "
            + regionServer.serverName));
      }
    }

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, HTableDescriptor> htds =
        new HashMap<TableName, HTableDescriptor>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Server not open yet. Only an assignment of hbase:meta is allowed to wait for
      // initialization to finish; anything else fails immediately.
      TableName tableName = null;
      if (regionCount == 1) {
        RegionInfo ri = request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
      long endTime = System.currentTimeMillis() + timeout;
      synchronized (regionServer.online) {
        try {
          while (System.currentTimeMillis() <= endTime
              && !regionServer.isStopped() && !regionServer.isOnline()) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Re-assert the interrupt flag before propagating.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 signals that the master did not send its current time.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
      OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
        getOpenRegionCoordination();
      OpenRegionCoordination.OpenRegionDetails ord =
        coordination.parseFromProtoRequest(regionOpenInfo);

      HTableDescriptor htd;
      try {
        final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
        if (onlineRegion != null) {
          //Check if the region can actually be opened.
          if (onlineRegion.getCoprocessorHost() != null) {
            onlineRegion.getCoprocessorHost().preOpen();
          }
          // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
          // the region.
          Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
            regionServer.getConnection(), region.getRegionName());
          if (regionServer.serverName.equals(p.getSecond())) {
            Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
            // Map regionsInTransitionInRSOnly has an entry for a region only if the region
            // is in transition on this RS, so here closing can be null. If not null, it can
            // be true or false. True means the region is opening on this RS; while false
            // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
            // not in transition any more, or still transition to open.
            if (!Boolean.FALSE.equals(closing)
                && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
              LOG.warn("Attempted open of " + region.getEncodedName()
                + " but already online on this server");
              builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
              continue;
            }
          } else {
            LOG.warn("The region " + region.getEncodedName() + " is online on this server"
              + " but hbase:meta does not have this server - continue opening.");
            regionServer.removeFromOnlineRegions(onlineRegion, null);
          }
        }
        LOG.info("Open " + region.getRegionNameAsString());
        // Cache table descriptors so a bulk assign of many regions of the same table
        // only fetches each descriptor once.
        htd = htds.get(region.getTable());
        if (htd == null) {
          htd = regionServer.tableDescriptors.get(region.getTable());
          if (htd == null) {
            throw new IOException("missing table descriptor for " + region.getEncodedName());
          }
          htds.put(region.getTable(), htd);
        }

        // Atomically claim the in-transition slot: null = no prior action,
        // TRUE = open already in progress, FALSE = close in progress.
        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
          region.getEncodedNameAsBytes(), Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          // There is a close in progress. We need to mark this open as failed in ZK.

          coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);

          throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
            + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" +
            region.getRegionNameAsString() + " , which we are already trying to OPEN"
              + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        if (previous == null) {
          // check if the region to be opened is marked in recovering state in ZK
          if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
              region.getEncodedName())) {
            // Check if current region open is for distributedLogReplay. This check is to support
            // rolling restart/upgrade where we want to Master/RS see same configuration
            if (!regionOpenInfo.hasOpenForDistributedLogReplay()
                  || regionOpenInfo.getOpenForDistributedLogReplay()) {
              regionServer.recoveringRegions.put(region.getEncodedName(), null);
            } else {
              // Remove stale recovery region from ZK when we open region not for recovering which
              // could happen when turn distributedLogReplay off from on.
              List<String> tmpRegions = new ArrayList<String>();
              tmpRegions.add(region.getEncodedName());
              ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
                tmpRegions);
            }
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (region.isMetaRegion()) {
            regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          } else {
            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          }
        }

        // The handler runs asynchronously; OPENED here means the open was scheduled.
        builder.addOpeningState(RegionOpeningState.OPENED);

      } catch (KeeperException zooKeeperEx) {
        LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
        throw new ServiceException(zooKeeperEx);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }
1649 
  /**
   * Warmup a region on this server.
   *
   * This method should only be called by Master. It synchronously opens the region and
   * closes the region bringing the most important pages in cache.
   * <p>
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
      final WarmupRegionRequest request) throws ServiceException {

    RegionInfo regionInfo = request.getRegionInfo();
    final HRegionInfo region = HRegionInfo.convert(regionInfo);
    HTableDescriptor htd;
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();

    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);

      // Nothing to do if this server is already serving the region.
      if (onlineRegion != null) {
        LOG.info("Region already online. Skipping warming up " + region);
        return response;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Warming up Region " + region.getRegionNameAsString());
      }

      // NOTE(review): htd may be null here if the table descriptor is missing; it is
      // passed to warmupHRegion regardless — confirm that call tolerates null.
      htd = regionServer.tableDescriptors.get(region.getTable());

      // Do not race with an in-flight open/close of the same region.
      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("Region is in transition. Skipping warmup " + region);
        return response;
      }

      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
          regionServer.getConfiguration(), regionServer, null);

    } catch (IOException ie) {
      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }
1702 
  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    // Cell data travels outside the protobuf message; pull it off the controller.
    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries of one replay request must target the same region (enforced below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
            ? region.getCoprocessorHost()
            : null; // do not invoke coprocessors if this is a secondary region replica
      // Entries that passed preWALRestore, kept for the postWALRestore pass at the end.
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple " +
              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        // Report replayed nonces so retried client operations stay deduplicated
        // after the failover.
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          regionServer.nonceManager.reportOperationFromWal(
              nonceGroup,
              nonce,
              entry.getKey().getWriteTime());
        }
        // walEntry is only materialized when coprocessors need to see the WALKey/WALEdit.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
          new Pair<WALKey, WALEdit>();
        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
          cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if(edits!=null && !edits.isEmpty()) {
          // Prefer the original sequence number when present so replayed edits
          // keep their original ordering.
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      //sync wal at the end because ASYNC_WAL is used above
      WAL wal = getWAL(region);
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even when the request fails.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateReplay(
          EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }
1800 
1801   WAL getWAL(Region region) {
1802     return ((HRegion)region).getWAL();
1803   }
1804 
1805   /**
1806    * Replicate WAL entries on the region server.
1807    *
1808    * @param controller the RPC controller
1809    * @param request the request
1810    * @throws ServiceException
1811    */
1812   @Override
1813   @QosPriority(priority=HConstants.REPLICATION_QOS)
1814   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1815       final ReplicateWALEntryRequest request) throws ServiceException {
1816     try {
1817       checkOpen();
1818       if (regionServer.replicationSinkHandler != null) {
1819         requestCount.increment();
1820         List<WALEntry> entries = request.getEntryList();
1821         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1822         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1823         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1824         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1825         return ReplicateWALEntryResponse.newBuilder().build();
1826       } else {
1827         throw new ServiceException("Replication services are not initialized yet");
1828       }
1829     } catch (IOException ie) {
1830       throw new ServiceException(ie);
1831     }
1832   }
1833 
1834   /**
1835    * Roll the WAL writer of the region server.
1836    * @param controller the RPC controller
1837    * @param request the request
1838    * @throws ServiceException
1839    */
1840   @Override
1841   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1842       final RollWALWriterRequest request) throws ServiceException {
1843     try {
1844       checkOpen();
1845       requestCount.increment();
1846       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1847       regionServer.walRoller.requestRollAll();
1848       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1849       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1850       return builder.build();
1851     } catch (IOException ie) {
1852       throw new ServiceException(ie);
1853     }
1854   }
1855 
  /**
   * Split a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      // Only the primary replica may be split directly.
      // NOTE(review): if this throws, startRegionOperation() above is not paired with a
      // closeRegionOperation() on this path — confirm the lock is released elsewhere.
      if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
        throw new IOException("Can't split replicas directly. "
            + "Replicas are auto-split when their primary is split.");
      }
      LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
      // Flush first so the split works from on-disk files; record flush latency on success.
      long startTime = EnvironmentEdgeManager.currentTime();
      FlushResult flushResult = region.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      // An absent split point means the split point is chosen by the region itself.
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }
      ((HRegion)region).forceSplit(splitPoint);
      // The split itself runs asynchronously on the compact/split thread.
      regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
        RpcServer.getRequestUser());
      return SplitRegionResponse.newBuilder().build();
    } catch (DroppedSnapshotException ex) {
      // A failed memstore flush means WAL replay is required; the server must go down.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1898 
1899   /**
1900    * Stop the region server.
1901    *
1902    * @param controller the RPC controller
1903    * @param request the request
1904    * @throws ServiceException
1905    */
1906   @Override
1907   @QosPriority(priority=HConstants.ADMIN_QOS)
1908   public StopServerResponse stopServer(final RpcController controller,
1909       final StopServerRequest request) throws ServiceException {
1910     requestCount.increment();
1911     String reason = request.getReason();
1912     regionServer.stop(reason);
1913     return StopServerResponse.newBuilder().build();
1914   }
1915 
1916   @Override
1917   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1918       UpdateFavoredNodesRequest request) throws ServiceException {
1919     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1920     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1921     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1922       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1923       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1924         regionUpdateInfo.getFavoredNodesList());
1925     }
1926     respBuilder.setResponse(openInfoList.size());
1927     return respBuilder.build();
1928   }
1929 
1930   /**
1931    * Atomically bulk load several HFiles into an open region
1932    * @return true if successful, false is failed but recoverably (no action)
1933    * @throws ServiceException if failed unrecoverably
1934    */
1935   @Override
1936   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1937       final BulkLoadHFileRequest request) throws ServiceException {
1938     try {
1939       checkOpen();
1940       requestCount.increment();
1941       Region region = getRegion(request.getRegion());
1942       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1943       for (FamilyPath familyPath: request.getFamilyPathList()) {
1944         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1945           familyPath.getPath()));
1946       }
1947       boolean bypass = false;
1948       if (region.getCoprocessorHost() != null) {
1949         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1950       }
1951       boolean loaded = false;
1952       if (!bypass) {
1953         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1954       }
1955       if (region.getCoprocessorHost() != null) {
1956         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1957       }
1958       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1959       builder.setLoaded(loaded);
1960       return builder.build();
1961     } catch (IOException ie) {
1962       throw new ServiceException(ie);
1963     }
1964   }
1965 
1966   @Override
1967   public CoprocessorServiceResponse execService(final RpcController controller,
1968       final CoprocessorServiceRequest request) throws ServiceException {
1969     try {
1970       checkOpen();
1971       requestCount.increment();
1972       Region region = getRegion(request.getRegion());
1973       Message result = execServiceOnRegion(region, request.getCall());
1974       CoprocessorServiceResponse.Builder builder =
1975         CoprocessorServiceResponse.newBuilder();
1976       builder.setRegion(RequestConverter.buildRegionSpecifier(
1977         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1978       builder.setValue(
1979         builder.getValueBuilder().setName(result.getClass().getName())
1980           .setValue(result.toByteString()));
1981       return builder.build();
1982     } catch (IOException ie) {
1983       throw new ServiceException(ie);
1984     }
1985   }
1986 
1987   private Message execServiceOnRegion(Region region,
1988       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1989     // ignore the passed in controller (from the serialized call)
1990     ServerRpcController execController = new ServerRpcController();
1991     return region.execService(execController, serviceCall);
1992   }
1993 
  /**
   * Get data from a table.
   *
   * @param controller the RPC controller
   * @param request the get request
   * @throws ServiceException
   */
  @Override
  public GetResponse get(final RpcController controller,
      final GetRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // Non-null only for existence-only requests; carries the answer instead of a Result.
      Boolean existence = null;
      Result r = null;
      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        if (get.getColumnCount() != 1) {
          throw new DoNotRetryIOException(
            "get ClosestRowBefore supports one and only one family now, not "
              + get.getColumnCount() + " families");
        }
        byte[] row = get.getRow().toByteArray();
        byte[] family = get.getColumn(0).getFamily().toByteArray();
        r = region.getClosestRowBefore(row, family);
      } else {
        Get clientGet = ProtobufUtil.toGet(get);
        // A coprocessor may answer an existence-only get without reading the region.
        if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
          existence = region.getCoprocessorHost().preExists(clientGet);
        }
        if (existence == null) {
          r = region.get(clientGet);
          if (get.getExistenceOnly()) {
            boolean exists = r.getExists();
            if (region.getCoprocessorHost() != null) {
              exists = region.getCoprocessorHost().postExists(clientGet, exists);
            }
            existence = exists;
          }
        }
      }
      if (existence != null){
        ClientProtos.Result pbr =
            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else  if (r != null) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(r);
        builder.setResult(pbr);
      }
      // Charge the quota for the data actually read.
      if (r != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record get latency and release the quota even when the request fails.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateGet(
          EnvironmentEdgeManager.currentTime() - before);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }
2066 
2067   /**
2068    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2069    *
2070    * @param rpcc the RPC controller
2071    * @param request the multi request
2072    * @throws ServiceException
2073    */
2074   @Override
2075   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2076   throws ServiceException {
2077     try {
2078       checkOpen();
2079     } catch (IOException ie) {
2080       throw new ServiceException(ie);
2081     }
2082 
2083     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2084     // It is also the conduit via which we pass back data.
2085     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2086     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2087     if (controller != null) {
2088       controller.setCellScanner(null);
2089     }
2090 
2091     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2092 
2093     // this will contain all the cells that we need to return. It's created later, if needed.
2094     List<CellScannable> cellsToReturn = null;
2095     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2096     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2097     Boolean processed = null;
2098 
2099     for (RegionAction regionAction : request.getRegionActionList()) {
2100       this.requestCount.add(regionAction.getActionCount());
2101       OperationQuota quota;
2102       Region region;
2103       regionActionResultBuilder.clear();
2104       try {
2105         region = getRegion(regionAction.getRegion());
2106         quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2107       } catch (IOException e) {
2108         rpcServer.getMetrics().exception(e);
2109         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2110         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2111         // All Mutations in this RegionAction not executed as we can not see the Region online here
2112         // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2113         // corresponding to these Mutations.
2114         if (cellScanner != null) {
2115           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2116         }
2117         continue;  // For this region it's a failure.
2118       }
2119 
2120       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2121         // How does this call happen?  It may need some work to play well w/ the surroundings.
2122         // Need to return an item per Action along w/ Action index.  TODO.
2123         try {
2124           if (request.hasCondition()) {
2125             Condition condition = request.getCondition();
2126             byte[] row = condition.getRow().toByteArray();
2127             byte[] family = condition.getFamily().toByteArray();
2128             byte[] qualifier = condition.getQualifier().toByteArray();
2129             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2130             ByteArrayComparable comparator =
2131                 ProtobufUtil.toComparator(condition.getComparator());
2132             processed = checkAndRowMutate(region, regionAction.getActionList(),
2133                   cellScanner, row, family, qualifier, compareOp, comparator);
2134           } else {
2135             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2136                 cellScanner);
2137             // add the stats to the request
2138             if(stats != null) {
2139               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2140                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2141             }
2142             processed = Boolean.TRUE;
2143           }
2144         } catch (IOException e) {
2145           rpcServer.getMetrics().exception(e);
2146           // As it's atomic, we may expect it's a global failure.
2147           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2148         }
2149       } else {
2150         // doNonAtomicRegionMutation manages the exception internally
2151         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2152             regionActionResultBuilder, cellsToReturn, nonceGroup);
2153       }
2154       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2155       quota.close();
2156     }
2157     // Load the controller with the Cells to return.
2158     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2159       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2160     }
2161     if (processed != null) {
2162       responseBuilder.setProcessed(processed);
2163     }
2164     return responseBuilder.build();
2165   }
2166 
2167   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2168     for (Action action : actions) {
2169       skipCellsForMutation(action, cellScanner);
2170     }
2171   }
2172 
2173   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2174     try {
2175       if (action.hasMutation()) {
2176         MutationProto m = action.getMutation();
2177         if (m.hasAssociatedCellCount()) {
2178           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2179             cellScanner.advance();
2180           }
2181         }
2182       }
2183     } catch (IOException e) {
2184       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2185       // marked as failed as we could not see the Region here. At client side the top level
2186       // RegionAction exception will be considered first.
2187       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2188     }
2189   }
2190 
  /**
   * Mutate data in a table.
   * <p>
   * Handles APPEND, INCREMENT, PUT and DELETE mutation types. For PUT and DELETE an
   * optional Condition in the request turns the operation into a check-and-mutate;
   * APPEND and INCREMENT do not consult the condition (see the TODOs below).
   *
   * @param rpcc the RPC controller
   * @param request the mutate request
   * @throws ServiceException wrapping any IOException raised while mutating
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc,
      final MutateRequest request) throws ServiceException {
    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Give the memstore a chance to reclaim memory before taking on more data; meta table
      // edits skip this so they are never delayed behind flushes.
      if (!region.getRegionInfo().isMetaTable()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      Result r = null;
      Boolean processed = null;
      MutationType type = mutation.getMutateType();
      // NOTE(review): mutationSize is never read in this method — candidate for removal.
      long mutationSize = 0;
      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      switch (type) {
      case APPEND:
        // TODO: this doesn't actually check anything.
        r = append(region, quota, mutation, cellScanner, nonceGroup);
        break;
      case INCREMENT:
        // TODO: this doesn't actually check anything.
        r = increment(region, quota, mutation, cellScanner, nonceGroup);
        break;
      case PUT:
        Put put = ProtobufUtil.toPut(mutation, cellScanner);
        quota.addMutation(put);
        if (request.hasCondition()) {
          // check-and-put: a coprocessor preCheckAndPut hook may decide the outcome (non-null
          // return) and bypass the region-level checkAndMutate entirely.
          Condition condition = request.getCondition();
          byte[] row = condition.getRow().toByteArray();
          byte[] family = condition.getFamily().toByteArray();
          byte[] qualifier = condition.getQualifier().toByteArray();
          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
          ByteArrayComparable comparator =
            ProtobufUtil.toComparator(condition.getComparator());
          if (region.getCoprocessorHost() != null) {
            processed = region.getCoprocessorHost().preCheckAndPut(
              row, family, qualifier, compareOp, comparator, put);
          }
          if (processed == null) {
            boolean result = region.checkAndMutate(row, family,
              qualifier, compareOp, comparator, put, true);
            if (region.getCoprocessorHost() != null) {
              // postCheckAndPut may override the result seen by the client.
              result = region.getCoprocessorHost().postCheckAndPut(row, family,
                qualifier, compareOp, comparator, put, result);
            }
            processed = result;
          }
        } else {
          region.put(put);
          processed = Boolean.TRUE;
        }
        break;
      case DELETE:
        Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
        quota.addMutation(delete);
        if (request.hasCondition()) {
          // check-and-delete: mirrors the check-and-put flow above with the delete hooks.
          Condition condition = request.getCondition();
          byte[] row = condition.getRow().toByteArray();
          byte[] family = condition.getFamily().toByteArray();
          byte[] qualifier = condition.getQualifier().toByteArray();
          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
          ByteArrayComparable comparator =
            ProtobufUtil.toComparator(condition.getComparator());
          if (region.getCoprocessorHost() != null) {
            processed = region.getCoprocessorHost().preCheckAndDelete(
              row, family, qualifier, compareOp, comparator, delete);
          }
          if (processed == null) {
            boolean result = region.checkAndMutate(row, family,
              qualifier, compareOp, comparator, delete, true);
            if (region.getCoprocessorHost() != null) {
              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
                qualifier, compareOp, comparator, delete, result);
            }
            processed = result;
          }
        } else {
          region.delete(delete);
          processed = Boolean.TRUE;
        }
        break;
      default:
          throw new DoNotRetryIOException(
            "Unsupported mutate type: " + type.name());
      }
      if (processed != null) builder.setProcessed(processed.booleanValue());
      addResult(builder, r, controller);
      return builder.build();
    } catch (IOException ie) {
      // An IOException here may indicate filesystem trouble; prod a filesystem check
      // before surfacing the error to the client.
      regionServer.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }
2308 
2309   /**
2310    * Scan data in a table.
2311    *
2312    * @param controller the RPC controller
2313    * @param request the scan request
2314    * @throws ServiceException
2315    */
2316   @Override
2317   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2318   throws ServiceException {
2319     OperationQuota quota = null;
2320     Leases.Lease lease = null;
2321     String scannerName = null;
2322     try {
2323       if (!request.hasScannerId() && !request.hasScan()) {
2324         throw new DoNotRetryIOException(
2325           "Missing required input: scannerId or scan");
2326       }
2327       long scannerId = -1;
2328       if (request.hasScannerId()) {
2329         scannerId = request.getScannerId();
2330         scannerName = String.valueOf(scannerId);
2331       }
2332       try {
2333         checkOpen();
2334       } catch (IOException e) {
2335         // If checkOpen failed, server not running or filesystem gone,
2336         // cancel this lease; filesystem is gone or we're closing or something.
2337         if (scannerName != null) {
2338           LOG.debug("Server shutting down and client tried to access missing scanner "
2339             + scannerName);
2340           if (regionServer.leases != null) {
2341             try {
2342               regionServer.leases.cancelLease(scannerName);
2343             } catch (LeaseException le) {
2344               // No problem, ignore
2345               if (LOG.isTraceEnabled()) {
2346                 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2347               }
2348              }
2349           }
2350         }
2351         throw e;
2352       }
2353       requestCount.increment();
2354 
2355       int ttl = 0;
2356       Region region = null;
2357       RegionScanner scanner = null;
2358       RegionScannerHolder rsh = null;
2359       boolean moreResults = true;
2360       boolean closeScanner = false;
2361       boolean isSmallScan = false;
2362       RpcCallContext context = RpcServer.getCurrentCall();
2363       Object lastBlock = null;
2364 
2365       ScanResponse.Builder builder = ScanResponse.newBuilder();
2366       if (request.hasCloseScanner()) {
2367         closeScanner = request.getCloseScanner();
2368       }
2369       int rows = closeScanner ? 0 : 1;
2370       if (request.hasNumberOfRows()) {
2371         rows = request.getNumberOfRows();
2372       }
2373       if (request.hasScannerId()) {
2374         rsh = scanners.get(scannerName);
2375         if (rsh == null) {
2376           LOG.warn("Client tried to access missing scanner " + scannerName);
2377           throw new UnknownScannerException(
2378             "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2379                 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2380                 + "long wait between consecutive client checkins, c) Server may be closing down, "
2381                 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2382                 + "possible fix would be increasing the value of"
2383                 + "'hbase.client.scanner.timeout.period' configuration.");
2384         }
2385         scanner = rsh.s;
2386         HRegionInfo hri = scanner.getRegionInfo();
2387         region = regionServer.getRegion(hri.getRegionName());
2388         if (region != rsh.r) { // Yes, should be the same instance
2389           throw new NotServingRegionException("Region was re-opened after the scanner"
2390             + scannerName + " was created: " + hri.getRegionNameAsString());
2391         }
2392       } else {
2393         region = getRegion(request.getRegion());
2394         ClientProtos.Scan protoScan = request.getScan();
2395         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2396         Scan scan = ProtobufUtil.toScan(protoScan);
2397         // if the request doesn't set this, get the default region setting.
2398         if (!isLoadingCfsOnDemandSet) {
2399           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2400         }
2401 
2402         isSmallScan = scan.isSmall();
2403         if (!scan.hasFamilies()) {
2404           // Adding all families to scanner
2405           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2406             scan.addFamily(family);
2407           }
2408         }
2409 
2410         if (region.getCoprocessorHost() != null) {
2411           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2412         }
2413         if (scanner == null) {
2414           scanner = region.getScanner(scan);
2415         }
2416         if (region.getCoprocessorHost() != null) {
2417           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2418         }
2419         scannerId = addScanner(scanner, region);
2420         scannerName = String.valueOf(scannerId);
2421         ttl = this.scannerLeaseTimeoutPeriod;
2422       }
2423       if (request.hasRenew() && request.getRenew()) {
2424         rsh = scanners.get(scannerName);
2425         lease = regionServer.leases.removeLease(scannerName);
2426         if (lease != null && rsh != null) {
2427           regionServer.leases.addLease(lease);
2428           // Increment the nextCallSeq value which is the next expected from client.
2429           rsh.incNextCallSeq();
2430         }
2431         return builder.build();
2432       }
2433 
2434       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2435       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2436       if (rows > 0) {
2437         // if nextCallSeq does not match throw Exception straight away. This needs to be
2438         // performed even before checking of Lease.
2439         // See HBASE-5974
2440         if (request.hasNextCallSeq()) {
2441           if (rsh == null) {
2442             rsh = scanners.get(scannerName);
2443           }
2444           if (rsh != null) {
2445             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2446               throw new OutOfOrderScannerNextException(
2447                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2448                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2449                 "; request=" + TextFormat.shortDebugString(request));
2450             }
2451             // Increment the nextCallSeq value which is the next expected from client.
2452             rsh.incNextCallSeq();
2453           }
2454         }
2455         try {
2456           // Remove lease while its being processed in server; protects against case
2457           // where processing of request takes > lease expiration time.
2458           lease = regionServer.leases.removeLease(scannerName);
2459           List<Result> results = new ArrayList<Result>();
2460 
2461           boolean done = false;
2462           // Call coprocessor. Get region info from scanner.
2463           if (region != null && region.getCoprocessorHost() != null) {
2464             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2465               scanner, results, rows);
2466             if (!results.isEmpty()) {
2467               for (Result r : results) {
2468                 lastBlock = addSize(context, r, lastBlock);
2469               }
2470             }
2471             if (bypass != null && bypass.booleanValue()) {
2472               done = true;
2473             }
2474           }
2475 
2476           if (!done) {
2477             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2478             if (maxResultSize <= 0) {
2479               maxResultSize = maxQuotaResultSize;
2480             }
2481             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2482             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2483             // arbitrary 32. TODO: keep record of general size of results being returned.
2484             List<Cell> values = new ArrayList<Cell>(32);
2485             region.startRegionOperation(Operation.SCAN);
2486             try {
2487               int i = 0;
2488               synchronized(scanner) {
2489                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2490                 boolean clientHandlesPartials =
2491                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2492                 boolean clientHandlesHeartbeats =
2493                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2494 
2495                 // On the server side we must ensure that the correct ordering of partial results is
2496                 // returned to the client to allow them to properly reconstruct the partial results.
2497                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2498                 // correct ordering of partial results and so we prevent partial results from being
2499                 // formed.
2500                 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2501                 boolean allowPartialResults =
2502                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2503                 boolean moreRows = false;
2504 
2505                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2506                 // certain time threshold on the server. When the time threshold is exceeded, the
2507                 // server stops the scan and sends back whatever Results it has accumulated within
2508                 // that time period (may be empty). Since heartbeat messages have the potential to
2509                 // create partial Results (in the event that the timeout occurs in the middle of a
2510                 // row), we must only generate heartbeat messages when the client can handle both
2511                 // heartbeats AND partials
2512                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2513 
2514                 // Default value of timeLimit is negative to indicate no timeLimit should be
2515                 // enforced.
2516                 long timeLimit = -1;
2517 
2518                 // Set the time limit to be half of the more restrictive timeout value (one of the
2519                 // timeout values must be positive). In the event that both values are positive, the
2520                 // more restrictive of the two is used to calculate the limit.
2521                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2522                   long timeLimitDelta;
2523                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2524                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2525                   } else {
2526                     timeLimitDelta =
2527                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2528                   }
2529                   // Use half of whichever timeout value was more restrictive... But don't allow
2530                   // the time limit to be less than the allowable minimum (could cause an
2531                   // immediatate timeout before scanning any data).
2532                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2533                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2534                 }
2535 
2536                 final LimitScope sizeScope =
2537                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2538                 final LimitScope timeScope =
2539                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2540 
2541                 boolean trackMetrics =
2542                     request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2543 
2544                 // Configure with limits for this RPC. Set keep progress true since size progress
2545                 // towards size limit should be kept between calls to nextRaw
2546                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2547                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2548                 contextBuilder.setBatchLimit(scanner.getBatch());
2549                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2550                 contextBuilder.setTrackMetrics(trackMetrics);
2551                 ScannerContext scannerContext = contextBuilder.build();
2552 
2553                 boolean limitReached = false;
2554                 while (i < rows) {
2555                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2556                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2557                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2558                   // reset the batch progress between nextRaw invocations since we don't want the
2559                   // batch progress from previous calls to affect future calls
2560                   scannerContext.setBatchProgress(0);
2561 
2562                   // Collect values to be returned here
2563                   moreRows = scanner.nextRaw(values, scannerContext);
2564 
2565                   if (!values.isEmpty()) {
2566                     final boolean partial = scannerContext.partialResultFormed();
2567                     Result r = Result.create(values, null, stale, partial);
2568                     lastBlock = addSize(context, r, lastBlock);
2569                     results.add(r);
2570                     i++;
2571                   }
2572 
2573                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2574                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2575                   boolean rowLimitReached = i >= rows;
2576                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2577 
2578                   if (limitReached || !moreRows) {
2579                     if (LOG.isTraceEnabled()) {
2580                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2581                           + moreRows + " scannerContext: " + scannerContext);
2582                     }
2583                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2584                     // there are more values to be read server side. If there aren't more values,
2585                     // marking it as a heartbeat is wasteful because the client will need to issue
2586                     // another ScanRequest only to realize that they already have all the values
2587                     if (moreRows) {
2588                       // Heartbeat messages occur when the time limit has been reached.
2589                       builder.setHeartbeatMessage(timeLimitReached);
2590                     }
2591                     break;
2592                   }
2593                   values.clear();
2594                 }
2595 
2596                 if (limitReached || moreRows) {
2597                   // We stopped prematurely
2598                   builder.setMoreResultsInRegion(true);
2599                 } else {
2600                   // We didn't get a single batch
2601                   builder.setMoreResultsInRegion(false);
2602                 }
2603 
2604                 // Check to see if the client requested that we track metrics server side. If the
2605                 // client requested metrics, retrieve the metrics from the scanner context.
2606                 if (trackMetrics) {
2607                   Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2608                   ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2609                   NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2610 
2611                   for (Entry<String, Long> entry : metrics.entrySet()) {
2612                     pairBuilder.setName(entry.getKey());
2613                     pairBuilder.setValue(entry.getValue());
2614                     metricBuilder.addMetrics(pairBuilder.build());
2615                   }
2616 
2617                   builder.setScanMetrics(metricBuilder.build());
2618                 }
2619               }
2620               region.updateReadRequestsCount(i);
2621               long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2622               region.getMetrics().updateScanNext(responseCellSize);
2623               if (regionServer.metricsRegionServer != null) {
2624                 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2625               }
2626             } finally {
2627               region.closeRegionOperation();
2628             }
2629 
2630             // coprocessor postNext hook
2631             if (region != null && region.getCoprocessorHost() != null) {
2632               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2633             }
2634           }
2635 
2636           quota.addScanResult(results);
2637 
2638           // If the scanner's filter - if any - is done with the scan
2639           // and wants to tell the client to stop the scan. This is done by passing
2640           // a null result, and setting moreResults to false.
2641           if (scanner.isFilterDone() && results.isEmpty()) {
2642             moreResults = false;
2643             results = null;
2644           } else {
2645             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2646           }
2647         } catch (IOException e) {
2648           // The scanner state might be left in a dirty state, so we will tell the Client to
2649           // fail this RPC and close the scanner while opening up another one from the start of
2650           // row that the client has last seen.
2651           closeScanner(region, scanner, scannerName);
2652 
2653           // We closed the scanner already. Instead of throwing the IOException, and client
2654           // retrying with the same scannerId only to get USE on the next RPC, we directly throw
2655           // a special exception to save an RPC.
2656           if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2657             // 1.4.0+ clients know how to handle
2658             throw new ScannerResetException("Scanner is closed on the server-side", e);
2659           } else {
2660             // older clients do not know about SRE. Just throw USE, which they will handle
2661             throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2662                 + " scanner state for clients older than 1.3.", e);
2663           }
2664         } finally {
2665           // We're done. On way out re-add the above removed lease.
2666           // Adding resets expiration time on lease.
2667           if (scanners.containsKey(scannerName)) {
2668             if (lease != null) regionServer.leases.addLease(lease);
2669             ttl = this.scannerLeaseTimeoutPeriod;
2670           }
2671         }
2672       }
2673 
2674       if (!moreResults || closeScanner) {
2675         ttl = 0;
2676         moreResults = false;
2677         closeScanner(region, scanner, scannerName);
2678       }
2679 
2680       if (ttl > 0) {
2681         builder.setTtl(ttl);
2682       }
2683       builder.setScannerId(scannerId);
2684       builder.setMoreResults(moreResults);
2685       return builder.build();
2686     } catch (IOException ie) {
2687       if (scannerName != null && ie instanceof NotServingRegionException) {
2688         RegionScannerHolder rsh = scanners.remove(scannerName);
2689         if (rsh != null) {
2690           try {
2691             RegionScanner scanner = rsh.s;
2692             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2693             scanner.close();
2694             regionServer.leases.cancelLease(scannerName);
2695           } catch (IOException e) {
2696            LOG.warn("Getting exception closing " + scannerName, e);
2697           }
2698         }
2699       }
2700       throw new ServiceException(ie);
2701     } finally {
2702       if (quota != null) {
2703         quota.close();
2704       }
2705     }
2706   }
2707 
2708   private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2709       throws IOException {
2710     if (region != null && region.getCoprocessorHost() != null) {
2711       if (region.getCoprocessorHost().preScannerClose(scanner)) {
2712         return true; // bypass
2713       }
2714     }
2715     RegionScannerHolder rsh = scanners.remove(scannerName);
2716     if (rsh != null) {
2717       scanner = rsh.s;
2718       scanner.close();
2719       try {
2720         regionServer.leases.cancelLease(scannerName);
2721       } catch (LeaseException le) {
2722         // No problem, ignore
2723         if (LOG.isTraceEnabled()) {
2724           LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2725         }
2726       }
2727       if (region != null && region.getCoprocessorHost() != null) {
2728         region.getCoprocessorHost().postScannerClose(scanner);
2729       }
2730     }
2731     return false;
2732   }
2733 
  /**
   * Execute a region-server-level coprocessor service call by delegating straight to the
   * hosting region server.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
      CoprocessorServiceRequest request) throws ServiceException {
    return regionServer.execRegionServerService(controller, request);
  }
2739 
2740   @Override
2741   public UpdateConfigurationResponse updateConfiguration(
2742       RpcController controller, UpdateConfigurationRequest request)
2743       throws ServiceException {
2744     try {
2745       this.regionServer.updateConfiguration();
2746     } catch (Exception e) {
2747       throw new ServiceException(e);
2748     }
2749     return UpdateConfigurationResponse.getDefaultInstance();
2750   }
2751 }